1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20 ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21 PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
/// EventBridge (`"events"`) service implementation.
///
/// Holds the EventBridge state plus optional handles to other services; the
/// optional handles start as `None` in `new` and are attached through the
/// `with_lambda`, `with_logs` and `with_runtime` builder methods.
pub struct EventBridgeService {
    /// Shared EventBridge state (event buses, rules, partner event sources),
    /// accessed through `read()` / `write()` guards by the request handlers.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction.
    // NOTE(review): presumably used to deliver matched events to targets; not
    // exercised by the handlers visible in this part of the file — confirm.
    delivery: Arc<DeliveryBus>,
    /// Lambda service state; `None` until `with_lambda` is called.
    lambda_state: Option<SharedLambdaState>,
    /// CloudWatch Logs service state; `None` until `with_logs` is called.
    logs_state: Option<SharedLogsState>,
    /// Container runtime handle; `None` until `with_runtime` is called.
    container_runtime: Option<Arc<ContainerRuntime>>,
}
31
32impl EventBridgeService {
33 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
34 Self {
35 state,
36 delivery,
37 lambda_state: None,
38 logs_state: None,
39 container_runtime: None,
40 }
41 }
42
43 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
44 self.lambda_state = Some(lambda_state);
45 self
46 }
47
48 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
49 self.logs_state = Some(logs_state);
50 self
51 }
52
53 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
54 self.container_runtime = Some(runtime);
55 self
56 }
57}
58
59#[async_trait]
60impl AwsService for EventBridgeService {
61 fn service_name(&self) -> &str {
62 "events"
63 }
64
65 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
66 match req.action.as_str() {
67 "CreateEventBus" => self.create_event_bus(&req),
68 "DeleteEventBus" => self.delete_event_bus(&req),
69 "ListEventBuses" => self.list_event_buses(&req),
70 "DescribeEventBus" => self.describe_event_bus(&req),
71 "PutRule" => self.put_rule(&req),
72 "DeleteRule" => self.delete_rule(&req),
73 "ListRules" => self.list_rules(&req),
74 "DescribeRule" => self.describe_rule(&req),
75 "EnableRule" => self.enable_rule(&req),
76 "DisableRule" => self.disable_rule(&req),
77 "PutTargets" => self.put_targets(&req),
78 "RemoveTargets" => self.remove_targets(&req),
79 "ListTargetsByRule" => self.list_targets_by_rule(&req),
80 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
81 "PutEvents" => self.put_events(&req),
82 "PutPermission" => self.put_permission(&req),
83 "RemovePermission" => self.remove_permission(&req),
84 "TagResource" => self.tag_resource(&req),
85 "UntagResource" => self.untag_resource(&req),
86 "ListTagsForResource" => self.list_tags_for_resource(&req),
87 "CreateArchive" => self.create_archive(&req),
88 "DescribeArchive" => self.describe_archive(&req),
89 "ListArchives" => self.list_archives(&req),
90 "UpdateArchive" => self.update_archive(&req),
91 "DeleteArchive" => self.delete_archive(&req),
92 "CreateConnection" => self.create_connection(&req),
93 "DescribeConnection" => self.describe_connection(&req),
94 "ListConnections" => self.list_connections(&req),
95 "UpdateConnection" => self.update_connection(&req),
96 "DeleteConnection" => self.delete_connection(&req),
97 "CreateApiDestination" => self.create_api_destination(&req),
98 "DescribeApiDestination" => self.describe_api_destination(&req),
99 "ListApiDestinations" => self.list_api_destinations(&req),
100 "UpdateApiDestination" => self.update_api_destination(&req),
101 "DeleteApiDestination" => self.delete_api_destination(&req),
102 "StartReplay" => self.start_replay(&req),
103 "DescribeReplay" => self.describe_replay(&req),
104 "ListReplays" => self.list_replays(&req),
105 "CancelReplay" => self.cancel_replay(&req),
106 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
107 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
108 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
109 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
110 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
111 "ActivateEventSource" => self.activate_event_source(&req),
112 "DeactivateEventSource" => self.deactivate_event_source(&req),
113 "DescribeEventSource" => self.describe_event_source(&req),
114 "ListEventSources" => self.list_event_sources(&req),
115 "PutPartnerEvents" => self.put_partner_events(&req),
116 "TestEventPattern" => self.test_event_pattern(&req),
117 "UpdateEventBus" => self.update_event_bus(&req),
118 "CreateEndpoint" => self.create_endpoint(&req),
119 "DeleteEndpoint" => self.delete_endpoint(&req),
120 "DescribeEndpoint" => self.describe_endpoint(&req),
121 "ListEndpoints" => self.list_endpoints(&req),
122 "UpdateEndpoint" => self.update_endpoint(&req),
123 "DeauthorizeConnection" => self.deauthorize_connection(&req),
124 _ => Err(AwsServiceError::action_not_implemented(
125 "events",
126 &req.action,
127 )),
128 }
129 }
130
131 fn supported_actions(&self) -> &[&str] {
132 &[
133 "CreateEventBus",
134 "DeleteEventBus",
135 "ListEventBuses",
136 "DescribeEventBus",
137 "PutRule",
138 "DeleteRule",
139 "ListRules",
140 "DescribeRule",
141 "EnableRule",
142 "DisableRule",
143 "PutTargets",
144 "RemoveTargets",
145 "ListTargetsByRule",
146 "ListRuleNamesByTarget",
147 "PutEvents",
148 "PutPermission",
149 "RemovePermission",
150 "TagResource",
151 "UntagResource",
152 "ListTagsForResource",
153 "CreateArchive",
154 "DescribeArchive",
155 "ListArchives",
156 "UpdateArchive",
157 "DeleteArchive",
158 "CreateConnection",
159 "DescribeConnection",
160 "ListConnections",
161 "UpdateConnection",
162 "DeleteConnection",
163 "CreateApiDestination",
164 "DescribeApiDestination",
165 "ListApiDestinations",
166 "UpdateApiDestination",
167 "DeleteApiDestination",
168 "StartReplay",
169 "DescribeReplay",
170 "ListReplays",
171 "CancelReplay",
172 "CreatePartnerEventSource",
173 "DeletePartnerEventSource",
174 "DescribePartnerEventSource",
175 "ListPartnerEventSources",
176 "ListPartnerEventSourceAccounts",
177 "ActivateEventSource",
178 "DeactivateEventSource",
179 "DescribeEventSource",
180 "ListEventSources",
181 "PutPartnerEvents",
182 "TestEventPattern",
183 "UpdateEventBus",
184 "CreateEndpoint",
185 "DeleteEndpoint",
186 "DescribeEndpoint",
187 "ListEndpoints",
188 "UpdateEndpoint",
189 "DeauthorizeConnection",
190 ]
191 }
192}
193
194fn parse_tags(body: &Value) -> HashMap<String, String> {
195 let mut tags = HashMap::new();
196 if let Some(arr) = body["Tags"].as_array() {
197 for tag in arr {
198 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
199 tags.insert(key.to_string(), val.to_string());
200 }
201 }
202 }
203 tags
204}
205
206fn parse_target(target: &Value) -> EventTarget {
207 EventTarget {
208 id: target["Id"].as_str().unwrap_or("").to_string(),
209 arn: target["Arn"].as_str().unwrap_or("").to_string(),
210 input: target["Input"].as_str().map(|s| s.to_string()),
211 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
212 input_transformer: target.get("InputTransformer").cloned(),
213 sqs_parameters: target.get("SqsParameters").cloned(),
214 }
215}
216
217fn target_to_json(t: &EventTarget) -> Value {
218 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
219 if let Some(ref input) = t.input {
220 obj["Input"] = json!(input);
221 }
222 if let Some(ref input_path) = t.input_path {
223 obj["InputPath"] = json!(input_path);
224 }
225 if let Some(ref it) = t.input_transformer {
226 obj["InputTransformer"] = it.clone();
227 }
228 if let Some(ref sp) = t.sqs_parameters {
229 obj["SqsParameters"] = sp.clone();
230 }
231 obj
232}
233
234impl EventBridgeService {
236 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
237 let body = req.json_body();
238 validate_required("Name", &body["Name"])?;
239 let name = body["Name"]
240 .as_str()
241 .ok_or_else(|| missing("Name"))?
242 .to_string();
243 validate_string_length("name", &name, 1, 256)?;
244 validate_optional_string_length(
245 "eventSourceName",
246 body["EventSourceName"].as_str(),
247 1,
248 256,
249 )?;
250 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
251 validate_optional_string_length(
252 "kmsKeyIdentifier",
253 body["KmsKeyIdentifier"].as_str(),
254 0,
255 2048,
256 )?;
257
258 if name.contains('/') && !name.starts_with("aws.partner/") {
260 return Err(AwsServiceError::aws_error(
261 StatusCode::BAD_REQUEST,
262 "ValidationException",
263 "Event bus name must not contain '/'.",
264 ));
265 }
266
267 if name.starts_with("aws.partner/") {
269 let event_source = body["EventSourceName"].as_str().unwrap_or("");
270 let state_r = self.state.read();
271 let has_source = state_r.partner_event_sources.contains_key(event_source);
272 drop(state_r);
273 if !has_source {
274 return Err(AwsServiceError::aws_error(
275 StatusCode::BAD_REQUEST,
276 "ResourceNotFoundException",
277 format!("Event source {event_source} does not exist."),
278 ));
279 }
280 }
281
282 let mut state = self.state.write();
283
284 if state.buses.contains_key(&name) {
285 return Err(AwsServiceError::aws_error(
286 StatusCode::BAD_REQUEST,
287 "ResourceAlreadyExistsException",
288 format!("Event bus {name} already exists."),
289 ));
290 }
291
292 let arn = format!(
293 "arn:aws:events:{}:{}:event-bus/{}",
294 req.region, state.account_id, name
295 );
296 let now = Utc::now();
297 let description = body["Description"].as_str().map(|s| s.to_string());
298 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
299 let dead_letter_config = body.get("DeadLetterConfig").cloned();
300
301 let tags = parse_tags(&body);
302
303 let bus = EventBus {
304 name: name.clone(),
305 arn: arn.clone(),
306 tags,
307 policy: None,
308 description,
309 kms_key_identifier,
310 dead_letter_config,
311 creation_time: now,
312 last_modified_time: now,
313 };
314 state.buses.insert(name, bus);
315
316 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
317 }
318
319 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 validate_required("Name", &body["Name"])?;
322 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
323 validate_string_length("name", name, 1, 256)?;
324
325 if name == "default" {
326 return Err(AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "ValidationException",
329 format!("Cannot delete event bus {name}."),
330 ));
331 }
332
333 let mut state = self.state.write();
334 state.buses.remove(name);
335 state.rules.retain(|k, _| k.0 != name);
336
337 Ok(AwsResponse::ok_json(json!({})))
338 }
339
340 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
341 let body = req.json_body();
342 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
343 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
344 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
345 let name_prefix = body["NamePrefix"].as_str();
346 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
347 if let Some(t) = body["NextToken"].as_str() {
348 t.parse::<usize>().map_err(|_| {
349 AwsServiceError::aws_error(
350 StatusCode::BAD_REQUEST,
351 "InvalidNextTokenException",
352 format!("Invalid NextToken value: '{t}'"),
353 )
354 })?;
355 }
356
357 let state = self.state.read();
358 let filtered: Vec<&_> = state
359 .buses
360 .values()
361 .filter(|b| match name_prefix {
362 Some(prefix) => b.name.starts_with(prefix),
363 None => true,
364 })
365 .collect();
366
367 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
368 let buses: Vec<Value> = page
369 .iter()
370 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
371 .collect();
372 let mut resp = json!({ "EventBuses": buses });
373 if let Some(token) = next_token {
374 resp["NextToken"] = json!(token);
375 }
376
377 Ok(AwsResponse::ok_json(resp))
378 }
379
380 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381 let body = req.json_body();
382 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
383 let name = body["Name"].as_str().unwrap_or("default");
384
385 let state = self.state.read();
386 let bus = state.buses.get(name).ok_or_else(|| {
387 AwsServiceError::aws_error(
388 StatusCode::BAD_REQUEST,
389 "ResourceNotFoundException",
390 format!("Event bus {name} does not exist."),
391 )
392 })?;
393
394 let mut resp = json!({
395 "Name": bus.name,
396 "Arn": bus.arn,
397 "CreationTime": bus.creation_time.timestamp() as f64,
398 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
399 });
400
401 if let Some(ref policy) = bus.policy {
402 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
403 }
404 if let Some(ref desc) = bus.description {
405 resp["Description"] = json!(desc);
406 }
407 if let Some(ref kms) = bus.kms_key_identifier {
408 resp["KmsKeyIdentifier"] = json!(kms);
409 }
410 if let Some(ref dlc) = bus.dead_letter_config {
411 resp["DeadLetterConfig"] = dlc.clone();
412 }
413
414 Ok(AwsResponse::ok_json(resp))
415 }
416
417 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420 let body = req.json_body();
421 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
422 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
423 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
424 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
425 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
426
427 let mut state = self.state.write();
428
429 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
430 AwsServiceError::aws_error(
431 StatusCode::BAD_REQUEST,
432 "ResourceNotFoundException",
433 format!("Event bus {event_bus_name} does not exist."),
434 )
435 })?;
436
437 if let Some(policy_str) = body["Policy"].as_str() {
439 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
440 bus.policy = Some(policy);
441 return Ok(AwsResponse::ok_json(json!({})));
442 }
443 }
444
445 let action = body["Action"].as_str().unwrap_or("");
447 let principal = body["Principal"].as_str().unwrap_or("");
448 let statement_id = body["StatementId"].as_str().unwrap_or("");
449
450 if action != "events:PutEvents" {
452 return Err(AwsServiceError::aws_error(
453 StatusCode::BAD_REQUEST,
454 "ValidationException",
455 "Provided value in parameter 'action' is not supported.",
456 ));
457 }
458
459 let statement = json!({
460 "Sid": statement_id,
461 "Effect": "Allow",
462 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
463 "Action": action,
464 "Resource": bus.arn,
465 });
466
467 let policy = bus.policy.get_or_insert_with(|| {
468 json!({
469 "Version": "2012-10-17",
470 "Statement": [],
471 })
472 });
473
474 if let Some(stmts) = policy["Statement"].as_array_mut() {
475 stmts.push(statement);
476 }
477
478 Ok(AwsResponse::ok_json(json!({})))
479 }
480
481 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482 let body = req.json_body();
483 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
484 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
485 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
486 let statement_id = body["StatementId"].as_str().unwrap_or("");
487 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
488
489 let mut state = self.state.write();
490
491 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
492 AwsServiceError::aws_error(
493 StatusCode::BAD_REQUEST,
494 "ResourceNotFoundException",
495 format!("Event bus {event_bus_name} does not exist."),
496 )
497 })?;
498
499 if remove_all {
500 bus.policy = None;
501 return Ok(AwsResponse::ok_json(json!({})));
502 }
503
504 let policy = bus.policy.as_mut().ok_or_else(|| {
505 AwsServiceError::aws_error(
506 StatusCode::BAD_REQUEST,
507 "ResourceNotFoundException",
508 "EventBus does not have a policy.",
509 )
510 })?;
511
512 if let Some(stmts) = policy["Statement"].as_array_mut() {
513 let before = stmts.len();
514 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
515 if stmts.len() == before {
516 return Err(AwsServiceError::aws_error(
517 StatusCode::BAD_REQUEST,
518 "ResourceNotFoundException",
519 "Statement with the provided id does not exist.",
520 ));
521 }
522 if stmts.is_empty() {
523 bus.policy = None;
524 }
525 }
526
527 Ok(AwsResponse::ok_json(json!({})))
528 }
529
530 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
533 let body = req.json_body();
534 validate_required("Name", &body["Name"])?;
535 let name = body["Name"]
536 .as_str()
537 .ok_or_else(|| missing("Name"))?
538 .to_string();
539 validate_string_length("name", &name, 1, 64)?;
540 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
541 validate_optional_string_length(
542 "scheduleExpression",
543 body["ScheduleExpression"].as_str(),
544 0,
545 256,
546 )?;
547 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
548 validate_optional_enum(
549 "state",
550 body["State"].as_str(),
551 &[
552 "ENABLED",
553 "DISABLED",
554 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
555 ],
556 )?;
557 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
558 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
559
560 let raw_bus = body["EventBusName"]
561 .as_str()
562 .unwrap_or("default")
563 .to_string();
564
565 let mut state = self.state.write();
566 let event_bus_name = state.resolve_bus_name(&raw_bus);
567
568 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
569 if s.is_empty() {
570 None
571 } else {
572 Some(s.to_string())
573 }
574 });
575 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
576 if s.is_empty() {
577 None
578 } else {
579 Some(s.to_string())
580 }
581 });
582 let description = body["Description"].as_str().map(|s| s.to_string());
583 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
584 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
585
586 if schedule_expression.is_some() && event_bus_name != "default" {
588 return Err(AwsServiceError::aws_error(
589 StatusCode::BAD_REQUEST,
590 "ValidationException",
591 "ScheduleExpression is supported only on the default event bus.",
592 ));
593 }
594
595 if !state.buses.contains_key(&event_bus_name) {
596 return Err(AwsServiceError::aws_error(
597 StatusCode::BAD_REQUEST,
598 "ResourceNotFoundException",
599 format!("Event bus {event_bus_name} does not exist."),
600 ));
601 }
602
603 let arn = if event_bus_name == "default" {
604 format!(
605 "arn:aws:events:{}:{}:rule/{}",
606 req.region, state.account_id, name
607 )
608 } else {
609 format!(
610 "arn:aws:events:{}:{}:rule/{}/{}",
611 req.region, state.account_id, event_bus_name, name
612 )
613 };
614
615 let key = (event_bus_name.clone(), name.clone());
616 let targets = state
617 .rules
618 .get(&key)
619 .map(|r| r.targets.clone())
620 .unwrap_or_default();
621
622 let tags = parse_tags(&body);
623
624 let rule = EventRule {
625 name: name.clone(),
626 arn: arn.clone(),
627 event_bus_name,
628 event_pattern,
629 schedule_expression,
630 state: rule_state,
631 description,
632 role_arn,
633 managed_by: None,
634 created_by: None,
635 targets,
636 tags,
637 last_fired: None,
638 };
639
640 state.rules.insert(key, rule);
641 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
642 }
643
644 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645 let body = req.json_body();
646 validate_required("Name", &body["Name"])?;
647 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
648 validate_string_length("name", name, 1, 64)?;
649 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
650 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
651
652 let mut state = self.state.write();
653 let bus_name = state.resolve_bus_name(event_bus_name);
654 let key = (bus_name, name.to_string());
655
656 if let Some(rule) = state.rules.get(&key) {
658 if !rule.targets.is_empty() {
659 return Err(AwsServiceError::aws_error(
660 StatusCode::BAD_REQUEST,
661 "ValidationException",
662 "Rule can't be deleted since it has targets.",
663 ));
664 }
665 }
666
667 state.rules.remove(&key);
668 Ok(AwsResponse::ok_json(json!({})))
669 }
670
671 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
672 let body = req.json_body();
673 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
674 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
675 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
676 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
677 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
678 let name_prefix = body["NamePrefix"].as_str();
679 let limit = body["Limit"].as_u64().map(|n| n as usize);
680 let next_token = body["NextToken"].as_str();
681
682 let state = self.state.read();
683 let bus_name = state.resolve_bus_name(event_bus_name);
684
685 let mut rules: Vec<&EventRule> = state
686 .rules
687 .values()
688 .filter(|r| r.event_bus_name == bus_name)
689 .filter(|r| match name_prefix {
690 Some(prefix) => r.name.starts_with(prefix),
691 None => true,
692 })
693 .collect();
694 rules.sort_by(|a, b| a.name.cmp(&b.name));
695
696 let start = next_token
698 .and_then(|t| t.parse::<usize>().ok())
699 .unwrap_or(0)
700 .min(rules.len());
701 let rules_slice = &rules[start..];
702
703 let (page, new_next_token) = if let Some(lim) = limit {
704 if rules_slice.len() > lim {
705 (&rules_slice[..lim], Some((start + lim).to_string()))
706 } else {
707 (rules_slice, None)
708 }
709 } else {
710 (rules_slice, None)
711 };
712
713 let rules_json: Vec<Value> = page
714 .iter()
715 .map(|r| {
716 let mut obj = json!({
717 "Name": r.name,
718 "Arn": r.arn,
719 "EventBusName": r.event_bus_name,
720 "State": r.state,
721 });
722 if let Some(ref desc) = r.description {
723 obj["Description"] = json!(desc);
724 }
725 if let Some(ref ep) = r.event_pattern {
726 obj["EventPattern"] = json!(ep);
727 }
728 if let Some(ref se) = r.schedule_expression {
729 obj["ScheduleExpression"] = json!(se);
730 }
731 if let Some(ref mb) = r.managed_by {
732 obj["ManagedBy"] = json!(mb);
733 }
734 obj
735 })
736 .collect();
737
738 let mut resp = json!({ "Rules": rules_json });
739 if let Some(token) = new_next_token {
740 resp["NextToken"] = json!(token);
741 }
742
743 Ok(AwsResponse::ok_json(resp))
744 }
745
746 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747 let body = req.json_body();
748 validate_required("Name", &body["Name"])?;
749 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
750 validate_string_length("name", name, 1, 64)?;
751 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
752 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
753
754 let state = self.state.read();
755 let bus_name = state.resolve_bus_name(event_bus_name);
756 let key = (bus_name.clone(), name.to_string());
757
758 let rule = state.rules.get(&key).ok_or_else(|| {
759 AwsServiceError::aws_error(
760 StatusCode::BAD_REQUEST,
761 "ResourceNotFoundException",
762 format!("Rule {name} does not exist."),
763 )
764 })?;
765
766 let mut resp = json!({
767 "Name": rule.name,
768 "Arn": rule.arn,
769 "EventBusName": rule.event_bus_name,
770 "State": rule.state,
771 });
772
773 if let Some(ref desc) = rule.description {
774 resp["Description"] = json!(desc);
775 }
776 if let Some(ref ep) = rule.event_pattern {
777 resp["EventPattern"] = json!(ep);
778 }
779 if let Some(ref se) = rule.schedule_expression {
780 resp["ScheduleExpression"] = json!(se);
781 }
782 if let Some(ref role) = rule.role_arn {
783 resp["RoleArn"] = json!(role);
784 }
785 if let Some(ref mb) = rule.managed_by {
786 resp["ManagedBy"] = json!(mb);
787 }
788 if let Some(ref cb) = rule.created_by {
789 resp["CreatedBy"] = json!(cb);
790 }
791 if rule.event_bus_name != "default" && rule.created_by.is_none() {
793 resp["CreatedBy"] = json!(state.account_id);
794 }
795
796 Ok(AwsResponse::ok_json(resp))
797 }
798
799 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
800 let body = req.json_body();
801 validate_required("Name", &body["Name"])?;
802 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
803 validate_string_length("name", name, 1, 64)?;
804 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
805 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
806
807 let mut state = self.state.write();
808 let bus_name = state.resolve_bus_name(event_bus_name);
809 let key = (bus_name, name.to_string());
810
811 let rule = state.rules.get_mut(&key).ok_or_else(|| {
812 AwsServiceError::aws_error(
813 StatusCode::BAD_REQUEST,
814 "ResourceNotFoundException",
815 format!("Rule {name} does not exist."),
816 )
817 })?;
818
819 rule.state = "ENABLED".to_string();
820 Ok(AwsResponse::ok_json(json!({})))
821 }
822
823 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824 let body = req.json_body();
825 validate_required("Name", &body["Name"])?;
826 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
827 validate_string_length("name", name, 1, 64)?;
828 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
829 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
830
831 let mut state = self.state.write();
832 let bus_name = state.resolve_bus_name(event_bus_name);
833 let key = (bus_name, name.to_string());
834
835 let rule = state.rules.get_mut(&key).ok_or_else(|| {
836 AwsServiceError::aws_error(
837 StatusCode::BAD_REQUEST,
838 "ResourceNotFoundException",
839 format!("Rule {name} does not exist."),
840 )
841 })?;
842
843 rule.state = "DISABLED".to_string();
844 Ok(AwsResponse::ok_json(json!({})))
845 }
846
847 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850 let body = req.json_body();
851 validate_required("Rule", &body["Rule"])?;
852 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
853 validate_string_length("rule", rule_name, 1, 64)?;
854 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
855 validate_required("Targets", &body["Targets"])?;
856 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
857 let targets = body["Targets"]
858 .as_array()
859 .ok_or_else(|| missing("Targets"))?;
860
861 for target in targets {
863 let target_id = target["Id"].as_str().unwrap_or("");
864 let target_arn = target["Arn"].as_str().unwrap_or("");
865
866 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
867 return Err(AwsServiceError::aws_error(
868 StatusCode::BAD_REQUEST,
869 "ValidationException",
870 format!(
871 "Parameter(s) SqsParameters must be specified for target: {target_id}."
872 ),
873 ));
874 }
875
876 if !target_arn.starts_with("arn:") {
878 return Err(AwsServiceError::aws_error(
879 StatusCode::BAD_REQUEST,
880 "ValidationException",
881 format!(
882 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
883 ),
884 ));
885 }
886 }
887
888 let mut state = self.state.write();
889 let bus_name = state.resolve_bus_name(event_bus_name);
890 let key = (bus_name.clone(), rule_name.to_string());
891
892 let rule = state.rules.get_mut(&key).ok_or_else(|| {
893 AwsServiceError::aws_error(
894 StatusCode::BAD_REQUEST,
895 "ResourceNotFoundException",
896 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
897 )
898 })?;
899
900 for target in targets {
901 let et = parse_target(target);
902 rule.targets.retain(|t| t.id != et.id);
904 rule.targets.push(et);
905 }
906
907 Ok(AwsResponse::ok_json(json!({
908 "FailedEntryCount": 0,
909 "FailedEntries": [],
910 })))
911 }
912
913 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
914 let body = req.json_body();
915 validate_required("Rule", &body["Rule"])?;
916 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
917 validate_string_length("rule", rule_name, 1, 64)?;
918 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
919 validate_required("Ids", &body["Ids"])?;
920 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
921 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
922
923 let target_ids: Vec<String> = ids
924 .iter()
925 .filter_map(|v| v.as_str().map(|s| s.to_string()))
926 .collect();
927
928 let mut state = self.state.write();
929 let bus_name = state.resolve_bus_name(event_bus_name);
930 let key = (bus_name.clone(), rule_name.to_string());
931
932 let rule = state.rules.get_mut(&key).ok_or_else(|| {
933 AwsServiceError::aws_error(
934 StatusCode::BAD_REQUEST,
935 "ResourceNotFoundException",
936 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
937 )
938 })?;
939
940 rule.targets.retain(|t| !target_ids.contains(&t.id));
941
942 Ok(AwsResponse::ok_json(json!({
943 "FailedEntryCount": 0,
944 "FailedEntries": [],
945 })))
946 }
947
948 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
949 let body = req.json_body();
950 validate_required("Rule", &body["Rule"])?;
951 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
952 validate_string_length("rule", rule_name, 1, 64)?;
953 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
954 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
955 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
956 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
957 let limit = body["Limit"].as_u64().map(|n| n as usize);
958 let next_token = body["NextToken"].as_str();
959
960 let state = self.state.read();
961 let bus_name = state.resolve_bus_name(event_bus_name);
962 let key = (bus_name, rule_name.to_string());
963
964 let rule = state.rules.get(&key).ok_or_else(|| {
965 AwsServiceError::aws_error(
966 StatusCode::BAD_REQUEST,
967 "ResourceNotFoundException",
968 format!("Rule {rule_name} does not exist."),
969 )
970 })?;
971
972 let all_targets = &rule.targets;
973 let start = next_token
974 .and_then(|t| t.parse::<usize>().ok())
975 .unwrap_or(0)
976 .min(all_targets.len());
977 let slice = &all_targets[start..];
978
979 let (page, new_next_token) = if let Some(lim) = limit {
980 if slice.len() > lim {
981 (&slice[..lim], Some((start + lim).to_string()))
982 } else {
983 (slice, None)
984 }
985 } else {
986 (slice, None)
987 };
988
989 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
990
991 let mut resp = json!({ "Targets": targets });
992 if let Some(token) = new_next_token {
993 resp["NextToken"] = json!(token);
994 }
995
996 Ok(AwsResponse::ok_json(resp))
997 }
998
999 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1000 let body = req.json_body();
1001 validate_required("TargetArn", &body["TargetArn"])?;
1002 let target_arn = body["TargetArn"]
1003 .as_str()
1004 .ok_or_else(|| missing("TargetArn"))?;
1005 validate_string_length("targetArn", target_arn, 1, 1600)?;
1006 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1007 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1008 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1009 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1010 let limit = body["Limit"].as_u64().map(|n| n as usize);
1011 let next_token = body["NextToken"].as_str();
1012
1013 let state = self.state.read();
1014 let bus_name = state.resolve_bus_name(event_bus_name);
1015
1016 let mut rule_names: Vec<String> = Vec::new();
1018 for rule in state.rules.values() {
1019 if rule.event_bus_name == bus_name
1020 && rule.targets.iter().any(|t| t.arn == target_arn)
1021 && !rule_names.contains(&rule.name)
1022 {
1023 rule_names.push(rule.name.clone());
1024 }
1025 }
1026 rule_names.sort();
1027
1028 let start = next_token
1029 .and_then(|t| t.parse::<usize>().ok())
1030 .unwrap_or(0)
1031 .min(rule_names.len());
1032 let slice = &rule_names[start..];
1033
1034 let (page, new_next_token) = if let Some(lim) = limit {
1035 if slice.len() > lim {
1036 (&slice[..lim], Some((start + lim).to_string()))
1037 } else {
1038 (slice, None)
1039 }
1040 } else {
1041 (slice, None)
1042 };
1043
1044 let mut resp = json!({ "RuleNames": page });
1045 if let Some(token) = new_next_token {
1046 resp["NextToken"] = json!(token);
1047 }
1048
1049 Ok(AwsResponse::ok_json(resp))
1050 }
1051
1052 fn create_partner_event_source(
1055 &self,
1056 req: &AwsRequest,
1057 ) -> Result<AwsResponse, AwsServiceError> {
1058 let body = req.json_body();
1059 validate_required("Name", &body["Name"])?;
1060 let name = body["Name"]
1061 .as_str()
1062 .ok_or_else(|| missing("Name"))?
1063 .to_string();
1064 validate_string_length("name", &name, 1, 256)?;
1065 validate_required("Account", &body["Account"])?;
1066 let account = body["Account"]
1067 .as_str()
1068 .ok_or_else(|| missing("Account"))?
1069 .to_string();
1070 validate_string_length("account", &account, 12, 12)?;
1071
1072 let mut state = self.state.write();
1073 if state.partner_event_sources.contains_key(&name) {
1074 return Err(AwsServiceError::aws_error(
1075 StatusCode::CONFLICT,
1076 "ResourceAlreadyExistsException",
1077 format!("Partner event source {name} already exists."),
1078 ));
1079 }
1080 let arn = format!(
1081 "arn:aws:events:{}::event-source/aws.partner/{}",
1082 state.region, name
1083 );
1084 let now = Utc::now();
1085 let ps = PartnerEventSource {
1086 name: name.clone(),
1087 arn: arn.clone(),
1088 account,
1089 creation_time: now,
1090 expiration_time: None,
1091 state: "ACTIVE".to_string(),
1092 };
1093 state.partner_event_sources.insert(name.clone(), ps);
1094
1095 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1096 }
1097
1098 fn delete_partner_event_source(
1099 &self,
1100 req: &AwsRequest,
1101 ) -> Result<AwsResponse, AwsServiceError> {
1102 let body = req.json_body();
1103 validate_required("Name", &body["Name"])?;
1104 let name = body["Name"]
1105 .as_str()
1106 .ok_or_else(|| missing("Name"))?
1107 .to_string();
1108 validate_required("Account", &body["Account"])?;
1109 let account = body["Account"]
1110 .as_str()
1111 .ok_or_else(|| missing("Account"))?
1112 .to_string();
1113
1114 let mut state = self.state.write();
1115 match state.partner_event_sources.get(&name) {
1116 Some(ps) if ps.account == account => {
1117 state.partner_event_sources.remove(&name);
1118 }
1119 Some(_) => {
1120 return Err(AwsServiceError::aws_error(
1121 StatusCode::NOT_FOUND,
1122 "ResourceNotFoundException",
1123 format!("Partner event source {name} does not exist for account {account}."),
1124 ));
1125 }
1126 None => {
1127 return Err(AwsServiceError::aws_error(
1128 StatusCode::NOT_FOUND,
1129 "ResourceNotFoundException",
1130 format!("Partner event source {name} does not exist."),
1131 ));
1132 }
1133 }
1134
1135 Ok(AwsResponse::ok_json(json!({})))
1136 }
1137
1138 fn describe_partner_event_source(
1139 &self,
1140 req: &AwsRequest,
1141 ) -> Result<AwsResponse, AwsServiceError> {
1142 let body = req.json_body();
1143 validate_required("Name", &body["Name"])?;
1144 let name = body["Name"]
1145 .as_str()
1146 .ok_or_else(|| missing("Name"))?
1147 .to_string();
1148 validate_string_length("name", &name, 1, 256)?;
1149
1150 let state = self.state.read();
1151 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1152 AwsServiceError::aws_error(
1153 StatusCode::NOT_FOUND,
1154 "ResourceNotFoundException",
1155 format!("Partner event source {name} does not exist."),
1156 )
1157 })?;
1158
1159 Ok(AwsResponse::ok_json(json!({
1160 "Arn": ps.arn,
1161 "Name": ps.name,
1162 })))
1163 }
1164
1165 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166 let body = req.json_body();
1167 validate_required("namePrefix", &body["NamePrefix"])?;
1168 let name_prefix = body["NamePrefix"]
1169 .as_str()
1170 .ok_or_else(|| missing("NamePrefix"))?;
1171 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1172 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1173 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1174 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1175
1176 let state = self.state.read();
1177 let all: Vec<Value> = state
1178 .partner_event_sources
1179 .values()
1180 .filter(|ps| ps.name.starts_with(name_prefix))
1181 .map(|ps| {
1182 json!({
1183 "Arn": ps.arn,
1184 "Name": ps.name,
1185 })
1186 })
1187 .collect();
1188
1189 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1190 let mut resp = json!({ "PartnerEventSources": sources });
1191 if let Some(token) = next_token {
1192 resp["NextToken"] = json!(token);
1193 }
1194
1195 Ok(AwsResponse::ok_json(resp))
1196 }
1197
1198 fn list_partner_event_source_accounts(
1199 &self,
1200 req: &AwsRequest,
1201 ) -> Result<AwsResponse, AwsServiceError> {
1202 let body = req.json_body();
1203 validate_required("EventSourceName", &body["EventSourceName"])?;
1204 let event_source_name = body["EventSourceName"]
1205 .as_str()
1206 .ok_or_else(|| missing("EventSourceName"))?;
1207 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1208 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1209 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1210
1211 let state = self.state.read();
1212 let accounts: Vec<Value> = state
1213 .partner_event_sources
1214 .values()
1215 .filter(|ps| ps.name == event_source_name)
1216 .map(|ps| json!({ "Account": ps.account }))
1217 .collect();
1218
1219 Ok(AwsResponse::ok_json(json!({
1220 "PartnerEventSourceAccounts": accounts
1221 })))
1222 }
1223
1224 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1225 let body = req.json_body();
1226 validate_required("Name", &body["Name"])?;
1227 let name = body["Name"]
1228 .as_str()
1229 .ok_or_else(|| missing("Name"))?
1230 .to_string();
1231
1232 let mut state = self.state.write();
1233 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1234 AwsServiceError::aws_error(
1235 StatusCode::NOT_FOUND,
1236 "ResourceNotFoundException",
1237 format!("Event source {name} does not exist."),
1238 )
1239 })?;
1240 ps.state = "ACTIVE".to_string();
1241
1242 Ok(AwsResponse::ok_json(json!({})))
1243 }
1244
1245 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1246 let body = req.json_body();
1247 validate_required("Name", &body["Name"])?;
1248 let name = body["Name"]
1249 .as_str()
1250 .ok_or_else(|| missing("Name"))?
1251 .to_string();
1252
1253 let mut state = self.state.write();
1254 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1255 AwsServiceError::aws_error(
1256 StatusCode::NOT_FOUND,
1257 "ResourceNotFoundException",
1258 format!("Event source {name} does not exist."),
1259 )
1260 })?;
1261 ps.state = "INACTIVE".to_string();
1262
1263 Ok(AwsResponse::ok_json(json!({})))
1264 }
1265
1266 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1267 let body = req.json_body();
1268 validate_required("Name", &body["Name"])?;
1269 let name = body["Name"]
1270 .as_str()
1271 .ok_or_else(|| missing("Name"))?
1272 .to_string();
1273
1274 let state = self.state.read();
1275 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1276 AwsServiceError::aws_error(
1277 StatusCode::NOT_FOUND,
1278 "ResourceNotFoundException",
1279 format!("Event source {name} does not exist."),
1280 )
1281 })?;
1282
1283 Ok(AwsResponse::ok_json(json!({
1284 "Arn": ps.arn,
1285 "Name": ps.name,
1286 "CreatedBy": ps.account,
1287 "CreationTime": ps.creation_time.timestamp() as f64,
1288 "State": ps.state,
1289 })))
1290 }
1291
1292 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1293 let body = req.json_body();
1294 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1295 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1296 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1297 let name_prefix = body["NamePrefix"].as_str();
1298 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1299
1300 let state = self.state.read();
1301 let all: Vec<Value> = state
1302 .partner_event_sources
1303 .values()
1304 .filter(|ps| match name_prefix {
1305 Some(prefix) => ps.name.starts_with(prefix),
1306 None => true,
1307 })
1308 .map(|ps| {
1309 json!({
1310 "Arn": ps.arn,
1311 "Name": ps.name,
1312 "CreatedBy": ps.account,
1313 "CreationTime": ps.creation_time.timestamp() as f64,
1314 "State": ps.state,
1315 })
1316 })
1317 .collect();
1318
1319 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1320 let mut resp = json!({ "EventSources": sources });
1321 if let Some(token) = next_token {
1322 resp["NextToken"] = json!(token);
1323 }
1324
1325 Ok(AwsResponse::ok_json(resp))
1326 }
1327
1328 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1329 let body = req.json_body();
1330 validate_required("Entries", &body["Entries"])?;
1331 let entries = body["Entries"]
1332 .as_array()
1333 .ok_or_else(|| missing("Entries"))?;
1334
1335 let mut result_entries = Vec::new();
1336 for _entry in entries {
1337 let event_id = uuid::Uuid::new_v4().to_string();
1338 result_entries.push(json!({ "EventId": event_id }));
1339 }
1340
1341 Ok(AwsResponse::ok_json(json!({
1342 "FailedEntryCount": 0,
1343 "Entries": result_entries,
1344 })))
1345 }
1346
1347 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1350 let body = req.json_body();
1351 validate_required("EventPattern", &body["EventPattern"])?;
1352 validate_required("Event", &body["Event"])?;
1353 let event_pattern = body["EventPattern"]
1354 .as_str()
1355 .ok_or_else(|| missing("EventPattern"))?;
1356 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1357
1358 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1360 AwsServiceError::aws_error(
1361 StatusCode::BAD_REQUEST,
1362 "InvalidEventPatternException",
1363 "Event is not valid JSON.",
1364 )
1365 })?;
1366
1367 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1369 AwsServiceError::aws_error(
1370 StatusCode::BAD_REQUEST,
1371 "InvalidEventPatternException",
1372 "Event pattern is not valid JSON.",
1373 )
1374 })?;
1375
1376 let source = event["source"].as_str().unwrap_or("");
1377 let detail_type = event["detail-type"].as_str().unwrap_or("");
1378 let detail = event
1379 .get("detail")
1380 .map(|v| serde_json::to_string(v).unwrap_or_default())
1381 .unwrap_or_else(|| "{}".to_string());
1382 let account = event["account"].as_str().unwrap_or("");
1383 let region = event["region"].as_str().unwrap_or("");
1384 let resources: Vec<String> = event["resources"]
1385 .as_array()
1386 .map(|arr| {
1387 arr.iter()
1388 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1389 .collect()
1390 })
1391 .unwrap_or_default();
1392
1393 let result = matches_pattern(
1394 Some(event_pattern),
1395 source,
1396 detail_type,
1397 &detail,
1398 account,
1399 region,
1400 &resources,
1401 );
1402
1403 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1404 }
1405
1406 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1409 let body = req.json_body();
1410 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1411 validate_optional_string_length(
1412 "kmsKeyIdentifier",
1413 body["KmsKeyIdentifier"].as_str(),
1414 0,
1415 2048,
1416 )?;
1417 let name = body["Name"].as_str().unwrap_or("default");
1418
1419 let mut state = self.state.write();
1420 let bus = state.buses.get_mut(name).ok_or_else(|| {
1421 AwsServiceError::aws_error(
1422 StatusCode::BAD_REQUEST,
1423 "ResourceNotFoundException",
1424 format!("Event bus {name} does not exist."),
1425 )
1426 })?;
1427
1428 if let Some(desc) = body["Description"].as_str() {
1429 bus.description = Some(desc.to_string());
1430 }
1431 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1432 bus.kms_key_identifier = Some(kms.to_string());
1433 }
1434 if let Some(dlc) = body.get("DeadLetterConfig") {
1435 bus.dead_letter_config = Some(dlc.clone());
1436 }
1437 bus.last_modified_time = Utc::now();
1438
1439 let arn = bus.arn.clone();
1440 let bus_name = bus.name.clone();
1441
1442 Ok(AwsResponse::ok_json(json!({
1443 "Arn": arn,
1444 "Name": bus_name,
1445 })))
1446 }
1447
1448 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451 let body = req.json_body();
1452 validate_required("Name", &body["Name"])?;
1453 let name = body["Name"]
1454 .as_str()
1455 .ok_or_else(|| missing("Name"))?
1456 .to_string();
1457 validate_string_length("name", &name, 1, 64)?;
1458 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1459 validate_required("EventBuses", &body["EventBuses"])?;
1460
1461 let description = body["Description"].as_str().map(|s| s.to_string());
1462 let routing_config = body["RoutingConfig"].clone();
1463 let replication_config = body.get("ReplicationConfig").cloned();
1464 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1465 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1466
1467 let mut state = self.state.write();
1468 if state.endpoints.contains_key(&name) {
1469 return Err(AwsServiceError::aws_error(
1470 StatusCode::CONFLICT,
1471 "ResourceAlreadyExistsException",
1472 format!("Endpoint {name} already exists."),
1473 ));
1474 }
1475
1476 let endpoint_id = format!("{}.abc123", name);
1477 let arn = format!(
1478 "arn:aws:events:{}:{}:endpoint/{}",
1479 req.region, state.account_id, name
1480 );
1481 let endpoint_url = format!(
1482 "https://{}.endpoint.events.{}.amazonaws.com",
1483 endpoint_id, req.region
1484 );
1485 let now = Utc::now();
1486
1487 let endpoint = Endpoint {
1488 name: name.clone(),
1489 arn: arn.clone(),
1490 endpoint_id: endpoint_id.clone(),
1491 endpoint_url: Some(endpoint_url),
1492 description,
1493 routing_config: routing_config.clone(),
1494 replication_config: replication_config.clone(),
1495 event_buses: event_buses.clone(),
1496 role_arn: role_arn.clone(),
1497 state: "ACTIVE".to_string(),
1498 creation_time: now,
1499 last_modified_time: now,
1500 };
1501 state.endpoints.insert(name.clone(), endpoint);
1502
1503 let mut resp = json!({
1504 "Name": name,
1505 "Arn": arn,
1506 "State": "ACTIVE",
1507 "RoutingConfig": routing_config,
1508 "EventBuses": event_buses,
1509 });
1510 if let Some(ref rc) = replication_config {
1511 resp["ReplicationConfig"] = rc.clone();
1512 }
1513 if let Some(ref ra) = role_arn {
1514 resp["RoleArn"] = json!(ra);
1515 }
1516
1517 Ok(AwsResponse::ok_json(resp))
1518 }
1519
1520 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1521 let body = req.json_body();
1522 validate_required("Name", &body["Name"])?;
1523 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1524
1525 let mut state = self.state.write();
1526 state.endpoints.remove(name).ok_or_else(|| {
1527 AwsServiceError::aws_error(
1528 StatusCode::BAD_REQUEST,
1529 "ResourceNotFoundException",
1530 format!("Endpoint '{name}' does not exist."),
1531 )
1532 })?;
1533
1534 Ok(AwsResponse::ok_json(json!({})))
1535 }
1536
1537 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1538 let body = req.json_body();
1539 validate_required("Name", &body["Name"])?;
1540 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1541
1542 let state = self.state.read();
1543 let ep = state.endpoints.get(name).ok_or_else(|| {
1544 AwsServiceError::aws_error(
1545 StatusCode::BAD_REQUEST,
1546 "ResourceNotFoundException",
1547 format!("Endpoint '{name}' does not exist."),
1548 )
1549 })?;
1550
1551 let mut resp = json!({
1552 "Name": ep.name,
1553 "Arn": ep.arn,
1554 "EndpointId": ep.endpoint_id,
1555 "State": ep.state,
1556 "RoutingConfig": ep.routing_config,
1557 "EventBuses": ep.event_buses,
1558 "CreationTime": ep.creation_time.timestamp() as f64,
1559 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1560 });
1561 if let Some(ref url) = ep.endpoint_url {
1562 resp["EndpointUrl"] = json!(url);
1563 }
1564 if let Some(ref desc) = ep.description {
1565 resp["Description"] = json!(desc);
1566 }
1567 if let Some(ref rc) = ep.replication_config {
1568 resp["ReplicationConfig"] = rc.clone();
1569 }
1570 if let Some(ref ra) = ep.role_arn {
1571 resp["RoleArn"] = json!(ra);
1572 }
1573
1574 Ok(AwsResponse::ok_json(resp))
1575 }
1576
1577 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1578 let body = req.json_body();
1579 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1580 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1581 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1582 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1583 let name_prefix = body["NamePrefix"].as_str();
1584 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1585
1586 let state = self.state.read();
1587 let all: Vec<Value> = state
1588 .endpoints
1589 .values()
1590 .filter(|ep| match name_prefix {
1591 Some(prefix) => ep.name.starts_with(prefix),
1592 None => true,
1593 })
1594 .map(|ep| {
1595 let mut obj = json!({
1596 "Name": ep.name,
1597 "Arn": ep.arn,
1598 "EndpointId": ep.endpoint_id,
1599 "State": ep.state,
1600 "RoutingConfig": ep.routing_config,
1601 "EventBuses": ep.event_buses,
1602 "CreationTime": ep.creation_time.timestamp() as f64,
1603 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604 });
1605 if let Some(ref url) = ep.endpoint_url {
1606 obj["EndpointUrl"] = json!(url);
1607 }
1608 obj
1609 })
1610 .collect();
1611
1612 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1613 let mut resp = json!({ "Endpoints": endpoints });
1614 if let Some(token) = next_token {
1615 resp["NextToken"] = json!(token);
1616 }
1617
1618 Ok(AwsResponse::ok_json(resp))
1619 }
1620
1621 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1622 let body = req.json_body();
1623 validate_required("Name", &body["Name"])?;
1624 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1625
1626 let mut state = self.state.write();
1627 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1628 AwsServiceError::aws_error(
1629 StatusCode::BAD_REQUEST,
1630 "ResourceNotFoundException",
1631 format!("Endpoint '{name}' does not exist."),
1632 )
1633 })?;
1634
1635 if let Some(desc) = body["Description"].as_str() {
1636 ep.description = Some(desc.to_string());
1637 }
1638 if !body["RoutingConfig"].is_null() {
1639 ep.routing_config = body["RoutingConfig"].clone();
1640 }
1641 if let Some(rc) = body.get("ReplicationConfig") {
1642 ep.replication_config = Some(rc.clone());
1643 }
1644 if let Some(buses) = body["EventBuses"].as_array() {
1645 ep.event_buses = buses.clone();
1646 }
1647 if let Some(ra) = body["RoleArn"].as_str() {
1648 ep.role_arn = Some(ra.to_string());
1649 }
1650 ep.last_modified_time = Utc::now();
1651
1652 let resp = json!({
1653 "Name": ep.name,
1654 "Arn": ep.arn,
1655 "EndpointId": ep.endpoint_id,
1656 "State": ep.state,
1657 "RoutingConfig": ep.routing_config,
1658 "EventBuses": ep.event_buses,
1659 });
1660
1661 Ok(AwsResponse::ok_json(resp))
1662 }
1663
1664 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1667 let body = req.json_body();
1668 validate_required("Name", &body["Name"])?;
1669 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1670 validate_string_length("name", name, 1, 64)?;
1671
1672 let mut state = self.state.write();
1673 let conn = state.connections.get_mut(name).ok_or_else(|| {
1674 AwsServiceError::aws_error(
1675 StatusCode::BAD_REQUEST,
1676 "ResourceNotFoundException",
1677 format!("Connection '{name}' does not exist."),
1678 )
1679 })?;
1680
1681 conn.connection_state = "DEAUTHORIZING".to_string();
1682 conn.last_modified_time = Utc::now();
1683
1684 let resp = json!({
1685 "ConnectionArn": conn.arn,
1686 "ConnectionState": conn.connection_state,
1687 "CreationTime": conn.creation_time.timestamp() as f64,
1688 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1689 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1690 });
1691
1692 Ok(AwsResponse::ok_json(resp))
1693 }
1694
1695 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698 let body = req.json_body();
1699 validate_required("Entries", &body["Entries"])?;
1700 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1701 let entries = body["Entries"]
1702 .as_array()
1703 .ok_or_else(|| missing("Entries"))?;
1704
1705 if entries.is_empty() {
1707 return Err(AwsServiceError::aws_error(
1708 StatusCode::BAD_REQUEST,
1709 "ValidationException",
1710 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1711 ));
1712 }
1713 if entries.len() > 10 {
1714 return Err(AwsServiceError::aws_error(
1715 StatusCode::BAD_REQUEST,
1716 "ValidationException",
1717 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1718 ));
1719 }
1720
1721 let mut state = self.state.write();
1722 let mut result_entries = Vec::new();
1723 let mut events_to_deliver = Vec::new();
1724 let mut failed_count = 0;
1725
1726 for entry in entries {
1727 let source = entry["Source"].as_str().unwrap_or("").to_string();
1728 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1729 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1730
1731 if source.is_empty() {
1733 failed_count += 1;
1734 result_entries.push(json!({
1735 "ErrorCode": "InvalidArgument",
1736 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
1737 }));
1738 continue;
1739 }
1740 if detail_type.is_empty() {
1741 failed_count += 1;
1742 result_entries.push(json!({
1743 "ErrorCode": "InvalidArgument",
1744 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
1745 }));
1746 continue;
1747 }
1748 if detail.is_empty() {
1749 failed_count += 1;
1750 result_entries.push(json!({
1751 "ErrorCode": "InvalidArgument",
1752 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
1753 }));
1754 continue;
1755 }
1756
1757 if serde_json::from_str::<Value>(&detail).is_err() {
1759 failed_count += 1;
1760 result_entries.push(json!({
1761 "ErrorCode": "MalformedDetail",
1762 "ErrorMessage": "Detail is malformed.",
1763 }));
1764 continue;
1765 }
1766
1767 let event_id = uuid::Uuid::new_v4().to_string();
1768 let raw_bus = entry["EventBusName"]
1769 .as_str()
1770 .unwrap_or("default")
1771 .to_string();
1772 let event_bus_name = state.resolve_bus_name(&raw_bus);
1773 let time = if let Some(s) = entry["Time"].as_str() {
1774 DateTime::parse_from_rfc3339(s)
1775 .map(|dt| dt.with_timezone(&Utc))
1776 .unwrap_or_else(|_| Utc::now())
1777 } else if let Some(ts) = entry["Time"].as_f64() {
1778 DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
1779 .unwrap_or_else(Utc::now)
1780 } else if let Some(ts) = entry["Time"].as_i64() {
1781 DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
1782 } else {
1783 Utc::now()
1784 };
1785 let resources: Vec<String> = entry["Resources"]
1786 .as_array()
1787 .map(|arr| {
1788 arr.iter()
1789 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1790 .collect()
1791 })
1792 .unwrap_or_default();
1793
1794 let event = PutEvent {
1795 event_id: event_id.clone(),
1796 source: source.clone(),
1797 detail_type: detail_type.clone(),
1798 detail: detail.clone(),
1799 event_bus_name: event_bus_name.clone(),
1800 time,
1801 resources: resources.clone(),
1802 };
1803
1804 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
1806 for akey in archive_keys {
1807 let (archive_bus, archive_pattern, archive_enabled) = {
1808 let a = &state.archives[&akey];
1809 (
1810 state.resolve_bus_name(&a.event_source_arn),
1811 a.event_pattern.clone(),
1812 a.state == "ENABLED",
1813 )
1814 };
1815 if archive_bus == event_bus_name && archive_enabled {
1816 let pattern_matches = matches_pattern(
1818 archive_pattern.as_deref(),
1819 &source,
1820 &detail_type,
1821 &detail,
1822 &req.account_id,
1823 &req.region,
1824 &resources,
1825 );
1826 if pattern_matches {
1827 if let Some(archive) = state.archives.get_mut(&akey) {
1828 archive.event_count += 1;
1829 archive.size_bytes += detail.len() as i64;
1830 archive.events.push(event.clone());
1831 }
1832 }
1833 }
1834 }
1835
1836 state.events.push(event);
1837
1838 let matching_targets: Vec<EventTarget> = state
1840 .rules
1841 .values()
1842 .filter(|r| {
1843 r.event_bus_name == event_bus_name
1844 && r.state == "ENABLED"
1845 && matches_pattern(
1846 r.event_pattern.as_deref(),
1847 &source,
1848 &detail_type,
1849 &detail,
1850 &req.account_id,
1851 &req.region,
1852 &resources,
1853 )
1854 })
1855 .flat_map(|r| r.targets.clone())
1856 .collect();
1857
1858 if !matching_targets.is_empty() {
1859 events_to_deliver.push((
1860 event_id.clone(),
1861 source,
1862 detail_type,
1863 detail,
1864 time,
1865 resources,
1866 matching_targets,
1867 ));
1868 }
1869
1870 result_entries.push(json!({ "EventId": event_id }));
1871 }
1872
1873 drop(state);
1875
1876 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1878 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1879 let event_json = json!({
1880 "version": "0",
1881 "id": event_id,
1882 "source": source,
1883 "account": req.account_id,
1884 "detail-type": detail_type,
1885 "detail": detail_value,
1886 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1887 "region": req.region,
1888 "resources": resources,
1889 });
1890 let event_str = event_json.to_string();
1891
1892 for target in targets {
1893 let arn = &target.arn;
1894 let body_str = if let Some(ref transformer) = target.input_transformer {
1896 apply_input_transformer(transformer, &event_json)
1897 } else if let Some(ref input) = target.input {
1898 input.clone()
1899 } else if let Some(ref input_path) = target.input_path {
1900 resolve_json_path(&event_json, input_path)
1901 .map(|v| v.to_string())
1902 .unwrap_or_else(|| event_str.clone())
1903 } else {
1904 event_str.clone()
1905 };
1906
1907 if arn.contains(":sqs:") {
1908 let group_id = target
1910 .sqs_parameters
1911 .as_ref()
1912 .and_then(|p| p["MessageGroupId"].as_str())
1913 .map(|s| s.to_string());
1914 if group_id.is_some() {
1915 self.delivery.send_to_sqs_with_attrs(
1919 arn,
1920 &body_str,
1921 &HashMap::new(),
1922 group_id.as_deref(),
1923 None,
1924 );
1925 } else {
1926 self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1927 }
1928 } else if arn.contains(":sns:") {
1929 self.delivery
1930 .publish_to_sns(arn, &body_str, Some(&detail_type));
1931 } else if arn.contains(":lambda:") {
1932 tracing::info!(
1933 function_arn = %arn,
1934 payload = %body_str,
1935 "EventBridge delivering to Lambda function"
1936 );
1937 let now = Utc::now();
1938 let mut state = self.state.write();
1939 state
1940 .lambda_invocations
1941 .push(crate::state::LambdaInvocation {
1942 function_arn: arn.clone(),
1943 payload: body_str.clone(),
1944 timestamp: now,
1945 });
1946 drop(state);
1947 if let Some(ref ls) = self.lambda_state {
1949 ls.write().invocations.push(LambdaInvocation {
1950 function_arn: arn.clone(),
1951 payload: body_str.clone(),
1952 timestamp: now,
1953 source: "aws:events".to_string(),
1954 });
1955 }
1956 invoke_lambda_async(
1958 &self.container_runtime,
1959 &self.lambda_state,
1960 arn,
1961 &body_str,
1962 );
1963 } else if arn.contains(":logs:") {
1964 tracing::info!(
1965 log_group_arn = %arn,
1966 payload = %body_str,
1967 "EventBridge delivering to CloudWatch Logs"
1968 );
1969 let now = Utc::now();
1970 let mut state = self.state.write();
1971 state.log_deliveries.push(crate::state::LogDelivery {
1972 log_group_arn: arn.clone(),
1973 payload: body_str.clone(),
1974 timestamp: now,
1975 });
1976 drop(state);
1977 if let Some(ref log_state) = self.logs_state {
1979 deliver_to_logs(log_state, arn, &body_str, now);
1980 }
1981 } else if arn.contains(":states:") {
1982 tracing::info!(
1983 state_machine_arn = %arn,
1984 payload = %body_str,
1985 "EventBridge delivering to Step Functions (stub)"
1986 );
1987 let mut state = self.state.write();
1988 state
1989 .step_function_executions
1990 .push(crate::state::StepFunctionExecution {
1991 state_machine_arn: arn.clone(),
1992 payload: body_str.clone(),
1993 timestamp: Utc::now(),
1994 });
1995 } else if arn.contains(":api-destination/") {
1996 let state = self.state.read();
1998 let dest = state.api_destinations.values().find(|d| d.arn == *arn);
1999 if let Some(dest) = dest {
2000 let url = dest.invocation_endpoint.clone();
2001 let method = dest.http_method.clone();
2002 let conn = state
2003 .connections
2004 .values()
2005 .find(|c| c.arn == dest.connection_arn)
2006 .cloned();
2007 drop(state);
2008
2009 let payload = body_str.clone();
2010 tokio::spawn(async move {
2011 let client = reqwest::Client::new();
2012 let mut req_builder = match method.as_str() {
2013 "GET" => client.get(&url),
2014 "PUT" => client.put(&url),
2015 "DELETE" => client.delete(&url),
2016 "PATCH" => client.patch(&url),
2017 "HEAD" => client.head(&url),
2018 _ => client.post(&url),
2019 };
2020 req_builder = req_builder.header("Content-Type", "application/json");
2021
2022 if let Some(conn) = conn {
2024 req_builder = apply_connection_auth(req_builder, &conn);
2025 }
2026
2027 let result = req_builder.body(payload).send().await;
2028 if let Err(e) = result {
2029 tracing::warn!(
2030 endpoint = %url,
2031 error = %e,
2032 "EventBridge ApiDestination delivery failed"
2033 );
2034 }
2035 });
2036 }
2037 } else if arn.starts_with("https://") || arn.starts_with("http://") {
2038 let url = arn.clone();
2040 let payload = body_str.clone();
2041 tokio::spawn(async move {
2042 let client = reqwest::Client::new();
2043 let result = client
2044 .post(&url)
2045 .header("Content-Type", "application/json")
2046 .body(payload)
2047 .send()
2048 .await;
2049 if let Err(e) = result {
2050 tracing::warn!(
2051 endpoint = %url,
2052 error = %e,
2053 "EventBridge HTTP target delivery failed"
2054 );
2055 }
2056 });
2057 }
2058 }
2059 }
2060
2061 let resp = json!({
2062 "FailedEntryCount": failed_count,
2063 "Entries": result_entries,
2064 });
2065
2066 Ok(AwsResponse::ok_json(resp))
2067 }
2068
2069 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2072 let body = req.json_body();
2073 validate_required("ResourceARN", &body["ResourceARN"])?;
2074 let arn = body["ResourceARN"]
2075 .as_str()
2076 .ok_or_else(|| missing("ResourceARN"))?;
2077 validate_string_length("resourceARN", arn, 1, 1600)?;
2078 validate_required("Tags", &body["Tags"])?;
2079
2080 let mut state = self.state.write();
2081 let tag_map = find_tags_mut(&mut state, arn)?;
2082
2083 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2084 AwsServiceError::aws_error(
2085 StatusCode::BAD_REQUEST,
2086 "ValidationException",
2087 format!("{f} must be a list"),
2088 )
2089 })?;
2090
2091 Ok(AwsResponse::ok_json(json!({})))
2092 }
2093
2094 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2095 let body = req.json_body();
2096 validate_required("ResourceARN", &body["ResourceARN"])?;
2097 let arn = body["ResourceARN"]
2098 .as_str()
2099 .ok_or_else(|| missing("ResourceARN"))?;
2100 validate_string_length("resourceARN", arn, 1, 1600)?;
2101 validate_required("TagKeys", &body["TagKeys"])?;
2102
2103 let mut state = self.state.write();
2104 let tag_map = find_tags_mut(&mut state, arn)?;
2105
2106 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2107 AwsServiceError::aws_error(
2108 StatusCode::BAD_REQUEST,
2109 "ValidationException",
2110 format!("{f} must be a list"),
2111 )
2112 })?;
2113
2114 Ok(AwsResponse::ok_json(json!({})))
2115 }
2116
2117 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2118 let body = req.json_body();
2119 validate_required("ResourceARN", &body["ResourceARN"])?;
2120 let arn = body["ResourceARN"]
2121 .as_str()
2122 .ok_or_else(|| missing("ResourceARN"))?;
2123 validate_string_length("resourceARN", arn, 1, 1600)?;
2124
2125 let state = self.state.read();
2126 let tag_map = find_tags(&state, arn)?;
2127
2128 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2129
2130 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2131 }
2132
2133 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2136 let body = req.json_body();
2137 validate_required("ArchiveName", &body["ArchiveName"])?;
2138 let name = body["ArchiveName"]
2139 .as_str()
2140 .ok_or_else(|| missing("ArchiveName"))?
2141 .to_string();
2142 validate_string_length("archiveName", &name, 1, 48)?;
2143 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2144 let event_source_arn = body["EventSourceArn"]
2145 .as_str()
2146 .ok_or_else(|| missing("EventSourceArn"))?
2147 .to_string();
2148 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2149 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2150 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2151 if let Some(rd) = body["RetentionDays"].as_i64() {
2152 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2153 }
2154 let description = body["Description"].as_str().map(|s| s.to_string());
2155 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2156 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2157
2158 if let Some(ref pattern) = event_pattern {
2160 validate_event_pattern(pattern)?;
2161 }
2162
2163 let mut state = self.state.write();
2164
2165 let bus_name = state.resolve_bus_name(&event_source_arn);
2167 if !state.buses.contains_key(&bus_name) {
2168 return Err(AwsServiceError::aws_error(
2169 StatusCode::BAD_REQUEST,
2170 "ResourceNotFoundException",
2171 format!("Event bus {bus_name} does not exist."),
2172 ));
2173 }
2174
2175 if state.archives.contains_key(&name) {
2177 return Err(AwsServiceError::aws_error(
2178 StatusCode::BAD_REQUEST,
2179 "ResourceAlreadyExistsException",
2180 format!("Archive {name} already exists."),
2181 ));
2182 }
2183
2184 let now = Utc::now();
2185 let arn = format!(
2186 "arn:aws:events:{}:{}:archive/{}",
2187 req.region, state.account_id, name
2188 );
2189
2190 let archive = Archive {
2191 name: name.clone(),
2192 arn: arn.clone(),
2193 event_source_arn: event_source_arn.clone(),
2194 description,
2195 event_pattern: event_pattern.clone(),
2196 retention_days,
2197 state: "ENABLED".to_string(),
2198 creation_time: now,
2199 event_count: 0,
2200 size_bytes: 0,
2201 events: Vec::new(),
2202 };
2203 state.archives.insert(name.clone(), archive);
2204
2205 let rule_name = format!("Events-Archive-{name}");
2207 let rule_arn = format!(
2208 "arn:aws:events:{}:{}:rule/{}",
2209 req.region, state.account_id, rule_name
2210 );
2211 let rule_event_pattern = {
2213 let mut merged = if let Some(ref ep) = event_pattern {
2214 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2215 } else {
2216 json!({})
2217 };
2218 if let Some(obj) = merged.as_object_mut() {
2219 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2220 }
2221 serde_json::to_string(&merged).unwrap_or_default()
2222 };
2223
2224 let archive_target = EventTarget {
2226 id: name.clone(),
2227 arn: format!("arn:aws:events:{}:::", req.region),
2228 input: None,
2229 input_path: None,
2230 input_transformer: Some(json!({
2231 "InputPathsMap": {},
2232 "InputTemplate": format!(
2233 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2234 arn
2235 )
2236 })),
2237 sqs_parameters: None,
2238 };
2239
2240 let archive_rule = EventRule {
2241 name: rule_name.clone(),
2242 arn: rule_arn,
2243 event_bus_name: bus_name.clone(),
2244 event_pattern: Some(rule_event_pattern),
2245 schedule_expression: None,
2246 state: "ENABLED".to_string(),
2247 description: None,
2248 role_arn: None,
2249 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2250 created_by: Some(state.account_id.clone()),
2251 targets: vec![archive_target],
2252 tags: HashMap::new(),
2253 last_fired: None,
2254 };
2255 let key = (bus_name, rule_name);
2256 state.rules.insert(key, archive_rule);
2257
2258 Ok(AwsResponse::ok_json(json!({
2259 "ArchiveArn": arn,
2260 "CreationTime": now.timestamp() as f64,
2261 "State": "ENABLED",
2262 })))
2263 }
2264
2265 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2266 let body = req.json_body();
2267 validate_required("ArchiveName", &body["ArchiveName"])?;
2268 let name = body["ArchiveName"]
2269 .as_str()
2270 .ok_or_else(|| missing("ArchiveName"))?;
2271 validate_string_length("archiveName", name, 1, 48)?;
2272
2273 let state = self.state.read();
2274 let archive = state.archives.get(name).ok_or_else(|| {
2275 AwsServiceError::aws_error(
2276 StatusCode::BAD_REQUEST,
2277 "ResourceNotFoundException",
2278 format!("Archive {name} does not exist."),
2279 )
2280 })?;
2281
2282 let mut resp = json!({
2283 "ArchiveArn": archive.arn,
2284 "ArchiveName": archive.name,
2285 "CreationTime": archive.creation_time.timestamp() as f64,
2286 "EventCount": archive.event_count,
2287 "EventSourceArn": archive.event_source_arn,
2288 "RetentionDays": archive.retention_days,
2289 "SizeBytes": archive.size_bytes,
2290 "State": archive.state,
2291 });
2292 if let Some(ref desc) = archive.description {
2293 resp["Description"] = json!(desc);
2294 }
2295 if let Some(ref ep) = archive.event_pattern {
2296 resp["EventPattern"] = json!(ep);
2297 }
2298
2299 Ok(AwsResponse::ok_json(resp))
2300 }
2301
2302 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2303 let body = req.json_body();
2304 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2305 validate_optional_string_length(
2306 "eventSourceArn",
2307 body["EventSourceArn"].as_str(),
2308 1,
2309 1600,
2310 )?;
2311 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2312 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2313 let name_prefix = body["NamePrefix"].as_str();
2314 let source_arn = body["EventSourceArn"].as_str();
2315 let archive_state = body["State"].as_str();
2316
2317 let filter_count = [
2319 name_prefix.is_some(),
2320 source_arn.is_some(),
2321 archive_state.is_some(),
2322 ]
2323 .iter()
2324 .filter(|&&x| x)
2325 .count();
2326 if filter_count > 1 {
2327 return Err(AwsServiceError::aws_error(
2328 StatusCode::BAD_REQUEST,
2329 "ValidationException",
2330 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2331 ));
2332 }
2333
2334 if let Some(s) = archive_state {
2336 let valid = [
2337 "ENABLED",
2338 "DISABLED",
2339 "CREATING",
2340 "UPDATING",
2341 "CREATE_FAILED",
2342 "UPDATE_FAILED",
2343 ];
2344 if !valid.contains(&s) {
2345 return Err(AwsServiceError::aws_error(
2346 StatusCode::BAD_REQUEST,
2347 "ValidationException",
2348 format!(
2349 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2350 s
2351 ),
2352 ));
2353 }
2354 }
2355
2356 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2357
2358 let state = self.state.read();
2359 let all: Vec<Value> = state
2360 .archives
2361 .values()
2362 .filter(|a| {
2363 if let Some(prefix) = name_prefix {
2364 a.name.starts_with(prefix)
2365 } else if let Some(arn) = source_arn {
2366 a.event_source_arn == arn
2367 } else if let Some(s) = archive_state {
2368 a.state == s
2369 } else {
2370 true
2371 }
2372 })
2373 .map(|a| {
2374 json!({
2375 "ArchiveName": a.name,
2376 "CreationTime": a.creation_time.timestamp() as f64,
2377 "EventCount": a.event_count,
2378 "EventSourceArn": a.event_source_arn,
2379 "RetentionDays": a.retention_days,
2380 "SizeBytes": a.size_bytes,
2381 "State": a.state,
2382 })
2383 })
2384 .collect();
2385
2386 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2387 let mut resp = json!({ "Archives": archives });
2388 if let Some(token) = next_token {
2389 resp["NextToken"] = json!(token);
2390 }
2391
2392 Ok(AwsResponse::ok_json(resp))
2393 }
2394
2395 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396 let body = req.json_body();
2397 validate_required("ArchiveName", &body["ArchiveName"])?;
2398 let name = body["ArchiveName"]
2399 .as_str()
2400 .ok_or_else(|| missing("ArchiveName"))?;
2401 validate_string_length("archiveName", name, 1, 48)?;
2402 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2403 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2404 if let Some(rd) = body["RetentionDays"].as_i64() {
2405 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2406 }
2407
2408 if let Some(pattern) = body["EventPattern"].as_str() {
2410 validate_event_pattern(pattern)?;
2411 }
2412
2413 let mut state = self.state.write();
2414 let archive = state.archives.get_mut(name).ok_or_else(|| {
2415 AwsServiceError::aws_error(
2416 StatusCode::BAD_REQUEST,
2417 "ResourceNotFoundException",
2418 format!("Archive {name} does not exist."),
2419 )
2420 })?;
2421
2422 if let Some(desc) = body["Description"].as_str() {
2423 archive.description = Some(desc.to_string());
2424 }
2425 if let Some(pattern) = body["EventPattern"].as_str() {
2426 archive.event_pattern = Some(pattern.to_string());
2427 }
2428 if let Some(days) = body["RetentionDays"].as_i64() {
2429 archive.retention_days = days;
2430 }
2431
2432 Ok(AwsResponse::ok_json(json!({
2433 "ArchiveArn": archive.arn,
2434 "CreationTime": archive.creation_time.timestamp() as f64,
2435 "State": archive.state,
2436 })))
2437 }
2438
2439 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2440 let body = req.json_body();
2441 validate_required("ArchiveName", &body["ArchiveName"])?;
2442 let name = body["ArchiveName"]
2443 .as_str()
2444 .ok_or_else(|| missing("ArchiveName"))?;
2445 validate_string_length("archiveName", name, 1, 48)?;
2446
2447 let mut state = self.state.write();
2448 if !state.archives.contains_key(name) {
2449 return Err(AwsServiceError::aws_error(
2450 StatusCode::BAD_REQUEST,
2451 "ResourceNotFoundException",
2452 format!("Archive {name} does not exist."),
2453 ));
2454 }
2455
2456 state.archives.remove(name);
2457
2458 let rule_name = format!("Events-Archive-{name}");
2460 state.rules.retain(|k, _| k.1 != rule_name);
2461
2462 Ok(AwsResponse::ok_json(json!({})))
2463 }
2464
2465 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2468 let body = req.json_body();
2469 validate_required("Name", &body["Name"])?;
2470 let name = body["Name"]
2471 .as_str()
2472 .ok_or_else(|| missing("Name"))?
2473 .to_string();
2474 validate_string_length("name", &name, 1, 64)?;
2475 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2476 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2477 let description = body["Description"].as_str().map(|s| s.to_string());
2478 let auth_type = body["AuthorizationType"]
2479 .as_str()
2480 .ok_or_else(|| missing("AuthorizationType"))?
2481 .to_string();
2482 validate_enum(
2483 "authorizationType",
2484 &auth_type,
2485 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2486 )?;
2487 validate_optional_string_length(
2488 "kmsKeyIdentifier",
2489 body["KmsKeyIdentifier"].as_str(),
2490 0,
2491 2048,
2492 )?;
2493 validate_required("AuthParameters", &body["AuthParameters"])?;
2494 let auth_params = body["AuthParameters"].clone();
2495
2496 let mut state = self.state.write();
2497 let now = Utc::now();
2498 let conn_uuid = uuid::Uuid::new_v4();
2499 let arn = format!(
2500 "arn:aws:events:{}:{}:connection/{}/{}",
2501 req.region, state.account_id, name, conn_uuid
2502 );
2503 let secret_arn = format!(
2504 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2505 req.region, state.account_id, name, conn_uuid
2506 );
2507
2508 let conn = Connection {
2509 name: name.clone(),
2510 arn: arn.clone(),
2511 description,
2512 authorization_type: auth_type.clone(),
2513 auth_parameters: auth_params,
2514 connection_state: "AUTHORIZED".to_string(),
2515 secret_arn: secret_arn.clone(),
2516 creation_time: now,
2517 last_modified_time: now,
2518 last_authorized_time: now,
2519 };
2520 state.connections.insert(name, conn);
2521
2522 Ok(AwsResponse::ok_json(json!({
2523 "ConnectionArn": arn,
2524 "ConnectionState": "AUTHORIZED",
2525 "CreationTime": now.timestamp() as f64,
2526 "LastModifiedTime": now.timestamp() as f64,
2527 })))
2528 }
2529
2530 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531 let body = req.json_body();
2532 validate_required("Name", &body["Name"])?;
2533 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2534 validate_string_length("name", name, 1, 64)?;
2535
2536 let state = self.state.read();
2537 let conn = state.connections.get(name).ok_or_else(|| {
2538 AwsServiceError::aws_error(
2539 StatusCode::BAD_REQUEST,
2540 "ResourceNotFoundException",
2541 format!("Connection '{name}' does not exist."),
2542 )
2543 })?;
2544
2545 let auth_params_response =
2547 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2548
2549 let mut resp = json!({
2550 "ConnectionArn": conn.arn,
2551 "Name": conn.name,
2552 "AuthorizationType": conn.authorization_type,
2553 "AuthParameters": auth_params_response,
2554 "ConnectionState": conn.connection_state,
2555 "SecretArn": conn.secret_arn,
2556 "CreationTime": conn.creation_time.timestamp() as f64,
2557 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2558 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2559 });
2560 if let Some(ref desc) = conn.description {
2561 resp["Description"] = json!(desc);
2562 }
2563
2564 Ok(AwsResponse::ok_json(resp))
2565 }
2566
2567 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2568 let body = req.json_body();
2569 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2570 validate_optional_enum(
2571 "connectionState",
2572 body["ConnectionState"].as_str(),
2573 &[
2574 "CREATING",
2575 "UPDATING",
2576 "DELETING",
2577 "AUTHORIZED",
2578 "DEAUTHORIZED",
2579 "AUTHORIZING",
2580 "DEAUTHORIZING",
2581 "ACTIVE",
2582 "FAILED_CONNECTIVITY",
2583 ],
2584 )?;
2585 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2586 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2587
2588 let name_prefix = body["NamePrefix"].as_str();
2589 let connection_state = body["ConnectionState"].as_str();
2590 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2591
2592 let state = self.state.read();
2593 let all: Vec<Value> = state
2594 .connections
2595 .values()
2596 .filter(|c| {
2597 if let Some(prefix) = name_prefix {
2598 if !c.name.starts_with(prefix) {
2599 return false;
2600 }
2601 }
2602 if let Some(cs) = connection_state {
2603 if c.connection_state != cs {
2604 return false;
2605 }
2606 }
2607 true
2608 })
2609 .map(|c| {
2610 json!({
2611 "ConnectionArn": c.arn,
2612 "Name": c.name,
2613 "AuthorizationType": c.authorization_type,
2614 "ConnectionState": c.connection_state,
2615 "CreationTime": c.creation_time.timestamp() as f64,
2616 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2617 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2618 })
2619 })
2620 .collect();
2621
2622 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2623 let mut resp = json!({ "Connections": conns });
2624 if let Some(token) = next_token {
2625 resp["NextToken"] = json!(token);
2626 }
2627
2628 Ok(AwsResponse::ok_json(resp))
2629 }
2630
2631 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2632 let body = req.json_body();
2633 validate_required("Name", &body["Name"])?;
2634 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2635 validate_string_length("name", name, 1, 64)?;
2636 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2637 validate_optional_enum(
2638 "authorizationType",
2639 body["AuthorizationType"].as_str(),
2640 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2641 )?;
2642
2643 let mut state = self.state.write();
2644 let conn = state.connections.get_mut(name).ok_or_else(|| {
2645 AwsServiceError::aws_error(
2646 StatusCode::BAD_REQUEST,
2647 "ResourceNotFoundException",
2648 format!("Connection '{name}' does not exist."),
2649 )
2650 })?;
2651
2652 if let Some(desc) = body["Description"].as_str() {
2653 conn.description = Some(desc.to_string());
2654 }
2655 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2656 conn.authorization_type = auth_type.to_string();
2657 }
2658 if body.get("AuthParameters").is_some() {
2659 conn.auth_parameters = body["AuthParameters"].clone();
2660 }
2661 conn.last_modified_time = Utc::now();
2662
2663 Ok(AwsResponse::ok_json(json!({
2664 "ConnectionArn": conn.arn,
2665 "ConnectionState": conn.connection_state,
2666 "CreationTime": conn.creation_time.timestamp() as f64,
2667 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2668 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2669 })))
2670 }
2671
2672 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2673 let body = req.json_body();
2674 validate_required("Name", &body["Name"])?;
2675 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2676 validate_string_length("name", name, 1, 64)?;
2677
2678 let mut state = self.state.write();
2679 let conn = state.connections.remove(name).ok_or_else(|| {
2680 AwsServiceError::aws_error(
2681 StatusCode::BAD_REQUEST,
2682 "ResourceNotFoundException",
2683 format!("Connection '{name}' does not exist."),
2684 )
2685 })?;
2686
2687 Ok(AwsResponse::ok_json(json!({
2688 "ConnectionArn": conn.arn,
2689 "ConnectionState": conn.connection_state,
2690 "CreationTime": conn.creation_time.timestamp() as f64,
2691 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2692 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2693 })))
2694 }
2695
2696 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2699 let body = req.json_body();
2700 validate_required("Name", &body["Name"])?;
2701 let name = body["Name"]
2702 .as_str()
2703 .ok_or_else(|| missing("Name"))?
2704 .to_string();
2705 validate_string_length("name", &name, 1, 64)?;
2706 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2707 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2708 let description = body["Description"].as_str().map(|s| s.to_string());
2709 let connection_arn = body["ConnectionArn"]
2710 .as_str()
2711 .ok_or_else(|| missing("ConnectionArn"))?
2712 .to_string();
2713 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2714 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2715 let endpoint = body["InvocationEndpoint"]
2716 .as_str()
2717 .ok_or_else(|| missing("InvocationEndpoint"))?
2718 .to_string();
2719 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2720 validate_required("HttpMethod", &body["HttpMethod"])?;
2721 let http_method = body["HttpMethod"]
2722 .as_str()
2723 .ok_or_else(|| missing("HttpMethod"))?
2724 .to_string();
2725 validate_enum(
2726 "httpMethod",
2727 &http_method,
2728 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2729 )?;
2730 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2731 if let Some(r) = rate_limit {
2732 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2733 }
2734
2735 let mut state = self.state.write();
2736 let now = Utc::now();
2737 let dest_uuid = uuid::Uuid::new_v4();
2738 let arn = format!(
2739 "arn:aws:events:{}:{}:api-destination/{}/{}",
2740 req.region, state.account_id, name, dest_uuid
2741 );
2742
2743 let dest = ApiDestination {
2744 name: name.clone(),
2745 arn: arn.clone(),
2746 description,
2747 connection_arn,
2748 invocation_endpoint: endpoint,
2749 http_method,
2750 invocation_rate_limit_per_second: rate_limit,
2751 state: "ACTIVE".to_string(),
2752 creation_time: now,
2753 last_modified_time: now,
2754 };
2755 state.api_destinations.insert(name, dest);
2756
2757 Ok(AwsResponse::ok_json(json!({
2758 "ApiDestinationArn": arn,
2759 "ApiDestinationState": "ACTIVE",
2760 "CreationTime": now.timestamp() as f64,
2761 "LastModifiedTime": now.timestamp() as f64,
2762 })))
2763 }
2764
2765 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2766 let body = req.json_body();
2767 validate_required("Name", &body["Name"])?;
2768 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2769 validate_string_length("name", name, 1, 64)?;
2770
2771 let state = self.state.read();
2772 let dest = state.api_destinations.get(name).ok_or_else(|| {
2773 AwsServiceError::aws_error(
2774 StatusCode::BAD_REQUEST,
2775 "ResourceNotFoundException",
2776 format!("An api-destination '{name}' does not exist."),
2777 )
2778 })?;
2779
2780 let mut resp = json!({
2781 "ApiDestinationArn": dest.arn,
2782 "Name": dest.name,
2783 "ConnectionArn": dest.connection_arn,
2784 "InvocationEndpoint": dest.invocation_endpoint,
2785 "HttpMethod": dest.http_method,
2786 "ApiDestinationState": dest.state,
2787 "CreationTime": dest.creation_time.timestamp() as f64,
2788 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2789 });
2790 if let Some(ref desc) = dest.description {
2791 resp["Description"] = json!(desc);
2792 }
2793 if let Some(rate) = dest.invocation_rate_limit_per_second {
2794 resp["InvocationRateLimitPerSecond"] = json!(rate);
2795 }
2796
2797 Ok(AwsResponse::ok_json(resp))
2798 }
2799
2800 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2801 let body = req.json_body();
2802 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2803 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2804 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2805 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2806
2807 let name_prefix = body["NamePrefix"].as_str();
2808 let connection_arn = body["ConnectionArn"].as_str();
2809 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2810
2811 let state = self.state.read();
2812 let all: Vec<Value> = state
2813 .api_destinations
2814 .values()
2815 .filter(|d| {
2816 if let Some(prefix) = name_prefix {
2817 if !d.name.starts_with(prefix) {
2818 return false;
2819 }
2820 }
2821 if let Some(arn) = connection_arn {
2822 if d.connection_arn != arn {
2823 return false;
2824 }
2825 }
2826 true
2827 })
2828 .map(|d| {
2829 let mut obj = json!({
2830 "ApiDestinationArn": d.arn,
2831 "Name": d.name,
2832 "ConnectionArn": d.connection_arn,
2833 "InvocationEndpoint": d.invocation_endpoint,
2834 "HttpMethod": d.http_method,
2835 "ApiDestinationState": d.state,
2836 "CreationTime": d.creation_time.timestamp() as f64,
2837 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2838 });
2839 if let Some(rate) = d.invocation_rate_limit_per_second {
2840 obj["InvocationRateLimitPerSecond"] = json!(rate);
2841 }
2842 obj
2843 })
2844 .collect();
2845
2846 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2847 let mut resp = json!({ "ApiDestinations": dests });
2848 if let Some(token) = next_token {
2849 resp["NextToken"] = json!(token);
2850 }
2851
2852 Ok(AwsResponse::ok_json(resp))
2853 }
2854
2855 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2856 let body = req.json_body();
2857 validate_required("Name", &body["Name"])?;
2858 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2859 validate_string_length("name", name, 1, 64)?;
2860 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2862 validate_optional_string_length(
2863 "invocationEndpoint",
2864 body["InvocationEndpoint"].as_str(),
2865 1,
2866 2048,
2867 )?;
2868 validate_optional_enum(
2869 "httpMethod",
2870 body["HttpMethod"].as_str(),
2871 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2872 )?;
2873 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2874 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2875 }
2876
2877 let mut state = self.state.write();
2878 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2879 AwsServiceError::aws_error(
2880 StatusCode::BAD_REQUEST,
2881 "ResourceNotFoundException",
2882 format!("An api-destination '{name}' does not exist."),
2883 )
2884 })?;
2885
2886 if let Some(desc) = body["Description"].as_str() {
2887 dest.description = Some(desc.to_string());
2888 }
2889 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2890 dest.invocation_endpoint = endpoint.to_string();
2891 }
2892 if let Some(method) = body["HttpMethod"].as_str() {
2893 dest.http_method = method.to_string();
2894 }
2895 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2896 dest.invocation_rate_limit_per_second = Some(rate);
2897 }
2898 if let Some(conn) = body["ConnectionArn"].as_str() {
2899 dest.connection_arn = conn.to_string();
2900 }
2901 dest.last_modified_time = Utc::now();
2902
2903 Ok(AwsResponse::ok_json(json!({
2904 "ApiDestinationArn": dest.arn,
2905 "ApiDestinationState": dest.state,
2906 "CreationTime": dest.creation_time.timestamp() as f64,
2907 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2908 })))
2909 }
2910
2911 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2912 let body = req.json_body();
2913 validate_required("Name", &body["Name"])?;
2914 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2915 validate_string_length("name", name, 1, 64)?;
2916
2917 let mut state = self.state.write();
2918 if !state.api_destinations.contains_key(name) {
2919 return Err(AwsServiceError::aws_error(
2920 StatusCode::BAD_REQUEST,
2921 "ResourceNotFoundException",
2922 format!("An api-destination '{name}' does not exist."),
2923 ));
2924 }
2925 state.api_destinations.remove(name);
2926
2927 Ok(AwsResponse::ok_json(json!({})))
2928 }
2929
2930 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2933 let body = req.json_body();
2934 validate_required("ReplayName", &body["ReplayName"])?;
2935 let name = body["ReplayName"]
2936 .as_str()
2937 .ok_or_else(|| missing("ReplayName"))?
2938 .to_string();
2939 validate_string_length("replayName", &name, 1, 64)?;
2940 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2941 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2942 let description = body["Description"].as_str().map(|s| s.to_string());
2943 let event_source_arn = body["EventSourceArn"]
2944 .as_str()
2945 .ok_or_else(|| missing("EventSourceArn"))?
2946 .to_string();
2947 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2948 validate_required("EventStartTime", &body["EventStartTime"])?;
2949 validate_required("EventEndTime", &body["EventEndTime"])?;
2950 validate_required("Destination", &body["Destination"])?;
2951 let destination = body["Destination"].clone();
2952 let event_start_time_f = body["EventStartTime"].as_f64();
2953 let event_end_time_f = body["EventEndTime"].as_f64();
2954
2955 let event_start_time = event_start_time_f
2956 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2957 .unwrap_or_else(Utc::now);
2958 let event_end_time = event_end_time_f
2959 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2960 .unwrap_or_else(Utc::now);
2961
2962 let dest_arn = destination["Arn"].as_str().unwrap_or("");
2964 if !dest_arn.contains(":event-bus/") {
2965 return Err(AwsServiceError::aws_error(
2966 StatusCode::BAD_REQUEST,
2967 "ValidationException",
2968 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2969 ));
2970 }
2971
2972 let mut state = self.state.write();
2973
2974 let bus_name = state.resolve_bus_name(dest_arn);
2976 if !state.buses.contains_key(&bus_name) {
2977 return Err(AwsServiceError::aws_error(
2978 StatusCode::BAD_REQUEST,
2979 "ResourceNotFoundException",
2980 format!("Event bus {bus_name} does not exist."),
2981 ));
2982 }
2983
2984 let archive_name = event_source_arn
2986 .rsplit_once("archive/")
2987 .map(|(_, n)| n.to_string())
2988 .unwrap_or_default();
2989 if !state.archives.contains_key(&archive_name) {
2990 return Err(AwsServiceError::aws_error(
2991 StatusCode::BAD_REQUEST,
2992 "ValidationException",
2993 format!(
2994 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2995 ),
2996 ));
2997 }
2998
2999 let archive = state.archives.get(&archive_name).unwrap();
3001 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3002 if archive_bus != bus_name {
3003 return Err(AwsServiceError::aws_error(
3004 StatusCode::BAD_REQUEST,
3005 "ValidationException",
3006 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3007 ));
3008 }
3009
3010 if event_end_time <= event_start_time {
3012 return Err(AwsServiceError::aws_error(
3013 StatusCode::BAD_REQUEST,
3014 "ValidationException",
3015 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3016 ));
3017 }
3018
3019 if state.replays.contains_key(&name) {
3021 return Err(AwsServiceError::aws_error(
3022 StatusCode::BAD_REQUEST,
3023 "ResourceAlreadyExistsException",
3024 format!("Replay {name} already exists."),
3025 ));
3026 }
3027
3028 let now = Utc::now();
3029 let arn = format!(
3030 "arn:aws:events:{}:{}:replay/{}",
3031 req.region, state.account_id, name
3032 );
3033
3034 let archive = state.archives.get(&archive_name).unwrap();
3036 let replay_events: Vec<PutEvent> = archive
3037 .events
3038 .iter()
3039 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3040 .cloned()
3041 .collect();
3042
3043 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3045
3046 for event in &replay_events {
3047 let matching_targets: Vec<EventTarget> = state
3048 .rules
3049 .values()
3050 .filter(|r| {
3051 r.event_bus_name == bus_name
3052 && r.state == "ENABLED"
3053 && matches_pattern(
3054 r.event_pattern.as_deref(),
3055 &event.source,
3056 &event.detail_type,
3057 &event.detail,
3058 &req.account_id,
3059 &req.region,
3060 &event.resources,
3061 )
3062 })
3063 .flat_map(|r| r.targets.clone())
3064 .collect();
3065
3066 if !matching_targets.is_empty() {
3067 events_to_deliver.push((event.clone(), matching_targets));
3068 }
3069 }
3070
3071 let replay = Replay {
3072 name: name.clone(),
3073 arn: arn.clone(),
3074 description,
3075 event_source_arn,
3076 destination,
3077 event_start_time,
3078 event_end_time,
3079 state: "COMPLETED".to_string(),
3080 replay_start_time: now,
3081 replay_end_time: Some(now),
3082 };
3083 state.replays.insert(name, replay);
3084
3085 drop(state);
3087
3088 for (event, targets) in events_to_deliver {
3090 let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3091 let event_json = json!({
3092 "version": "0",
3093 "id": event.event_id,
3094 "source": event.source,
3095 "account": req.account_id,
3096 "detail-type": event.detail_type,
3097 "detail": detail_value,
3098 "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3099 "region": req.region,
3100 "resources": event.resources,
3101 "replay-name": arn,
3102 });
3103 let event_str = event_json.to_string();
3104
3105 for target in targets {
3106 let target_arn = &target.arn;
3107 let body_str = if let Some(ref transformer) = target.input_transformer {
3108 apply_input_transformer(transformer, &event_json)
3109 } else if let Some(ref input) = target.input {
3110 input.clone()
3111 } else if let Some(ref input_path) = target.input_path {
3112 resolve_json_path(&event_json, input_path)
3113 .map(|v| v.to_string())
3114 .unwrap_or_else(|| event_str.clone())
3115 } else {
3116 event_str.clone()
3117 };
3118
3119 if target_arn.contains(":sqs:") {
3120 let group_id = target
3121 .sqs_parameters
3122 .as_ref()
3123 .and_then(|p| p["MessageGroupId"].as_str())
3124 .map(|s| s.to_string());
3125 if group_id.is_some() {
3126 self.delivery.send_to_sqs_with_attrs(
3127 target_arn,
3128 &body_str,
3129 &HashMap::new(),
3130 group_id.as_deref(),
3131 None,
3132 );
3133 } else {
3134 self.delivery
3135 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3136 }
3137 } else if target_arn.contains(":sns:") {
3138 self.delivery
3139 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3140 } else if target_arn.contains(":lambda:") {
3141 let mut state = self.state.write();
3142 state
3143 .lambda_invocations
3144 .push(crate::state::LambdaInvocation {
3145 function_arn: target_arn.clone(),
3146 payload: body_str.clone(),
3147 timestamp: Utc::now(),
3148 });
3149 drop(state);
3150 if let Some(ref ls) = self.lambda_state {
3151 ls.write().invocations.push(LambdaInvocation {
3152 function_arn: target_arn.clone(),
3153 payload: body_str.clone(),
3154 timestamp: Utc::now(),
3155 source: "aws:events".to_string(),
3156 });
3157 }
3158 invoke_lambda_async(
3159 &self.container_runtime,
3160 &self.lambda_state,
3161 target_arn,
3162 &body_str,
3163 );
3164 } else if target_arn.contains(":logs:") {
3165 let mut state = self.state.write();
3166 state.log_deliveries.push(crate::state::LogDelivery {
3167 log_group_arn: target_arn.clone(),
3168 payload: body_str.clone(),
3169 timestamp: Utc::now(),
3170 });
3171 drop(state);
3172 if let Some(ref log_state) = self.logs_state {
3173 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3174 }
3175 } else if target_arn.contains(":states:") {
3176 let mut state = self.state.write();
3177 state
3178 .step_function_executions
3179 .push(crate::state::StepFunctionExecution {
3180 state_machine_arn: target_arn.clone(),
3181 payload: body_str.clone(),
3182 timestamp: Utc::now(),
3183 });
3184 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3185 let url = target_arn.clone();
3186 let payload = body_str.clone();
3187 tokio::spawn(async move {
3188 let client = reqwest::Client::new();
3189 let _ = client
3190 .post(&url)
3191 .header("Content-Type", "application/json")
3192 .body(payload)
3193 .send()
3194 .await;
3195 });
3196 }
3197 }
3198 }
3199
3200 Ok(AwsResponse::ok_json(json!({
3201 "ReplayArn": arn,
3202 "ReplayStartTime": now.timestamp() as f64,
3203 "State": "STARTING",
3204 })))
3205 }
3206
3207 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3208 let body = req.json_body();
3209 validate_required("ReplayName", &body["ReplayName"])?;
3210 let name = body["ReplayName"]
3211 .as_str()
3212 .ok_or_else(|| missing("ReplayName"))?;
3213 validate_string_length("replayName", name, 1, 64)?;
3214
3215 let state = self.state.read();
3216 let replay = state.replays.get(name).ok_or_else(|| {
3217 AwsServiceError::aws_error(
3218 StatusCode::BAD_REQUEST,
3219 "ResourceNotFoundException",
3220 format!("Replay {name} does not exist."),
3221 )
3222 })?;
3223
3224 let mut resp = json!({
3225 "Destination": replay.destination,
3226 "EventSourceArn": replay.event_source_arn,
3227 "EventStartTime": replay.event_start_time.timestamp() as f64,
3228 "EventEndTime": replay.event_end_time.timestamp() as f64,
3229 "ReplayArn": replay.arn,
3230 "ReplayName": replay.name,
3231 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3232 "State": replay.state,
3233 });
3234 if let Some(ref desc) = replay.description {
3235 resp["Description"] = json!(desc);
3236 }
3237 if let Some(ref end) = replay.replay_end_time {
3238 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3239 }
3240
3241 Ok(AwsResponse::ok_json(resp))
3242 }
3243
3244 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3245 let body = req.json_body();
3246 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3247 validate_optional_string_length(
3248 "eventSourceArn",
3249 body["EventSourceArn"].as_str(),
3250 1,
3251 1600,
3252 )?;
3253 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3254 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3255 let name_prefix = body["NamePrefix"].as_str();
3256 let source_arn = body["EventSourceArn"].as_str();
3257 let replay_state = body["State"].as_str();
3258
3259 let filter_count = [
3261 name_prefix.is_some(),
3262 source_arn.is_some(),
3263 replay_state.is_some(),
3264 ]
3265 .iter()
3266 .filter(|&&x| x)
3267 .count();
3268 if filter_count > 1 {
3269 return Err(AwsServiceError::aws_error(
3270 StatusCode::BAD_REQUEST,
3271 "ValidationException",
3272 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3273 ));
3274 }
3275
3276 if let Some(s) = replay_state {
3278 let valid = [
3279 "CANCELLED",
3280 "CANCELLING",
3281 "COMPLETED",
3282 "FAILED",
3283 "RUNNING",
3284 "STARTING",
3285 ];
3286 if !valid.contains(&s) {
3287 return Err(AwsServiceError::aws_error(
3288 StatusCode::BAD_REQUEST,
3289 "ValidationException",
3290 format!(
3291 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3292 s
3293 ),
3294 ));
3295 }
3296 }
3297
3298 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3299
3300 let state = self.state.read();
3301 let all: Vec<Value> = state
3302 .replays
3303 .values()
3304 .filter(|r| {
3305 if let Some(prefix) = name_prefix {
3306 r.name.starts_with(prefix)
3307 } else if let Some(arn) = source_arn {
3308 r.event_source_arn == arn
3309 } else if let Some(s) = replay_state {
3310 r.state == s
3311 } else {
3312 true
3313 }
3314 })
3315 .map(|r| {
3316 let mut obj = json!({
3317 "EventSourceArn": r.event_source_arn,
3318 "EventStartTime": r.event_start_time.timestamp() as f64,
3319 "EventEndTime": r.event_end_time.timestamp() as f64,
3320 "ReplayName": r.name,
3321 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3322 "State": r.state,
3323 });
3324 if let Some(ref end) = r.replay_end_time {
3325 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3326 }
3327 obj
3328 })
3329 .collect();
3330
3331 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3332 let mut resp = json!({ "Replays": replays });
3333 if let Some(token) = next_token {
3334 resp["NextToken"] = json!(token);
3335 }
3336
3337 Ok(AwsResponse::ok_json(resp))
3338 }
3339
3340 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3341 let body = req.json_body();
3342 validate_required("ReplayName", &body["ReplayName"])?;
3343 let name = body["ReplayName"]
3344 .as_str()
3345 .ok_or_else(|| missing("ReplayName"))?;
3346 validate_string_length("replayName", name, 1, 64)?;
3347
3348 let mut state = self.state.write();
3349 let replay = state.replays.get_mut(name).ok_or_else(|| {
3350 AwsServiceError::aws_error(
3351 StatusCode::BAD_REQUEST,
3352 "ResourceNotFoundException",
3353 format!("Replay {name} does not exist."),
3354 )
3355 })?;
3356
3357 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3359 return Err(AwsServiceError::aws_error(
3360 StatusCode::BAD_REQUEST,
3361 "IllegalStatusException",
3362 format!("Replay {name} is not in a valid state for this operation."),
3363 ));
3364 }
3365
3366 let arn = replay.arn.clone();
3367 replay.state = "CANCELLED".to_string();
3368
3369 Ok(AwsResponse::ok_json(json!({
3370 "ReplayArn": arn,
3371 "State": "CANCELLING",
3372 })))
3373 }
3374}
3375
3376fn find_tags_mut<'a>(
3379 state: &'a mut crate::state::EventBridgeState,
3380 arn: &str,
3381) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3382 for bus in state.buses.values_mut() {
3384 if bus.arn == arn {
3385 return Ok(&mut bus.tags);
3386 }
3387 }
3388 for rule in state.rules.values_mut() {
3390 if rule.arn == arn {
3391 return Ok(&mut rule.tags);
3392 }
3393 }
3394
3395 let error_msg = if arn.contains(":rule/") {
3397 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3399 if let Some(rule_path) = parts.first() {
3400 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3401 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3402 } else {
3403 format!("Rule {} does not exist on EventBus default.", rule_path)
3404 }
3405 } else {
3406 format!("Resource {arn} not found.")
3407 }
3408 } else {
3409 format!("Resource {arn} not found.")
3410 };
3411
3412 Err(AwsServiceError::aws_error(
3413 StatusCode::BAD_REQUEST,
3414 "ResourceNotFoundException",
3415 error_msg,
3416 ))
3417}
3418
3419fn find_tags<'a>(
3420 state: &'a crate::state::EventBridgeState,
3421 arn: &str,
3422) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3423 for bus in state.buses.values() {
3424 if bus.arn == arn {
3425 return Ok(&bus.tags);
3426 }
3427 }
3428 for rule in state.rules.values() {
3429 if rule.arn == arn {
3430 return Ok(&rule.tags);
3431 }
3432 }
3433
3434 let error_msg = if arn.contains(":rule/") {
3435 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3436 if let Some(rule_path) = parts.first() {
3437 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3438 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3439 } else {
3440 format!("Rule {} does not exist on EventBus default.", rule_path)
3441 }
3442 } else {
3443 format!("Resource {arn} not found.")
3444 }
3445 } else {
3446 format!("Resource {arn} not found.")
3447 };
3448
3449 Err(AwsServiceError::aws_error(
3450 StatusCode::BAD_REQUEST,
3451 "ResourceNotFoundException",
3452 error_msg,
3453 ))
3454}
3455
3456fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3459 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3460 AwsServiceError::aws_error(
3461 StatusCode::BAD_REQUEST,
3462 "InvalidEventPatternException",
3463 "Event pattern is not valid. Reason: Invalid JSON",
3464 )
3465 })?;
3466
3467 validate_pattern_values(&parsed, "")?;
3468 Ok(())
3469}
3470
3471fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3472 match value {
3473 Value::Object(obj) => {
3474 for (key, val) in obj {
3475 let new_path = if path.is_empty() {
3476 key.clone()
3477 } else {
3478 format!("{path}.{key}")
3479 };
3480 match val {
3481 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3482 Value::Array(_) => {} _ => {
3484 return Err(AwsServiceError::aws_error(
3485 StatusCode::BAD_REQUEST,
3486 "InvalidEventPatternException",
3487 format!(
3488 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3489 key
3490 ),
3491 ));
3492 }
3493 }
3494 }
3495 Ok(())
3496 }
3497 _ => Ok(()),
3498 }
3499}
3500
3501fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3504 match auth_type {
3505 "API_KEY" => {
3506 let mut resp = json!({});
3507 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3508 resp["ApiKeyAuthParameters"] = json!({
3509 "ApiKeyName": api_key["ApiKeyName"],
3510 });
3511 }
3512 resp
3513 }
3514 "BASIC" => {
3515 let mut resp = json!({});
3516 if let Some(basic) = params.get("BasicAuthParameters") {
3517 resp["BasicAuthParameters"] = json!({
3518 "Username": basic["Username"],
3519 });
3520 }
3521 resp
3522 }
3523 "OAUTH_CLIENT_CREDENTIALS" => {
3524 let mut resp = json!({});
3525 if let Some(oauth) = params.get("OAuthParameters") {
3526 resp["OAuthParameters"] = json!({
3527 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3528 "HttpMethod": oauth["HttpMethod"],
3529 "ClientParameters": {
3530 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3531 },
3532 });
3533 }
3534 resp
3535 }
3536 _ => params.clone(),
3537 }
3538}
3539
3540pub fn matches_pattern(
3544 pattern_json: Option<&str>,
3545 source: &str,
3546 detail_type: &str,
3547 detail: &str,
3548 account: &str,
3549 region: &str,
3550 resources: &[String],
3551) -> bool {
3552 let pattern_json = match pattern_json {
3553 Some(p) => p,
3554 None => return true,
3555 };
3556
3557 let pattern: Value = match serde_json::from_str(pattern_json) {
3558 Ok(v) => v,
3559 Err(_) => return false,
3560 };
3561
3562 let pattern_obj = match pattern.as_object() {
3563 Some(o) => o,
3564 None => return false,
3565 };
3566
3567 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3568 let event = json!({
3569 "source": source,
3570 "detail-type": detail_type,
3571 "detail": detail_value,
3572 "account": account,
3573 "region": region,
3574 "resources": resources,
3575 });
3576
3577 for (key, pattern_value) in pattern_obj {
3578 let event_value = &event[key];
3579 if !matches_value(pattern_value, event_value) {
3580 return false;
3581 }
3582 }
3583
3584 true
3585}
3586
3587fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3588 match pattern {
3589 Value::Object(obj) => {
3590 for (key, sub_pattern) in obj {
3591 let sub_value = &event_value[key];
3592 if !matches_value(sub_pattern, sub_value) {
3593 return false;
3594 }
3595 }
3596 true
3597 }
3598 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3599 _ => false,
3600 }
3601}
3602
3603fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3604 match pattern_elem {
3605 Value::Object(obj) => {
3606 if let Some(prefix_val) = obj.get("prefix") {
3607 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3608 return actual.starts_with(prefix);
3609 }
3610 return false;
3611 }
3612 if let Some(exists_val) = obj.get("exists") {
3613 let should_exist = exists_val.as_bool().unwrap_or(true);
3614 let does_exist = !event_value.is_null();
3615 return should_exist == does_exist;
3616 }
3617 if let Some(anything_but_val) = obj.get("anything-but") {
3618 return match anything_but_val {
3619 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3620 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3621 Value::Number(_) => event_value != anything_but_val,
3622 _ => true,
3623 };
3624 }
3625 if let Some(numeric_val) = obj.get("numeric") {
3626 return matches_numeric(numeric_val, event_value);
3627 }
3628 false
3629 }
3630 _ => values_equal(pattern_elem, event_value),
3631 }
3632}
3633
3634fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3635 let arr = match numeric_arr.as_array() {
3636 Some(a) => a,
3637 None => return false,
3638 };
3639 let actual = match event_value.as_f64() {
3640 Some(n) => n,
3641 None => return false,
3642 };
3643 let mut i = 0;
3644 while i + 1 < arr.len() {
3645 let op = match arr[i].as_str() {
3646 Some(s) => s,
3647 None => return false,
3648 };
3649 let threshold = match arr[i + 1].as_f64() {
3650 Some(n) => n,
3651 None => return false,
3652 };
3653 let ok = match op {
3654 ">" => actual > threshold,
3655 ">=" => actual >= threshold,
3656 "<" => actual < threshold,
3657 "<=" => actual <= threshold,
3658 "=" => (actual - threshold).abs() < f64::EPSILON,
3659 _ => return false,
3660 };
3661 if !ok {
3662 return false;
3663 }
3664 i += 2;
3665 }
3666 true
3667}
3668
3669fn values_equal(a: &Value, b: &Value) -> bool {
3670 a == b
3671}
3672
3673fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3675 let path = path.strip_prefix('$').unwrap_or(path);
3676 let mut current = event;
3677 for segment in path.split('.') {
3678 if segment.is_empty() {
3679 continue;
3680 }
3681 current = current.get(segment)?;
3682 }
3683 Some(current.clone())
3684}
3685
3686fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3688 let input_paths_map = transformer
3689 .get("InputPathsMap")
3690 .and_then(|v| v.as_object())
3691 .cloned()
3692 .unwrap_or_default();
3693 let template = transformer
3694 .get("InputTemplate")
3695 .and_then(|v| v.as_str())
3696 .unwrap_or("")
3697 .to_string();
3698
3699 let mut resolved: HashMap<String, Value> = HashMap::new();
3701 for (var_name, path_val) in &input_paths_map {
3702 if let Some(path_str) = path_val.as_str() {
3703 if let Some(val) = resolve_json_path(event, path_str) {
3704 resolved.insert(var_name.clone(), val);
3705 }
3706 }
3707 }
3708
3709 let mut result = template;
3711 for (var_name, val) in &resolved {
3712 let placeholder = format!("<{var_name}>");
3713 let replacement = match val {
3714 Value::String(s) => s.clone(),
3715 other => other.to_string(),
3716 };
3717 result = result.replace(&placeholder, &replacement);
3718 }
3719
3720 result
3721}
3722
3723fn missing(name: &str) -> AwsServiceError {
3724 AwsServiceError::aws_error(
3725 StatusCode::BAD_REQUEST,
3726 "ValidationException",
3727 format!("The request must contain the parameter {name}"),
3728 )
3729}
3730
/// Extracts the function name from a Lambda function ARN.
///
/// For input shaped like `arn:aws:lambda:<region>:<account>:function:<name>`
/// (optionally followed by further `:`-separated parts) the `<name>` part is
/// returned. Anything else, including a bare function name, is returned
/// unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // Field 5 must be the literal "function"; field 6 is then the name.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
3743
3744pub fn invoke_lambda_async(
3747 container_runtime: &Option<Arc<ContainerRuntime>>,
3748 lambda_state: &Option<SharedLambdaState>,
3749 function_arn: &str,
3750 payload: &str,
3751) {
3752 let runtime = match container_runtime {
3753 Some(rt) => rt.clone(),
3754 None => return,
3755 };
3756 let lambda_state = match lambda_state {
3757 Some(ls) => ls.clone(),
3758 None => return,
3759 };
3760 let func_name = function_name_from_arn(function_arn).to_string();
3761 let payload = payload.as_bytes().to_vec();
3762
3763 tokio::spawn(async move {
3764 let func = {
3765 let state = lambda_state.read();
3766 state.functions.get(&func_name).cloned()
3767 };
3768 let func = match func {
3769 Some(f) => f,
3770 None => {
3771 tracing::warn!(
3772 function = %func_name,
3773 "EventBridge Lambda target not found, skipping invocation"
3774 );
3775 return;
3776 }
3777 };
3778 match runtime.invoke(&func, &payload).await {
3779 Ok(_) => {
3780 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3781 }
3782 Err(e) => {
3783 tracing::warn!(
3784 function = %func_name,
3785 error = %e,
3786 "EventBridge Lambda invocation failed"
3787 );
3788 }
3789 }
3790 });
3791}
3792
3793pub fn deliver_to_logs(
3796 logs_state: &SharedLogsState,
3797 log_group_arn: &str,
3798 payload: &str,
3799 timestamp: chrono::DateTime<chrono::Utc>,
3800) {
3801 let group_name = if log_group_arn.contains(":log-group:") {
3804 log_group_arn
3805 .split(":log-group:")
3806 .nth(1)
3807 .unwrap_or(log_group_arn)
3808 .trim_end_matches(":*")
3809 } else {
3810 log_group_arn
3811 };
3812
3813 let stream_name = "events".to_string();
3814 let ts_millis = timestamp.timestamp_millis();
3815
3816 let mut state = logs_state.write();
3817 let region = state.region.clone();
3818 let account_id = state.account_id.clone();
3819
3820 let group = state
3822 .log_groups
3823 .entry(group_name.to_string())
3824 .or_insert_with(|| fakecloud_logs::state::LogGroup {
3825 name: group_name.to_string(),
3826 arn: Arn::new(
3827 "logs",
3828 ®ion,
3829 &account_id,
3830 &format!("log-group:{group_name}"),
3831 )
3832 .to_string(),
3833 creation_time: ts_millis,
3834 retention_in_days: None,
3835 kms_key_id: None,
3836 tags: HashMap::new(),
3837 log_streams: HashMap::new(),
3838 stored_bytes: 0,
3839 subscription_filters: Vec::new(),
3840 data_protection_policy: None,
3841 index_policies: Vec::new(),
3842 transformer: None,
3843 deletion_protection: false,
3844 });
3845
3846 let stream = group
3847 .log_streams
3848 .entry(stream_name.clone())
3849 .or_insert_with(|| fakecloud_logs::state::LogStream {
3850 name: stream_name,
3851 arn: format!("{}:log-stream:events", group.arn),
3852 creation_time: ts_millis,
3853 first_event_timestamp: None,
3854 last_event_timestamp: None,
3855 last_ingestion_time: None,
3856 upload_sequence_token: "1".to_string(),
3857 events: Vec::new(),
3858 });
3859
3860 stream.events.push(fakecloud_logs::state::LogEvent {
3861 timestamp: ts_millis,
3862 message: payload.to_string(),
3863 ingestion_time: ts_millis,
3864 });
3865 stream.last_event_timestamp = Some(ts_millis);
3866 stream.last_ingestion_time = Some(ts_millis);
3867 if stream.first_event_timestamp.is_none() {
3868 stream.first_event_timestamp = Some(ts_millis);
3869 }
3870}
3871
3872fn apply_connection_auth(
3874 mut builder: reqwest::RequestBuilder,
3875 conn: &Connection,
3876) -> reqwest::RequestBuilder {
3877 match conn.authorization_type.as_str() {
3878 "API_KEY" => {
3879 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3880 if let (Some(name), Some(value)) = (
3881 params["ApiKeyName"].as_str(),
3882 params["ApiKeyValue"].as_str(),
3883 ) {
3884 builder = builder.header(name, value);
3885 }
3886 }
3887 }
3888 "BASIC" => {
3889 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3890 if let (Some(user), Some(pass)) =
3891 (params["Username"].as_str(), params["Password"].as_str())
3892 {
3893 builder = builder.basic_auth(user, Some(pass));
3894 }
3895 }
3896 }
3897 "OAUTH_CLIENT_CREDENTIALS" => {
3898 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
3901 if let (Some(client_id), Some(client_secret)) = (
3902 params["ClientParameters"]["ClientID"].as_str(),
3903 params["ClientParameters"]["ClientSecret"].as_str(),
3904 ) {
3905 builder = builder.basic_auth(client_id, Some(client_secret));
3906 }
3907 }
3908 }
3909 _ => {}
3910 }
3911 builder
3912}
3913
3914#[cfg(test)]
3915mod tests {
3916 use super::*;
3917
3918 fn test_matches(
3920 pattern_json: Option<&str>,
3921 source: &str,
3922 detail_type: &str,
3923 detail: &str,
3924 ) -> bool {
3925 matches_pattern(
3926 pattern_json,
3927 source,
3928 detail_type,
3929 detail,
3930 "123456789012",
3931 "us-east-1",
3932 &[],
3933 )
3934 }
3935
3936 #[test]
3937 fn pattern_matches_source() {
3938 assert!(test_matches(
3939 Some(r#"{"source": ["my.app"]}"#),
3940 "my.app",
3941 "OrderPlaced",
3942 "{}"
3943 ));
3944 assert!(!test_matches(
3945 Some(r#"{"source": ["other.app"]}"#),
3946 "my.app",
3947 "OrderPlaced",
3948 "{}"
3949 ));
3950 }
3951
3952 #[test]
3953 fn pattern_matches_detail_type() {
3954 assert!(test_matches(
3955 Some(r#"{"detail-type": ["OrderPlaced"]}"#),
3956 "my.app",
3957 "OrderPlaced",
3958 "{}"
3959 ));
3960 assert!(!test_matches(
3961 Some(r#"{"detail-type": ["OrderShipped"]}"#),
3962 "my.app",
3963 "OrderPlaced",
3964 "{}"
3965 ));
3966 }
3967
3968 #[test]
3969 fn pattern_matches_detail_field() {
3970 assert!(test_matches(
3971 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3972 "my.app",
3973 "StatusChange",
3974 r#"{"status": "ACTIVE"}"#
3975 ));
3976 assert!(!test_matches(
3977 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3978 "my.app",
3979 "StatusChange",
3980 r#"{"status": "INACTIVE"}"#
3981 ));
3982 }
3983
3984 #[test]
3985 fn no_pattern_matches_everything() {
3986 assert!(test_matches(None, "any", "any", "{}"));
3987 }
3988
3989 #[test]
3990 fn combined_pattern() {
3991 let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
3992 assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
3993 assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
3994 assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
3995 }
3996
3997 #[test]
3998 fn nested_detail_pattern() {
3999 let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4000 assert!(test_matches(
4001 Some(pattern),
4002 "my.app",
4003 "OrderEvent",
4004 r#"{"order": {"status": "PLACED", "id": "123"}}"#
4005 ));
4006 assert!(!test_matches(
4007 Some(pattern),
4008 "my.app",
4009 "OrderEvent",
4010 r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4011 ));
4012 assert!(!test_matches(
4013 Some(pattern),
4014 "my.app",
4015 "OrderEvent",
4016 r#"{"order": {"id": "123"}}"#
4017 ));
4018 }
4019
4020 #[test]
4021 fn deeply_nested_detail_pattern() {
4022 let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4023 assert!(test_matches(
4024 Some(pattern),
4025 "src",
4026 "type",
4027 r#"{"a": {"b": {"c": "deep"}}}"#
4028 ));
4029 assert!(!test_matches(
4030 Some(pattern),
4031 "src",
4032 "type",
4033 r#"{"a": {"b": {"c": "shallow"}}}"#
4034 ));
4035 }
4036
4037 #[test]
4038 fn prefix_matcher() {
4039 let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4040 assert!(test_matches(
4041 Some(pattern),
4042 "com.myapp.orders",
4043 "OrderPlaced",
4044 "{}"
4045 ));
4046 assert!(test_matches(
4047 Some(pattern),
4048 "com.myapp",
4049 "OrderPlaced",
4050 "{}"
4051 ));
4052 assert!(!test_matches(
4053 Some(pattern),
4054 "com.other",
4055 "OrderPlaced",
4056 "{}"
4057 ));
4058 }
4059
4060 #[test]
4061 fn prefix_matcher_in_detail() {
4062 let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4063 assert!(test_matches(
4064 Some(pattern),
4065 "src",
4066 "type",
4067 r#"{"region": "us-east-1"}"#
4068 ));
4069 assert!(!test_matches(
4070 Some(pattern),
4071 "src",
4072 "type",
4073 r#"{"region": "eu-west-1"}"#
4074 ));
4075 }
4076
4077 #[test]
4078 fn exists_matcher() {
4079 let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4080 assert!(test_matches(
4081 Some(pattern),
4082 "src",
4083 "type",
4084 r#"{"error": "something broke"}"#
4085 ));
4086 assert!(!test_matches(
4087 Some(pattern),
4088 "src",
4089 "type",
4090 r#"{"status": "ok"}"#
4091 ));
4092
4093 let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4094 assert!(test_matches(
4095 Some(pattern),
4096 "src",
4097 "type",
4098 r#"{"status": "ok"}"#
4099 ));
4100 assert!(!test_matches(
4101 Some(pattern),
4102 "src",
4103 "type",
4104 r#"{"error": "something broke"}"#
4105 ));
4106 }
4107
4108 #[test]
4109 fn anything_but_matcher() {
4110 let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4111 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4112 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4113
4114 let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4115 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4116 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4117 assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4118 }
4119
4120 #[test]
4121 fn anything_but_in_detail() {
4122 let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4123 assert!(test_matches(
4124 Some(pattern),
4125 "src",
4126 "type",
4127 r#"{"env": "staging"}"#
4128 ));
4129 assert!(!test_matches(
4130 Some(pattern),
4131 "src",
4132 "type",
4133 r#"{"env": "prod"}"#
4134 ));
4135 }
4136
4137 #[test]
4138 fn numeric_greater_than() {
4139 let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4140 assert!(test_matches(
4141 Some(pattern),
4142 "src",
4143 "type",
4144 r#"{"count": 150}"#
4145 ));
4146 assert!(!test_matches(
4147 Some(pattern),
4148 "src",
4149 "type",
4150 r#"{"count": 100}"#
4151 ));
4152 assert!(!test_matches(
4153 Some(pattern),
4154 "src",
4155 "type",
4156 r#"{"count": 50}"#
4157 ));
4158 }
4159
4160 #[test]
4161 fn numeric_less_than() {
4162 let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4163 assert!(test_matches(
4164 Some(pattern),
4165 "src",
4166 "type",
4167 r#"{"count": 5}"#
4168 ));
4169 assert!(!test_matches(
4170 Some(pattern),
4171 "src",
4172 "type",
4173 r#"{"count": 10}"#
4174 ));
4175 assert!(!test_matches(
4176 Some(pattern),
4177 "src",
4178 "type",
4179 r#"{"count": 15}"#
4180 ));
4181 }
4182
4183 #[test]
4184 fn numeric_range() {
4185 let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4186 assert!(test_matches(
4187 Some(pattern),
4188 "src",
4189 "type",
4190 r#"{"count": 50}"#
4191 ));
4192 assert!(test_matches(
4193 Some(pattern),
4194 "src",
4195 "type",
4196 r#"{"count": 100}"#
4197 ));
4198 assert!(!test_matches(
4199 Some(pattern),
4200 "src",
4201 "type",
4202 r#"{"count": 200}"#
4203 ));
4204 assert!(!test_matches(
4205 Some(pattern),
4206 "src",
4207 "type",
4208 r#"{"count": 49}"#
4209 ));
4210 }
4211
4212 #[test]
4213 fn mixed_matchers_and_literals() {
4214 let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4215 assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4216 assert!(test_matches(
4217 Some(pattern),
4218 "com.myapp.orders",
4219 "Event",
4220 "{}"
4221 ));
4222 assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4223 }
4224
4225 use crate::state::EventBridgeState;
4228 use fakecloud_core::delivery::DeliveryBus;
4229 use parking_lot::RwLock;
4230
4231 fn make_service() -> EventBridgeService {
4232 let state = Arc::new(RwLock::new(EventBridgeState::new(
4233 "123456789012",
4234 "us-east-1",
4235 )));
4236 let delivery = Arc::new(DeliveryBus::new());
4237 EventBridgeService::new(state, delivery)
4238 }
4239
4240 fn make_request(action: &str, body: Value) -> AwsRequest {
4241 AwsRequest {
4242 service: "events".to_string(),
4243 action: action.to_string(),
4244 region: "us-east-1".to_string(),
4245 account_id: "123456789012".to_string(),
4246 request_id: "test-id".to_string(),
4247 headers: http::HeaderMap::new(),
4248 query_params: HashMap::new(),
4249 body: serde_json::to_vec(&body).unwrap().into(),
4250 path_segments: vec![],
4251 raw_path: "/".to_string(),
4252 raw_query: String::new(),
4253 method: http::Method::POST,
4254 is_query_protocol: false,
4255 access_key_id: None,
4256 }
4257 }
4258
4259 fn create_connection(svc: &EventBridgeService, name: &str) {
4260 let req = make_request(
4261 "CreateConnection",
4262 json!({
4263 "Name": name,
4264 "AuthorizationType": "API_KEY",
4265 "AuthParameters": {
4266 "ApiKeyAuthParameters": {
4267 "ApiKeyName": "x-api-key",
4268 "ApiKeyValue": "secret"
4269 }
4270 }
4271 }),
4272 );
4273 svc.create_connection(&req).unwrap();
4274 }
4275
4276 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4277 let conn_arn_field = {
4278 let state = svc.state.read();
4279 state.connections.get(conn_name).unwrap().arn.clone()
4280 };
4281 let req = make_request(
4282 "CreateApiDestination",
4283 json!({
4284 "Name": name,
4285 "ConnectionArn": conn_arn_field,
4286 "InvocationEndpoint": "https://example.com",
4287 "HttpMethod": "POST"
4288 }),
4289 );
4290 svc.create_api_destination(&req).unwrap();
4291 }
4292
/// With no filters and no limit, `ListConnections` returns every connection
/// and no `NextToken`.
#[test]
fn list_connections_returns_all_by_default() {
    let svc = make_service();
    for name in ["conn-alpha", "conn-beta", "conn-gamma"] {
        create_connection(&svc, name);
    }

    let listing = svc
        .list_connections(&make_request("ListConnections", json!({})))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();
    let connections = parsed["Connections"].as_array().unwrap();
    assert_eq!(connections.len(), 3);
    assert!(parsed["NextToken"].is_null());
}
4308
/// `NamePrefix` restricts `ListConnections` to connections whose name starts
/// with the prefix.
#[test]
fn list_connections_name_prefix_filter() {
    let svc = make_service();
    for name in ["prod-conn-1", "prod-conn-2", "dev-conn-1"] {
        create_connection(&svc, name);
    }

    let filter = json!({ "NamePrefix": "prod-" });
    let listing = svc
        .list_connections(&make_request("ListConnections", filter))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();

    let mut returned: Vec<&str> = Vec::new();
    for conn in parsed["Connections"].as_array().unwrap() {
        returned.push(conn["Name"].as_str().unwrap());
    }
    assert_eq!(returned.len(), 2);
    assert!(returned.iter().all(|name| name.starts_with("prod-")));
}
4328
/// `ConnectionState` restricts `ListConnections` to connections in that state.
#[test]
fn list_connections_state_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");

    // Flip one connection's state directly in the shared state so the two
    // connections differ; the write guard is scoped to this block.
    {
        let mut guard = svc.state.write();
        let target = guard.connections.get_mut("conn-b").unwrap();
        target.connection_state = String::from("DEAUTHORIZED");
    }

    let filter = json!({ "ConnectionState": "AUTHORIZED" });
    let listing = svc
        .list_connections(&make_request("ListConnections", filter))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();
    let authorized = parsed["Connections"].as_array().unwrap();
    assert_eq!(authorized.len(), 1);
    assert_eq!(authorized[0]["Name"].as_str().unwrap(), "conn-a");
}
4355
/// Pages through five connections with `Limit` 2: two full pages whose
/// `NextToken` values are "2" and "4", then a one-item page with no
/// `NextToken`.
#[test]
fn list_connections_pagination() {
    let svc = make_service();
    (0..5).for_each(|i| create_connection(&svc, &format!("conn-{i:02}")));

    // (expected page size, expected NextToken) for each successive page.
    let expected_pages: [(usize, Option<&str>); 3] = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (page_len, next_token) in expected_pages {
        // The first request omits NextToken entirely; later ones echo the previous token.
        let payload = match cursor.as_deref() {
            Some(tok) => json!({ "Limit": 2, "NextToken": tok }),
            None => json!({ "Limit": 2 }),
        };
        let resp = svc
            .list_connections(&make_request("ListConnections", payload))
            .unwrap();
        let page: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(page["Connections"].as_array().unwrap().len(), page_len);
        match next_token {
            Some(want) => assert_eq!(page["NextToken"].as_str().unwrap(), want),
            None => assert!(page["NextToken"].is_null()),
        }
        cursor = next_token.map(str::to_owned);
    }
}
4386
/// When `NamePrefix` and `Limit` are combined, the first page holds `Limit`
/// matching connections and a `NextToken` is returned because more matches
/// remain.
#[test]
fn list_connections_pagination_with_filter() {
    let svc = make_service();
    (0..4).for_each(|i| create_connection(&svc, &format!("prod-{i:02}")));
    create_connection(&svc, "dev-00");

    let query = json!({ "NamePrefix": "prod-", "Limit": 2 });
    let first_page = svc
        .list_connections(&make_request("ListConnections", query))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&first_page.body).unwrap();
    assert_eq!(parsed["Connections"].as_array().unwrap().len(), 2);
    assert!(parsed["NextToken"].as_str().is_some());
}
4404
/// With no filters and no limit, `ListApiDestinations` returns every
/// destination and no `NextToken`.
#[test]
fn list_api_destinations_returns_all_by_default() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["dest-alpha", "dest-beta"] {
        create_api_destination(&svc, name, "my-conn");
    }

    let listing = svc
        .list_api_destinations(&make_request("ListApiDestinations", json!({})))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();
    let destinations = parsed["ApiDestinations"].as_array().unwrap();
    assert_eq!(destinations.len(), 2);
    assert!(parsed["NextToken"].is_null());
}
4420
/// `NamePrefix` restricts `ListApiDestinations` to destinations whose name
/// starts with the prefix.
#[test]
fn list_api_destinations_name_prefix_filter() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["prod-dest-1", "prod-dest-2", "dev-dest-1"] {
        create_api_destination(&svc, name, "my-conn");
    }

    let filter = json!({ "NamePrefix": "prod-" });
    let listing = svc
        .list_api_destinations(&make_request("ListApiDestinations", filter))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();

    let mut returned: Vec<&str> = Vec::new();
    for dest in parsed["ApiDestinations"].as_array().unwrap() {
        returned.push(dest["Name"].as_str().unwrap());
    }
    assert_eq!(returned.len(), 2);
    assert!(returned.iter().all(|name| name.starts_with("prod-")));
}
4441
/// `ConnectionArn` restricts `ListApiDestinations` to destinations attached to
/// that connection.
#[test]
fn list_api_destinations_connection_arn_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");
    // dest-1 and dest-3 use conn-a; dest-2 uses conn-b.
    for (dest, conn) in [
        ("dest-1", "conn-a"),
        ("dest-2", "conn-b"),
        ("dest-3", "conn-a"),
    ] {
        create_api_destination(&svc, dest, conn);
    }

    let wanted_arn = svc
        .state
        .read()
        .connections
        .get("conn-a")
        .unwrap()
        .arn
        .clone();

    let filter = json!({ "ConnectionArn": wanted_arn });
    let listing = svc
        .list_api_destinations(&make_request("ListApiDestinations", filter))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();

    let mut returned: Vec<&str> = Vec::new();
    for dest in parsed["ApiDestinations"].as_array().unwrap() {
        returned.push(dest["Name"].as_str().unwrap());
    }
    assert_eq!(returned.len(), 2);
    assert!(returned.contains(&"dest-1"));
    assert!(returned.contains(&"dest-3"));
}
4472
/// Pages through five API destinations with `Limit` 2: two full pages whose
/// `NextToken` values are "2" and "4", then a one-item page with no
/// `NextToken`.
#[test]
fn list_api_destinations_pagination() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    (0..5).for_each(|i| create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn"));

    // (expected page size, expected NextToken) for each successive page.
    let expected_pages: [(usize, Option<&str>); 3] = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (page_len, next_token) in expected_pages {
        // The first request omits NextToken entirely; later ones echo the previous token.
        let payload = match cursor.as_deref() {
            Some(tok) => json!({ "Limit": 2, "NextToken": tok }),
            None => json!({ "Limit": 2 }),
        };
        let resp = svc
            .list_api_destinations(&make_request("ListApiDestinations", payload))
            .unwrap();
        let page: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(page["ApiDestinations"].as_array().unwrap().len(), page_len);
        match next_token {
            Some(want) => assert_eq!(page["NextToken"].as_str().unwrap(), want),
            None => assert!(page["NextToken"].is_null()),
        }
        cursor = next_token.map(str::to_owned);
    }
}
4510
4511 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4514 let req = make_request("CreateEventBus", json!({ "Name": name }));
4515 svc.create_event_bus(&req).unwrap();
4516 }
4517
/// Pages through the event buses with `Limit` 2 after creating four buses:
/// two full pages whose `NextToken` values are "2" and "4", then a one-item
/// page with no `NextToken`.
///
/// The pages hold five buses in total, one more than this test creates —
/// presumably the bus a fresh state already contains (TODO confirm).
#[test]
fn list_event_buses_pagination() {
    let svc = make_service();
    (0..4).for_each(|i| create_event_bus(&svc, &format!("bus-{i:02}")));

    // (expected page size, expected NextToken) for each successive page.
    let expected_pages: [(usize, Option<&str>); 3] = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (page_len, next_token) in expected_pages {
        // The first request omits NextToken entirely; later ones echo the previous token.
        let payload = match cursor.as_deref() {
            Some(tok) => json!({ "Limit": 2, "NextToken": tok }),
            None => json!({ "Limit": 2 }),
        };
        let resp = svc
            .list_event_buses(&make_request("ListEventBuses", payload))
            .unwrap();
        let page: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(page["EventBuses"].as_array().unwrap().len(), page_len);
        match next_token {
            Some(want) => assert_eq!(page["NextToken"].as_str().unwrap(), want),
            None => assert!(page["NextToken"].is_null()),
        }
        cursor = next_token.map(str::to_owned);
    }
}
4549
/// Without a `Limit`, `ListEventBuses` returns every bus in one response and
/// no `NextToken`.
#[test]
fn list_event_buses_no_pagination_returns_all() {
    let svc = make_service();
    for name in ["bus-alpha", "bus-beta"] {
        create_event_bus(&svc, name);
    }

    let listing = svc
        .list_event_buses(&make_request("ListEventBuses", json!({})))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listing.body).unwrap();
    // Three, not two: the two buses created above plus one that already
    // exists in a fresh state (presumably the default bus — TODO confirm).
    let buses = parsed["EventBuses"].as_array().unwrap();
    assert_eq!(buses.len(), 3);
    assert!(parsed["NextToken"].is_null());
}
4563
/// Supplying `EndpointId` on a `PutEvents` request must not cause the field to
/// appear in the response, and the single entry must still succeed.
#[test]
fn put_events_never_includes_endpoint_id_in_response() {
    let svc = make_service();
    let payload = json!({
        "EndpointId": "my-endpoint.abc123",
        "Entries": [{
            "Source": "my.source",
            "DetailType": "MyType",
            "Detail": "{}",
            "EventBusName": "default"
        }]
    });

    let outcome = svc.put_events(&make_request("PutEvents", payload)).unwrap();
    let parsed: Value = serde_json::from_slice(&outcome.body).unwrap();

    let top_level = parsed.as_object().unwrap();
    assert!(
        !top_level.contains_key("EndpointId"),
        "EndpointId should never be in the PutEvents response"
    );
    assert_eq!(parsed["FailedEntryCount"], 0);
}
4590
4591 fn create_archive(svc: &EventBridgeService, name: &str) {
4594 let req = make_request(
4595 "CreateArchive",
4596 json!({
4597 "ArchiveName": name,
4598 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4599 }),
4600 );
4601 svc.create_archive(&req).unwrap();
4602 }
4603
/// Pages through five archives with `Limit` 2: two full pages whose
/// `NextToken` values are "2" and "4", then a one-item page with no
/// `NextToken`.
#[test]
fn list_archives_pagination() {
    let svc = make_service();
    (0..5).for_each(|i| create_archive(&svc, &format!("archive-{i:02}")));

    // (expected page size, expected NextToken) for each successive page.
    let expected_pages: [(usize, Option<&str>); 3] = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (page_len, next_token) in expected_pages {
        // The first request omits NextToken entirely; later ones echo the previous token.
        let payload = match cursor.as_deref() {
            Some(tok) => json!({ "Limit": 2, "NextToken": tok }),
            None => json!({ "Limit": 2 }),
        };
        let resp = svc
            .list_archives(&make_request("ListArchives", payload))
            .unwrap();
        let page: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(page["Archives"].as_array().unwrap().len(), page_len);
        match next_token {
            Some(want) => assert_eq!(page["NextToken"].as_str().unwrap(), want),
            None => assert!(page["NextToken"].is_null()),
        }
        cursor = next_token.map(str::to_owned);
    }
}
4634
4635 fn create_replay(svc: &EventBridgeService, name: &str) {
4638 let archive_arn = {
4640 let state = svc.state.read();
4641 if state.archives.contains_key("replay-archive") {
4642 state.archives["replay-archive"].arn.clone()
4643 } else {
4644 drop(state);
4645 create_archive(svc, "replay-archive");
4646 svc.state.read().archives["replay-archive"].arn.clone()
4647 }
4648 };
4649 let req = make_request(
4650 "StartReplay",
4651 json!({
4652 "ReplayName": name,
4653 "EventSourceArn": archive_arn,
4654 "EventStartTime": 1000000.0,
4655 "EventEndTime": 2000000.0,
4656 "Destination": {
4657 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4658 }
4659 }),
4660 );
4661 svc.start_replay(&req).unwrap();
4662 }
4663
/// Pages through five replays with `Limit` 2: two full pages whose
/// `NextToken` values are "2" and "4", then a one-item page with no
/// `NextToken`.
#[test]
fn list_replays_pagination() {
    let svc = make_service();
    (0..5).for_each(|i| create_replay(&svc, &format!("replay-{i:02}")));

    // (expected page size, expected NextToken) for each successive page.
    let expected_pages: [(usize, Option<&str>); 3] = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (page_len, next_token) in expected_pages {
        // The first request omits NextToken entirely; later ones echo the previous token.
        let payload = match cursor.as_deref() {
            Some(tok) => json!({ "Limit": 2, "NextToken": tok }),
            None => json!({ "Limit": 2 }),
        };
        let resp = svc
            .list_replays(&make_request("ListReplays", payload))
            .unwrap();
        let page: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(page["Replays"].as_array().unwrap().len(), page_len);
        match next_token {
            Some(want) => assert_eq!(page["NextToken"].as_str().unwrap(), want),
            None => assert!(page["NextToken"].is_null()),
        }
        cursor = next_token.map(str::to_owned);
    }
}
4694
/// A `NextToken` that is not a number makes `ListEventBuses` fail instead of
/// returning a page.
#[test]
fn list_event_buses_invalid_next_token_returns_error() {
    let svc = make_service();

    let bad_token = json!({ "NextToken": "not-a-number" });
    let outcome = svc.list_event_buses(&make_request("ListEventBuses", bad_token));
    assert!(
        outcome.is_err(),
        "non-numeric NextToken should return an error"
    );
}
4706
/// `TestEventPattern` reports `Result: true` when the event's source is listed
/// in the pattern.
#[test]
fn test_event_pattern_match() {
    let svc = make_service();
    let payload = json!({
        "EventPattern": r#"{"source": ["my.app"]}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
    });

    let outcome = svc
        .test_event_pattern(&make_request("TestEventPattern", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&outcome.body).unwrap();
    assert_eq!(parsed["Result"], true);
}
4723
/// `TestEventPattern` reports `Result: false` when the event's source is not
/// listed in the pattern.
#[test]
fn test_event_pattern_no_match() {
    let svc = make_service();
    let payload = json!({
        "EventPattern": r#"{"source": ["other.app"]}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
    });

    let outcome = svc
        .test_event_pattern(&make_request("TestEventPattern", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&outcome.body).unwrap();
    assert_eq!(parsed["Result"], false);
}
4738
/// `TestEventPattern` matches on a field nested under `detail`, and extra
/// detail fields not mentioned by the pattern do not prevent the match.
#[test]
fn test_event_pattern_detail_match() {
    let svc = make_service();
    let payload = json!({
        "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
    });

    let outcome = svc
        .test_event_pattern(&make_request("TestEventPattern", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&outcome.body).unwrap();
    assert_eq!(parsed["Result"], true);
}
4753
/// `UpdateEventBus` echoes the bus name, and the new description is visible
/// through a following `DescribeEventBus`.
#[test]
fn update_event_bus_description() {
    let svc = make_service();
    create_event_bus(&svc, "my-bus");

    let update = json!({ "Name": "my-bus", "Description": "Updated desc" });
    let update_resp = svc
        .update_event_bus(&make_request("UpdateEventBus", update))
        .unwrap();
    let updated: Value = serde_json::from_slice(&update_resp.body).unwrap();
    assert_eq!(updated["Name"], "my-bus");

    // Read the bus back to confirm the description was stored.
    let describe = json!({ "Name": "my-bus" });
    let describe_resp = svc
        .describe_event_bus(&make_request("DescribeEventBus", describe))
        .unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["Description"], "Updated desc");
}
4775
/// `UpdateEventBus` fails for a bus that was never created.
#[test]
fn update_event_bus_not_found() {
    let svc = make_service();
    let payload = json!({ "Name": "ghost-bus", "Description": "nope" });
    let outcome = svc.update_event_bus(&make_request("UpdateEventBus", payload));
    assert!(outcome.is_err());
}
4785
4786 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4789 let req = make_request(
4790 "CreateEndpoint",
4791 json!({
4792 "Name": name,
4793 "RoutingConfig": {
4794 "FailoverConfig": {
4795 "Primary": { "HealthCheck": "" },
4796 "Secondary": { "Route": "us-west-2" }
4797 }
4798 },
4799 "EventBuses": [
4800 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4801 ]
4802 }),
4803 );
4804 svc.create_endpoint(&req).unwrap();
4805 }
4806
/// A created endpoint can be described (name, `ACTIVE` state, an `EndpointId`
/// containing the name); after deletion, describing it fails.
#[test]
fn endpoint_create_describe_delete() {
    let svc = make_service();
    create_endpoint_helper(&svc, "my-endpoint");

    // The same describe request is issued before and after the delete.
    let describe = || {
        svc.describe_endpoint(&make_request(
            "DescribeEndpoint",
            json!({ "Name": "my-endpoint" }),
        ))
    };

    let described = describe().unwrap();
    let parsed: Value = serde_json::from_slice(&described.body).unwrap();
    assert_eq!(parsed["Name"], "my-endpoint");
    assert_eq!(parsed["State"], "ACTIVE");
    assert!(parsed["EndpointId"]
        .as_str()
        .unwrap()
        .contains("my-endpoint"));

    svc.delete_endpoint(&make_request(
        "DeleteEndpoint",
        json!({ "Name": "my-endpoint" }),
    ))
    .unwrap();

    assert!(describe().is_err());
}
4828
/// `ListEndpoints` returns every created endpoint, `UpdateEndpoint` echoes the
/// endpoint name, and the updated description is visible through
/// `DescribeEndpoint`.
#[test]
fn endpoint_list_and_update() {
    let svc = make_service();
    for name in ["ep-alpha", "ep-beta"] {
        create_endpoint_helper(&svc, name);
    }

    let list_resp = svc
        .list_endpoints(&make_request("ListEndpoints", json!({})))
        .unwrap();
    let listed: Value = serde_json::from_slice(&list_resp.body).unwrap();
    assert_eq!(listed["Endpoints"].as_array().unwrap().len(), 2);

    let update = json!({ "Name": "ep-alpha", "Description": "updated" });
    let update_resp = svc
        .update_endpoint(&make_request("UpdateEndpoint", update))
        .unwrap();
    let updated: Value = serde_json::from_slice(&update_resp.body).unwrap();
    assert_eq!(updated["Name"], "ep-alpha");

    // Read the endpoint back to confirm the description was stored.
    let describe = json!({ "Name": "ep-alpha" });
    let describe_resp = svc
        .describe_endpoint(&make_request("DescribeEndpoint", describe))
        .unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["Description"], "updated");
}
4856
/// Creating a second endpoint with a name that is already taken fails.
#[test]
fn endpoint_duplicate_fails() {
    let svc = make_service();
    create_endpoint_helper(&svc, "dup-ep");

    let duplicate = json!({
        "Name": "dup-ep",
        "RoutingConfig": {},
        "EventBuses": []
    });
    let outcome = svc.create_endpoint(&make_request("CreateEndpoint", duplicate));
    assert!(outcome.is_err());
}
4871
/// `DeauthorizeConnection` reports the `DEAUTHORIZING` state together with the
/// connection ARN, and the same state is visible through `DescribeConnection`.
#[test]
fn deauthorize_connection_sets_state() {
    let svc = make_service();
    create_connection(&svc, "deauth-conn");

    let deauth_resp = svc
        .deauthorize_connection(&make_request(
            "DeauthorizeConnection",
            json!({ "Name": "deauth-conn" }),
        ))
        .unwrap();
    let deauthorized: Value = serde_json::from_slice(&deauth_resp.body).unwrap();
    assert_eq!(deauthorized["ConnectionState"], "DEAUTHORIZING");
    let returned_arn = deauthorized["ConnectionArn"].as_str().unwrap();
    assert!(returned_arn.contains("deauth-conn"));

    // The state change must be stored, not just echoed in the response.
    let describe_resp = svc
        .describe_connection(&make_request(
            "DescribeConnection",
            json!({ "Name": "deauth-conn" }),
        ))
        .unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["ConnectionState"], "DEAUTHORIZING");
}
4894
/// `DeauthorizeConnection` fails for a connection that was never created.
#[test]
fn deauthorize_connection_not_found() {
    let svc = make_service();
    let payload = json!({ "Name": "ghost-conn" });
    let outcome = svc.deauthorize_connection(&make_request("DeauthorizeConnection", payload));
    assert!(outcome.is_err());
}
4901
/// Exercises the partner event source lifecycle end to end: create, describe,
/// list (sources, accounts, event sources), then delete and confirm that
/// describing the source afterwards fails.
#[test]
fn partner_event_source_crud() {
    let svc = make_service();

    // Create
    let create = json!({ "Name": "partner/test", "Account": "123456789012" });
    svc.create_partner_event_source(&make_request("CreatePartnerEventSource", create))
        .unwrap();

    // Describe as a partner event source
    let described_partner = svc
        .describe_partner_event_source(&make_request(
            "DescribePartnerEventSource",
            json!({ "Name": "partner/test" }),
        ))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&described_partner.body).unwrap();
    assert_eq!(parsed["Name"], "partner/test");

    // List partner event sources by prefix
    let listed_partners = svc
        .list_partner_event_sources(&make_request(
            "ListPartnerEventSources",
            json!({"NamePrefix": "partner/"}),
        ))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listed_partners.body).unwrap();
    assert_eq!(parsed["PartnerEventSources"].as_array().unwrap().len(), 1);

    // List the accounts associated with the source
    let listed_accounts = svc
        .list_partner_event_source_accounts(&make_request(
            "ListPartnerEventSourceAccounts",
            json!({ "EventSourceName": "partner/test" }),
        ))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listed_accounts.body).unwrap();
    let accounts = parsed["PartnerEventSourceAccounts"].as_array().unwrap();
    assert_eq!(accounts.len(), 1);

    // Describe through the customer-side event source API
    let described_source = svc
        .describe_event_source(&make_request(
            "DescribeEventSource",
            json!({ "Name": "partner/test" }),
        ))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&described_source.body).unwrap();
    assert_eq!(parsed["Name"], "partner/test");
    assert_eq!(parsed["State"], "ACTIVE");

    // List through the customer-side event source API
    let listed_sources = svc
        .list_event_sources(&make_request("ListEventSources", json!({})))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&listed_sources.body).unwrap();
    assert_eq!(parsed["EventSources"].as_array().unwrap().len(), 1);

    // Delete, then confirm the source is gone
    let delete = json!({ "Name": "partner/test", "Account": "123456789012" });
    svc.delete_partner_event_source(&make_request("DeletePartnerEventSource", delete))
        .unwrap();

    let after_delete = svc.describe_partner_event_source(&make_request(
        "DescribePartnerEventSource",
        json!({ "Name": "partner/test" }),
    ));
    assert!(after_delete.is_err());
}
4969
/// `DeactivateEventSource` and `ActivateEventSource` toggle the stored state
/// between `INACTIVE` and `ACTIVE`, and both fail for an unknown source name.
#[test]
fn activate_deactivate_event_source() {
    let svc = make_service();

    let create = json!({ "Name": "aws.partner/test", "Account": "123456789012" });
    svc.create_partner_event_source(&make_request("CreatePartnerEventSource", create))
        .unwrap();

    // Deactivate and inspect the stored state directly. The read guard is a
    // temporary, so it is released once the assertion statement finishes.
    svc.deactivate_event_source(&make_request(
        "DeactivateEventSource",
        json!({ "Name": "aws.partner/test" }),
    ))
    .unwrap();
    assert_eq!(
        svc.state.read().partner_event_sources["aws.partner/test"].state,
        "INACTIVE"
    );

    // Re-activate and inspect again.
    svc.activate_event_source(&make_request(
        "ActivateEventSource",
        json!({ "Name": "aws.partner/test" }),
    ))
    .unwrap();
    assert_eq!(
        svc.state.read().partner_event_sources["aws.partner/test"].state,
        "ACTIVE"
    );

    // Unknown names are rejected by both operations.
    let unknown_activate = svc.activate_event_source(&make_request(
        "ActivateEventSource",
        json!({ "Name": "nonexistent" }),
    ));
    assert!(unknown_activate.is_err());

    let unknown_deactivate = svc.deactivate_event_source(&make_request(
        "DeactivateEventSource",
        json!({ "Name": "nonexistent" }),
    ));
    assert!(unknown_deactivate.is_err());
}
5013
/// `DeletePartnerEventSource` only removes the source when the supplied
/// account matches: a mismatched account fails and leaves the source in place,
/// the matching account removes it, and a repeated delete then fails.
#[test]
fn delete_partner_event_source_verifies_account() {
    let svc = make_service();

    let create = json!({ "Name": "aws.partner/test", "Account": "123456789012" });
    svc.create_partner_event_source(&make_request("CreatePartnerEventSource", create))
        .unwrap();

    // Issues the delete on behalf of `account`.
    let delete_as = |account: &str| {
        svc.delete_partner_event_source(&make_request(
            "DeletePartnerEventSource",
            json!({ "Name": "aws.partner/test", "Account": account }),
        ))
    };
    // Whether the source is still present in the service state.
    let source_exists = || {
        svc.state
            .read()
            .partner_event_sources
            .contains_key("aws.partner/test")
    };

    // Wrong account: rejected, source untouched.
    assert!(delete_as("999999999999").is_err());
    assert!(source_exists());

    // Right account: removed.
    delete_as("123456789012").unwrap();
    assert!(!source_exists());

    // Already gone: a second delete is rejected.
    assert!(delete_as("123456789012").is_err());
}
5057
/// `PutPartnerEvents` with one entry reports zero failures and returns one
/// result entry carrying a string `EventId`.
#[test]
fn put_partner_events() {
    let svc = make_service();
    let payload = json!({
        "Entries": [
            { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
        ]
    });

    let outcome = svc
        .put_partner_events(&make_request("PutPartnerEvents", payload))
        .unwrap();
    let parsed: Value = serde_json::from_slice(&outcome.body).unwrap();
    assert_eq!(parsed["FailedEntryCount"], 0);

    let results = parsed["Entries"].as_array().unwrap();
    assert_eq!(results.len(), 1);
    assert!(parsed["Entries"][0]["EventId"].as_str().is_some());
}
5075
5076 #[allow(clippy::type_complexity)]
5080 fn make_service_with_sqs_recorder() -> (
5081 EventBridgeService,
5082 Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5083 ) {
5084 use fakecloud_core::delivery::SqsDelivery;
5085
5086 struct RecordingSqsDelivery {
5087 messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5088 }
5089
5090 impl SqsDelivery for RecordingSqsDelivery {
5091 fn deliver_to_queue(
5092 &self,
5093 queue_arn: &str,
5094 message_body: &str,
5095 _attributes: &HashMap<String, String>,
5096 ) {
5097 self.messages
5098 .lock()
5099 .push((queue_arn.to_string(), message_body.to_string()));
5100 }
5101 }
5102
5103 let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5104 Arc::new(parking_lot::Mutex::new(Vec::new()));
5105 let state = Arc::new(RwLock::new(EventBridgeState::new(
5106 "123456789012",
5107 "us-east-1",
5108 )));
5109 let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5110 messages: messages.clone(),
5111 })));
5112 let svc = EventBridgeService::new(state, delivery);
5113 (svc, messages)
5114 }
5115
/// End-to-end replay check: events put on the default bus after an archive
/// exists are stored in that archive, and `StartReplay` re-delivers them to
/// the rule's SQS target.
///
/// Verifies that the response state is `STARTING`, that exactly two messages
/// reach the recorded queue with the original source and a `replay-name`
/// string, and that the stored replay ends up `COMPLETED`.
#[test]
fn start_replay_delivers_archived_events_to_sqs_target() {
    let (svc, recorded) = make_service_with_sqs_recorder();
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";

    // Rule matching the test source, with the recorded queue as its target.
    let rule = json!({
        "Name": "replay-test-rule",
        "EventPattern": r#"{"source": ["my.app"]}"#,
        "State": "ENABLED"
    });
    svc.put_rule(&make_request("PutRule", rule)).unwrap();

    let targets = json!({
        "Rule": "replay-test-rule",
        "Targets": [{
            "Id": "sqs-target",
            "Arn": queue_arn
        }]
    });
    svc.put_targets(&make_request("PutTargets", targets))
        .unwrap();

    // Archive on the default bus, created before any events are put.
    let archive = json!({
        "ArchiveName": "test-archive",
        "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
    });
    svc.create_archive(&make_request("CreateArchive", archive))
        .unwrap();

    // Two matching events.
    let events = json!({
        "Entries": [
            {
                "Source": "my.app",
                "DetailType": "OrderCreated",
                "Detail": "{\"orderId\": \"1\"}",
                "EventBusName": "default"
            },
            {
                "Source": "my.app",
                "DetailType": "OrderShipped",
                "Detail": "{\"orderId\": \"2\"}",
                "EventBusName": "default"
            }
        ]
    });
    let put_resp = svc.put_events(&make_request("PutEvents", events)).unwrap();
    let put_body: Value = serde_json::from_slice(&put_resp.body).unwrap();
    assert_eq!(put_body["FailedEntryCount"], 0);

    // Both events must have been captured by the archive.
    {
        let guard = svc.state.read();
        let stored = guard.archives.get("test-archive").unwrap();
        assert_eq!(stored.events.len(), 2);
        assert_eq!(stored.event_count, 2);
    }

    // Discard whatever was recorded so far so only replay deliveries are counted.
    recorded.lock().clear();

    let archive_arn = svc
        .state
        .read()
        .archives
        .get("test-archive")
        .unwrap()
        .arn
        .clone();

    // Replay window from the epoch to one hour from now.
    let window_start = 0.0_f64;
    let window_end = (chrono::Utc::now().timestamp() + 3600) as f64;

    let replay = json!({
        "ReplayName": "my-replay",
        "EventSourceArn": archive_arn,
        "Destination": {
            "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
        },
        "EventStartTime": window_start,
        "EventEndTime": window_end
    });
    let replay_resp = svc
        .start_replay(&make_request("StartReplay", replay))
        .unwrap();
    let replay_body: Value = serde_json::from_slice(&replay_resp.body).unwrap();
    assert_eq!(replay_body["State"], "STARTING");

    // The replayed events are already in the recorder when start_replay returns.
    let deliveries = recorded.lock();
    assert_eq!(
        deliveries.len(),
        2,
        "expected 2 replayed events delivered to SQS"
    );
    for (target_arn, raw_message) in deliveries.iter() {
        assert_eq!(target_arn, queue_arn);
        let replayed: Value = serde_json::from_str(raw_message).unwrap();
        assert_eq!(replayed["source"], "my.app");
        assert!(replayed["replay-name"].as_str().is_some());
    }

    let guard = svc.state.read();
    let stored_replay = guard.replays.get("my-replay").unwrap();
    assert_eq!(stored_replay.state, "COMPLETED");
}
5235
/// For an `API_KEY` connection, `apply_connection_auth` adds a request header
/// named after `ApiKeyName` whose value is `ApiKeyValue`.
#[test]
fn apply_connection_auth_api_key() {
    let api_key_params = json!({
        "ApiKeyAuthParameters": {
            "ApiKeyName": "x-api-key",
            "ApiKeyValue": "my-secret"
        }
    });
    let conn = Connection {
        name: String::from("test-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid"),
        description: None,
        authorization_type: String::from("API_KEY"),
        auth_parameters: api_key_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so the target URL is irrelevant.
    let unauthenticated = reqwest::Client::new()
        .post("http://localhost:12345/test")
        .header("Content-Type", "application/json");
    let built = apply_connection_auth(unauthenticated, &conn)
        .body("{}")
        .build()
        .unwrap();

    let api_key_header = built.headers().get("x-api-key").unwrap();
    assert_eq!(api_key_header.to_str().unwrap(), "my-secret");
}
5274
/// For a `BASIC` connection, `apply_connection_auth` adds an `authorization`
/// header that starts with `Basic `.
#[test]
fn apply_connection_auth_basic() {
    let basic_params = json!({
        "BasicAuthParameters": {
            "Username": "user",
            "Password": "pass"
        }
    });
    let conn = Connection {
        name: String::from("basic-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid"),
        description: None,
        authorization_type: String::from("BASIC"),
        auth_parameters: basic_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so the target URL is irrelevant.
    let unauthenticated = reqwest::Client::new().post("http://localhost:12345/test");
    let built = apply_connection_auth(unauthenticated, &conn)
        .body("{}")
        .build()
        .unwrap();

    let auth_header = built
        .headers()
        .get("authorization")
        .unwrap()
        .to_str()
        .unwrap();
    assert!(
        auth_header.starts_with("Basic "),
        "Expected Basic auth header, got: {auth_header}"
    );
}
5311
5312 #[tokio::test]
5313 async fn put_events_with_api_destination_target_resolves_destination() {
5314 let state = Arc::new(RwLock::new(EventBridgeState::new(
5318 "123456789012",
5319 "us-east-1",
5320 )));
5321 let delivery = Arc::new(DeliveryBus::new());
5322 let svc = EventBridgeService::new(state, delivery);
5323
5324 create_connection(&svc, "my-conn");
5326 let conn_arn = {
5327 let state = svc.state.read();
5328 state.connections.get("my-conn").unwrap().arn.clone()
5329 };
5330 let req = make_request(
5331 "CreateApiDestination",
5332 json!({
5333 "Name": "my-dest",
5334 "ConnectionArn": conn_arn,
5335 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5336 "HttpMethod": "POST"
5337 }),
5338 );
5339 svc.create_api_destination(&req).unwrap();
5340
5341 let dest_arn = {
5342 let state = svc.state.read();
5343 state.api_destinations.get("my-dest").unwrap().arn.clone()
5344 };
5345
5346 let req = make_request(
5348 "PutRule",
5349 json!({
5350 "Name": "api-dest-rule",
5351 "EventPattern": r#"{"source":["test.app"]}"#,
5352 "State": "ENABLED"
5353 }),
5354 );
5355 svc.put_rule(&req).unwrap();
5356
5357 let req = make_request(
5358 "PutTargets",
5359 json!({
5360 "Rule": "api-dest-rule",
5361 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5362 }),
5363 );
5364 svc.put_targets(&req).unwrap();
5365
5366 let req = make_request(
5368 "PutEvents",
5369 json!({
5370 "Entries": [{
5371 "Source": "test.app",
5372 "DetailType": "TestEvent",
5373 "Detail": r#"{"key":"value"}"#
5374 }]
5375 }),
5376 );
5377 let resp = svc.put_events(&req).unwrap();
5378 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5379 assert_eq!(body["FailedEntryCount"], 0);
5380 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5381 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5382 }
5383
5384 #[test]
5385 fn test_function_name_from_arn() {
5386 assert_eq!(
5388 super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5389 "my-func"
5390 );
5391 assert_eq!(
5393 super::function_name_from_arn(
5394 "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5395 ),
5396 "my-func"
5397 );
5398 assert_eq!(
5400 super::function_name_from_arn(
5401 "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5402 ),
5403 "my-func"
5404 );
5405 assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5407 }
5408}