1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20 ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21 PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24pub struct EventBridgeService {
25 state: SharedEventBridgeState,
26 delivery: Arc<DeliveryBus>,
27 lambda_state: Option<SharedLambdaState>,
28 logs_state: Option<SharedLogsState>,
29 container_runtime: Option<Arc<ContainerRuntime>>,
30}
31
32impl EventBridgeService {
33 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
34 Self {
35 state,
36 delivery,
37 lambda_state: None,
38 logs_state: None,
39 container_runtime: None,
40 }
41 }
42
43 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
44 self.lambda_state = Some(lambda_state);
45 self
46 }
47
48 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
49 self.logs_state = Some(logs_state);
50 self
51 }
52
53 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
54 self.container_runtime = Some(runtime);
55 self
56 }
57}
58
59#[async_trait]
60impl AwsService for EventBridgeService {
61 fn service_name(&self) -> &str {
62 "events"
63 }
64
65 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
66 match req.action.as_str() {
67 "CreateEventBus" => self.create_event_bus(&req),
68 "DeleteEventBus" => self.delete_event_bus(&req),
69 "ListEventBuses" => self.list_event_buses(&req),
70 "DescribeEventBus" => self.describe_event_bus(&req),
71 "PutRule" => self.put_rule(&req),
72 "DeleteRule" => self.delete_rule(&req),
73 "ListRules" => self.list_rules(&req),
74 "DescribeRule" => self.describe_rule(&req),
75 "EnableRule" => self.enable_rule(&req),
76 "DisableRule" => self.disable_rule(&req),
77 "PutTargets" => self.put_targets(&req),
78 "RemoveTargets" => self.remove_targets(&req),
79 "ListTargetsByRule" => self.list_targets_by_rule(&req),
80 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
81 "PutEvents" => self.put_events(&req),
82 "PutPermission" => self.put_permission(&req),
83 "RemovePermission" => self.remove_permission(&req),
84 "TagResource" => self.tag_resource(&req),
85 "UntagResource" => self.untag_resource(&req),
86 "ListTagsForResource" => self.list_tags_for_resource(&req),
87 "CreateArchive" => self.create_archive(&req),
88 "DescribeArchive" => self.describe_archive(&req),
89 "ListArchives" => self.list_archives(&req),
90 "UpdateArchive" => self.update_archive(&req),
91 "DeleteArchive" => self.delete_archive(&req),
92 "CreateConnection" => self.create_connection(&req),
93 "DescribeConnection" => self.describe_connection(&req),
94 "ListConnections" => self.list_connections(&req),
95 "UpdateConnection" => self.update_connection(&req),
96 "DeleteConnection" => self.delete_connection(&req),
97 "CreateApiDestination" => self.create_api_destination(&req),
98 "DescribeApiDestination" => self.describe_api_destination(&req),
99 "ListApiDestinations" => self.list_api_destinations(&req),
100 "UpdateApiDestination" => self.update_api_destination(&req),
101 "DeleteApiDestination" => self.delete_api_destination(&req),
102 "StartReplay" => self.start_replay(&req),
103 "DescribeReplay" => self.describe_replay(&req),
104 "ListReplays" => self.list_replays(&req),
105 "CancelReplay" => self.cancel_replay(&req),
106 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
107 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
108 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
109 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
110 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
111 "ActivateEventSource" => self.activate_event_source(&req),
112 "DeactivateEventSource" => self.deactivate_event_source(&req),
113 "DescribeEventSource" => self.describe_event_source(&req),
114 "ListEventSources" => self.list_event_sources(&req),
115 "PutPartnerEvents" => self.put_partner_events(&req),
116 "TestEventPattern" => self.test_event_pattern(&req),
117 "UpdateEventBus" => self.update_event_bus(&req),
118 "CreateEndpoint" => self.create_endpoint(&req),
119 "DeleteEndpoint" => self.delete_endpoint(&req),
120 "DescribeEndpoint" => self.describe_endpoint(&req),
121 "ListEndpoints" => self.list_endpoints(&req),
122 "UpdateEndpoint" => self.update_endpoint(&req),
123 "DeauthorizeConnection" => self.deauthorize_connection(&req),
124 _ => Err(AwsServiceError::action_not_implemented(
125 "events",
126 &req.action,
127 )),
128 }
129 }
130
131 fn supported_actions(&self) -> &[&str] {
132 &[
133 "CreateEventBus",
134 "DeleteEventBus",
135 "ListEventBuses",
136 "DescribeEventBus",
137 "PutRule",
138 "DeleteRule",
139 "ListRules",
140 "DescribeRule",
141 "EnableRule",
142 "DisableRule",
143 "PutTargets",
144 "RemoveTargets",
145 "ListTargetsByRule",
146 "ListRuleNamesByTarget",
147 "PutEvents",
148 "PutPermission",
149 "RemovePermission",
150 "TagResource",
151 "UntagResource",
152 "ListTagsForResource",
153 "CreateArchive",
154 "DescribeArchive",
155 "ListArchives",
156 "UpdateArchive",
157 "DeleteArchive",
158 "CreateConnection",
159 "DescribeConnection",
160 "ListConnections",
161 "UpdateConnection",
162 "DeleteConnection",
163 "CreateApiDestination",
164 "DescribeApiDestination",
165 "ListApiDestinations",
166 "UpdateApiDestination",
167 "DeleteApiDestination",
168 "StartReplay",
169 "DescribeReplay",
170 "ListReplays",
171 "CancelReplay",
172 "CreatePartnerEventSource",
173 "DeletePartnerEventSource",
174 "DescribePartnerEventSource",
175 "ListPartnerEventSources",
176 "ListPartnerEventSourceAccounts",
177 "ActivateEventSource",
178 "DeactivateEventSource",
179 "DescribeEventSource",
180 "ListEventSources",
181 "PutPartnerEvents",
182 "TestEventPattern",
183 "UpdateEventBus",
184 "CreateEndpoint",
185 "DeleteEndpoint",
186 "DescribeEndpoint",
187 "ListEndpoints",
188 "UpdateEndpoint",
189 "DeauthorizeConnection",
190 ]
191 }
192}
193
194fn parse_tags(body: &Value) -> HashMap<String, String> {
195 let mut tags = HashMap::new();
196 if let Some(arr) = body["Tags"].as_array() {
197 for tag in arr {
198 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
199 tags.insert(key.to_string(), val.to_string());
200 }
201 }
202 }
203 tags
204}
205
206fn parse_target(target: &Value) -> EventTarget {
207 EventTarget {
208 id: target["Id"].as_str().unwrap_or("").to_string(),
209 arn: target["Arn"].as_str().unwrap_or("").to_string(),
210 input: target["Input"].as_str().map(|s| s.to_string()),
211 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
212 input_transformer: target.get("InputTransformer").cloned(),
213 sqs_parameters: target.get("SqsParameters").cloned(),
214 }
215}
216
217fn target_to_json(t: &EventTarget) -> Value {
218 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
219 if let Some(ref input) = t.input {
220 obj["Input"] = json!(input);
221 }
222 if let Some(ref input_path) = t.input_path {
223 obj["InputPath"] = json!(input_path);
224 }
225 if let Some(ref it) = t.input_transformer {
226 obj["InputTransformer"] = it.clone();
227 }
228 if let Some(ref sp) = t.sqs_parameters {
229 obj["SqsParameters"] = sp.clone();
230 }
231 obj
232}
233
234impl EventBridgeService {
236 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
237 let body = req.json_body();
238 validate_required("Name", &body["Name"])?;
239 let name = body["Name"]
240 .as_str()
241 .ok_or_else(|| missing("Name"))?
242 .to_string();
243 validate_string_length("name", &name, 1, 256)?;
244 validate_optional_string_length(
245 "eventSourceName",
246 body["EventSourceName"].as_str(),
247 1,
248 256,
249 )?;
250 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
251 validate_optional_string_length(
252 "kmsKeyIdentifier",
253 body["KmsKeyIdentifier"].as_str(),
254 0,
255 2048,
256 )?;
257
258 if name.contains('/') && !name.starts_with("aws.partner/") {
260 return Err(AwsServiceError::aws_error(
261 StatusCode::BAD_REQUEST,
262 "ValidationException",
263 "Event bus name must not contain '/'.",
264 ));
265 }
266
267 if name.starts_with("aws.partner/") {
269 let event_source = body["EventSourceName"].as_str().unwrap_or("");
270 let state_r = self.state.read();
271 let has_source = state_r.partner_event_sources.contains_key(event_source);
272 drop(state_r);
273 if !has_source {
274 return Err(AwsServiceError::aws_error(
275 StatusCode::BAD_REQUEST,
276 "ResourceNotFoundException",
277 format!("Event source {event_source} does not exist."),
278 ));
279 }
280 }
281
282 let mut state = self.state.write();
283
284 if state.buses.contains_key(&name) {
285 return Err(AwsServiceError::aws_error(
286 StatusCode::BAD_REQUEST,
287 "ResourceAlreadyExistsException",
288 format!("Event bus {name} already exists."),
289 ));
290 }
291
292 let arn = format!(
293 "arn:aws:events:{}:{}:event-bus/{}",
294 req.region, state.account_id, name
295 );
296 let now = Utc::now();
297 let description = body["Description"].as_str().map(|s| s.to_string());
298 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
299 let dead_letter_config = body.get("DeadLetterConfig").cloned();
300
301 let tags = parse_tags(&body);
302
303 let bus = EventBus {
304 name: name.clone(),
305 arn: arn.clone(),
306 tags,
307 policy: None,
308 description,
309 kms_key_identifier,
310 dead_letter_config,
311 creation_time: now,
312 last_modified_time: now,
313 };
314 state.buses.insert(name, bus);
315
316 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
317 }
318
319 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320 let body = req.json_body();
321 validate_required("Name", &body["Name"])?;
322 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
323 validate_string_length("name", name, 1, 256)?;
324
325 if name == "default" {
326 return Err(AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "ValidationException",
329 format!("Cannot delete event bus {name}."),
330 ));
331 }
332
333 let mut state = self.state.write();
334 state.buses.remove(name);
335 state.rules.retain(|k, _| k.0 != name);
336
337 Ok(AwsResponse::ok_json(json!({})))
338 }
339
340 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
341 let body = req.json_body();
342 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
343 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
344 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
345 let name_prefix = body["NamePrefix"].as_str();
346 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
347 if let Some(t) = body["NextToken"].as_str() {
348 t.parse::<usize>().map_err(|_| {
349 AwsServiceError::aws_error(
350 StatusCode::BAD_REQUEST,
351 "InvalidNextTokenException",
352 format!("Invalid NextToken value: '{t}'"),
353 )
354 })?;
355 }
356
357 let state = self.state.read();
358 let filtered: Vec<&_> = state
359 .buses
360 .values()
361 .filter(|b| match name_prefix {
362 Some(prefix) => b.name.starts_with(prefix),
363 None => true,
364 })
365 .collect();
366
367 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
368 let buses: Vec<Value> = page
369 .iter()
370 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
371 .collect();
372 let mut resp = json!({ "EventBuses": buses });
373 if let Some(token) = next_token {
374 resp["NextToken"] = json!(token);
375 }
376
377 Ok(AwsResponse::ok_json(resp))
378 }
379
380 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381 let body = req.json_body();
382 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
383 let name = body["Name"].as_str().unwrap_or("default");
384
385 let state = self.state.read();
386 let bus = state.buses.get(name).ok_or_else(|| {
387 AwsServiceError::aws_error(
388 StatusCode::BAD_REQUEST,
389 "ResourceNotFoundException",
390 format!("Event bus {name} does not exist."),
391 )
392 })?;
393
394 let mut resp = json!({
395 "Name": bus.name,
396 "Arn": bus.arn,
397 "CreationTime": bus.creation_time.timestamp() as f64,
398 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
399 });
400
401 if let Some(ref policy) = bus.policy {
402 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
403 }
404 if let Some(ref desc) = bus.description {
405 resp["Description"] = json!(desc);
406 }
407 if let Some(ref kms) = bus.kms_key_identifier {
408 resp["KmsKeyIdentifier"] = json!(kms);
409 }
410 if let Some(ref dlc) = bus.dead_letter_config {
411 resp["DeadLetterConfig"] = dlc.clone();
412 }
413
414 Ok(AwsResponse::ok_json(resp))
415 }
416
417 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420 let body = req.json_body();
421 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
422 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
423 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
424 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
425 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
426
427 let mut state = self.state.write();
428
429 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
430 AwsServiceError::aws_error(
431 StatusCode::BAD_REQUEST,
432 "ResourceNotFoundException",
433 format!("Event bus {event_bus_name} does not exist."),
434 )
435 })?;
436
437 if let Some(policy_str) = body["Policy"].as_str() {
439 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
440 bus.policy = Some(policy);
441 return Ok(AwsResponse::ok_json(json!({})));
442 }
443 }
444
445 let action = body["Action"].as_str().unwrap_or("");
447 let principal = body["Principal"].as_str().unwrap_or("");
448 let statement_id = body["StatementId"].as_str().unwrap_or("");
449
450 if action != "events:PutEvents" {
452 return Err(AwsServiceError::aws_error(
453 StatusCode::BAD_REQUEST,
454 "ValidationException",
455 "Provided value in parameter 'action' is not supported.",
456 ));
457 }
458
459 let statement = json!({
460 "Sid": statement_id,
461 "Effect": "Allow",
462 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
463 "Action": action,
464 "Resource": bus.arn,
465 });
466
467 let policy = bus.policy.get_or_insert_with(|| {
468 json!({
469 "Version": "2012-10-17",
470 "Statement": [],
471 })
472 });
473
474 if let Some(stmts) = policy["Statement"].as_array_mut() {
475 stmts.push(statement);
476 }
477
478 Ok(AwsResponse::ok_json(json!({})))
479 }
480
481 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482 let body = req.json_body();
483 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
484 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
485 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
486 let statement_id = body["StatementId"].as_str().unwrap_or("");
487 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
488
489 let mut state = self.state.write();
490
491 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
492 AwsServiceError::aws_error(
493 StatusCode::BAD_REQUEST,
494 "ResourceNotFoundException",
495 format!("Event bus {event_bus_name} does not exist."),
496 )
497 })?;
498
499 if remove_all {
500 bus.policy = None;
501 return Ok(AwsResponse::ok_json(json!({})));
502 }
503
504 let policy = bus.policy.as_mut().ok_or_else(|| {
505 AwsServiceError::aws_error(
506 StatusCode::BAD_REQUEST,
507 "ResourceNotFoundException",
508 "EventBus does not have a policy.",
509 )
510 })?;
511
512 if let Some(stmts) = policy["Statement"].as_array_mut() {
513 let before = stmts.len();
514 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
515 if stmts.len() == before {
516 return Err(AwsServiceError::aws_error(
517 StatusCode::BAD_REQUEST,
518 "ResourceNotFoundException",
519 "Statement with the provided id does not exist.",
520 ));
521 }
522 if stmts.is_empty() {
523 bus.policy = None;
524 }
525 }
526
527 Ok(AwsResponse::ok_json(json!({})))
528 }
529
530 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
533 let body = req.json_body();
534 validate_required("Name", &body["Name"])?;
535 let name = body["Name"]
536 .as_str()
537 .ok_or_else(|| missing("Name"))?
538 .to_string();
539 validate_string_length("name", &name, 1, 64)?;
540 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
541 validate_optional_string_length(
542 "scheduleExpression",
543 body["ScheduleExpression"].as_str(),
544 0,
545 256,
546 )?;
547 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
548 validate_optional_enum(
549 "state",
550 body["State"].as_str(),
551 &[
552 "ENABLED",
553 "DISABLED",
554 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
555 ],
556 )?;
557 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
558 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
559
560 let raw_bus = body["EventBusName"]
561 .as_str()
562 .unwrap_or("default")
563 .to_string();
564
565 let mut state = self.state.write();
566 let event_bus_name = state.resolve_bus_name(&raw_bus);
567
568 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
569 if s.is_empty() {
570 None
571 } else {
572 Some(s.to_string())
573 }
574 });
575 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
576 if s.is_empty() {
577 None
578 } else {
579 Some(s.to_string())
580 }
581 });
582 let description = body["Description"].as_str().map(|s| s.to_string());
583 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
584 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
585
586 if schedule_expression.is_some() && event_bus_name != "default" {
588 return Err(AwsServiceError::aws_error(
589 StatusCode::BAD_REQUEST,
590 "ValidationException",
591 "ScheduleExpression is supported only on the default event bus.",
592 ));
593 }
594
595 if !state.buses.contains_key(&event_bus_name) {
596 return Err(AwsServiceError::aws_error(
597 StatusCode::BAD_REQUEST,
598 "ResourceNotFoundException",
599 format!("Event bus {event_bus_name} does not exist."),
600 ));
601 }
602
603 let arn = if event_bus_name == "default" {
604 format!(
605 "arn:aws:events:{}:{}:rule/{}",
606 req.region, state.account_id, name
607 )
608 } else {
609 format!(
610 "arn:aws:events:{}:{}:rule/{}/{}",
611 req.region, state.account_id, event_bus_name, name
612 )
613 };
614
615 let key = (event_bus_name.clone(), name.clone());
616 let targets = state
617 .rules
618 .get(&key)
619 .map(|r| r.targets.clone())
620 .unwrap_or_default();
621
622 let tags = parse_tags(&body);
623
624 let rule = EventRule {
625 name: name.clone(),
626 arn: arn.clone(),
627 event_bus_name,
628 event_pattern,
629 schedule_expression,
630 state: rule_state,
631 description,
632 role_arn,
633 managed_by: None,
634 created_by: None,
635 targets,
636 tags,
637 last_fired: None,
638 };
639
640 state.rules.insert(key, rule);
641 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
642 }
643
644 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645 let body = req.json_body();
646 validate_required("Name", &body["Name"])?;
647 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
648 validate_string_length("name", name, 1, 64)?;
649 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
650 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
651
652 let mut state = self.state.write();
653 let bus_name = state.resolve_bus_name(event_bus_name);
654 let key = (bus_name, name.to_string());
655
656 if let Some(rule) = state.rules.get(&key) {
658 if !rule.targets.is_empty() {
659 return Err(AwsServiceError::aws_error(
660 StatusCode::BAD_REQUEST,
661 "ValidationException",
662 "Rule can't be deleted since it has targets.",
663 ));
664 }
665 }
666
667 state.rules.remove(&key);
668 Ok(AwsResponse::ok_json(json!({})))
669 }
670
671 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
672 let body = req.json_body();
673 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
674 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
675 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
676 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
677 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
678 let name_prefix = body["NamePrefix"].as_str();
679 let limit = body["Limit"].as_u64().map(|n| n as usize);
680 let next_token = body["NextToken"].as_str();
681
682 let state = self.state.read();
683 let bus_name = state.resolve_bus_name(event_bus_name);
684
685 let mut rules: Vec<&EventRule> = state
686 .rules
687 .values()
688 .filter(|r| r.event_bus_name == bus_name)
689 .filter(|r| match name_prefix {
690 Some(prefix) => r.name.starts_with(prefix),
691 None => true,
692 })
693 .collect();
694 rules.sort_by(|a, b| a.name.cmp(&b.name));
695
696 let start = next_token
698 .and_then(|t| t.parse::<usize>().ok())
699 .unwrap_or(0)
700 .min(rules.len());
701 let rules_slice = &rules[start..];
702
703 let (page, new_next_token) = if let Some(lim) = limit {
704 if rules_slice.len() > lim {
705 (&rules_slice[..lim], Some((start + lim).to_string()))
706 } else {
707 (rules_slice, None)
708 }
709 } else {
710 (rules_slice, None)
711 };
712
713 let rules_json: Vec<Value> = page
714 .iter()
715 .map(|r| {
716 let mut obj = json!({
717 "Name": r.name,
718 "Arn": r.arn,
719 "EventBusName": r.event_bus_name,
720 "State": r.state,
721 });
722 if let Some(ref desc) = r.description {
723 obj["Description"] = json!(desc);
724 }
725 if let Some(ref ep) = r.event_pattern {
726 obj["EventPattern"] = json!(ep);
727 }
728 if let Some(ref se) = r.schedule_expression {
729 obj["ScheduleExpression"] = json!(se);
730 }
731 if let Some(ref mb) = r.managed_by {
732 obj["ManagedBy"] = json!(mb);
733 }
734 obj
735 })
736 .collect();
737
738 let mut resp = json!({ "Rules": rules_json });
739 if let Some(token) = new_next_token {
740 resp["NextToken"] = json!(token);
741 }
742
743 Ok(AwsResponse::ok_json(resp))
744 }
745
746 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747 let body = req.json_body();
748 validate_required("Name", &body["Name"])?;
749 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
750 validate_string_length("name", name, 1, 64)?;
751 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
752 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
753
754 let state = self.state.read();
755 let bus_name = state.resolve_bus_name(event_bus_name);
756 let key = (bus_name.clone(), name.to_string());
757
758 let rule = state.rules.get(&key).ok_or_else(|| {
759 AwsServiceError::aws_error(
760 StatusCode::BAD_REQUEST,
761 "ResourceNotFoundException",
762 format!("Rule {name} does not exist."),
763 )
764 })?;
765
766 let mut resp = json!({
767 "Name": rule.name,
768 "Arn": rule.arn,
769 "EventBusName": rule.event_bus_name,
770 "State": rule.state,
771 });
772
773 if let Some(ref desc) = rule.description {
774 resp["Description"] = json!(desc);
775 }
776 if let Some(ref ep) = rule.event_pattern {
777 resp["EventPattern"] = json!(ep);
778 }
779 if let Some(ref se) = rule.schedule_expression {
780 resp["ScheduleExpression"] = json!(se);
781 }
782 if let Some(ref role) = rule.role_arn {
783 resp["RoleArn"] = json!(role);
784 }
785 if let Some(ref mb) = rule.managed_by {
786 resp["ManagedBy"] = json!(mb);
787 }
788 if let Some(ref cb) = rule.created_by {
789 resp["CreatedBy"] = json!(cb);
790 }
791 if rule.event_bus_name != "default" && rule.created_by.is_none() {
793 resp["CreatedBy"] = json!(state.account_id);
794 }
795
796 Ok(AwsResponse::ok_json(resp))
797 }
798
799 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
800 let body = req.json_body();
801 validate_required("Name", &body["Name"])?;
802 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
803 validate_string_length("name", name, 1, 64)?;
804 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
805 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
806
807 let mut state = self.state.write();
808 let bus_name = state.resolve_bus_name(event_bus_name);
809 let key = (bus_name, name.to_string());
810
811 let rule = state.rules.get_mut(&key).ok_or_else(|| {
812 AwsServiceError::aws_error(
813 StatusCode::BAD_REQUEST,
814 "ResourceNotFoundException",
815 format!("Rule {name} does not exist."),
816 )
817 })?;
818
819 rule.state = "ENABLED".to_string();
820 Ok(AwsResponse::ok_json(json!({})))
821 }
822
823 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824 let body = req.json_body();
825 validate_required("Name", &body["Name"])?;
826 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
827 validate_string_length("name", name, 1, 64)?;
828 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
829 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
830
831 let mut state = self.state.write();
832 let bus_name = state.resolve_bus_name(event_bus_name);
833 let key = (bus_name, name.to_string());
834
835 let rule = state.rules.get_mut(&key).ok_or_else(|| {
836 AwsServiceError::aws_error(
837 StatusCode::BAD_REQUEST,
838 "ResourceNotFoundException",
839 format!("Rule {name} does not exist."),
840 )
841 })?;
842
843 rule.state = "DISABLED".to_string();
844 Ok(AwsResponse::ok_json(json!({})))
845 }
846
847 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850 let body = req.json_body();
851 validate_required("Rule", &body["Rule"])?;
852 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
853 validate_string_length("rule", rule_name, 1, 64)?;
854 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
855 validate_required("Targets", &body["Targets"])?;
856 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
857 let targets = body["Targets"]
858 .as_array()
859 .ok_or_else(|| missing("Targets"))?;
860
861 for target in targets {
863 let target_id = target["Id"].as_str().unwrap_or("");
864 let target_arn = target["Arn"].as_str().unwrap_or("");
865
866 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
867 return Err(AwsServiceError::aws_error(
868 StatusCode::BAD_REQUEST,
869 "ValidationException",
870 format!(
871 "Parameter(s) SqsParameters must be specified for target: {target_id}."
872 ),
873 ));
874 }
875
876 if !target_arn.starts_with("arn:") {
878 return Err(AwsServiceError::aws_error(
879 StatusCode::BAD_REQUEST,
880 "ValidationException",
881 format!(
882 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
883 ),
884 ));
885 }
886 }
887
888 let mut state = self.state.write();
889 let bus_name = state.resolve_bus_name(event_bus_name);
890 let key = (bus_name.clone(), rule_name.to_string());
891
892 let rule = state.rules.get_mut(&key).ok_or_else(|| {
893 AwsServiceError::aws_error(
894 StatusCode::BAD_REQUEST,
895 "ResourceNotFoundException",
896 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
897 )
898 })?;
899
900 for target in targets {
901 let et = parse_target(target);
902 rule.targets.retain(|t| t.id != et.id);
904 rule.targets.push(et);
905 }
906
907 Ok(AwsResponse::ok_json(json!({
908 "FailedEntryCount": 0,
909 "FailedEntries": [],
910 })))
911 }
912
913 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
914 let body = req.json_body();
915 validate_required("Rule", &body["Rule"])?;
916 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
917 validate_string_length("rule", rule_name, 1, 64)?;
918 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
919 validate_required("Ids", &body["Ids"])?;
920 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
921 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
922
923 let target_ids: Vec<String> = ids
924 .iter()
925 .filter_map(|v| v.as_str().map(|s| s.to_string()))
926 .collect();
927
928 let mut state = self.state.write();
929 let bus_name = state.resolve_bus_name(event_bus_name);
930 let key = (bus_name.clone(), rule_name.to_string());
931
932 let rule = state.rules.get_mut(&key).ok_or_else(|| {
933 AwsServiceError::aws_error(
934 StatusCode::BAD_REQUEST,
935 "ResourceNotFoundException",
936 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
937 )
938 })?;
939
940 rule.targets.retain(|t| !target_ids.contains(&t.id));
941
942 Ok(AwsResponse::ok_json(json!({
943 "FailedEntryCount": 0,
944 "FailedEntries": [],
945 })))
946 }
947
948 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
949 let body = req.json_body();
950 validate_required("Rule", &body["Rule"])?;
951 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
952 validate_string_length("rule", rule_name, 1, 64)?;
953 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
954 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
955 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
956 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
957 let limit = body["Limit"].as_u64().map(|n| n as usize);
958 let next_token = body["NextToken"].as_str();
959
960 let state = self.state.read();
961 let bus_name = state.resolve_bus_name(event_bus_name);
962 let key = (bus_name, rule_name.to_string());
963
964 let rule = state.rules.get(&key).ok_or_else(|| {
965 AwsServiceError::aws_error(
966 StatusCode::BAD_REQUEST,
967 "ResourceNotFoundException",
968 format!("Rule {rule_name} does not exist."),
969 )
970 })?;
971
972 let all_targets = &rule.targets;
973 let start = next_token
974 .and_then(|t| t.parse::<usize>().ok())
975 .unwrap_or(0)
976 .min(all_targets.len());
977 let slice = &all_targets[start..];
978
979 let (page, new_next_token) = if let Some(lim) = limit {
980 if slice.len() > lim {
981 (&slice[..lim], Some((start + lim).to_string()))
982 } else {
983 (slice, None)
984 }
985 } else {
986 (slice, None)
987 };
988
989 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
990
991 let mut resp = json!({ "Targets": targets });
992 if let Some(token) = new_next_token {
993 resp["NextToken"] = json!(token);
994 }
995
996 Ok(AwsResponse::ok_json(resp))
997 }
998
999 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1000 let body = req.json_body();
1001 validate_required("TargetArn", &body["TargetArn"])?;
1002 let target_arn = body["TargetArn"]
1003 .as_str()
1004 .ok_or_else(|| missing("TargetArn"))?;
1005 validate_string_length("targetArn", target_arn, 1, 1600)?;
1006 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1007 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1008 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1009 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1010 let limit = body["Limit"].as_u64().map(|n| n as usize);
1011 let next_token = body["NextToken"].as_str();
1012
1013 let state = self.state.read();
1014 let bus_name = state.resolve_bus_name(event_bus_name);
1015
1016 let mut rule_names: Vec<String> = Vec::new();
1018 for rule in state.rules.values() {
1019 if rule.event_bus_name == bus_name
1020 && rule.targets.iter().any(|t| t.arn == target_arn)
1021 && !rule_names.contains(&rule.name)
1022 {
1023 rule_names.push(rule.name.clone());
1024 }
1025 }
1026 rule_names.sort();
1027
1028 let start = next_token
1029 .and_then(|t| t.parse::<usize>().ok())
1030 .unwrap_or(0)
1031 .min(rule_names.len());
1032 let slice = &rule_names[start..];
1033
1034 let (page, new_next_token) = if let Some(lim) = limit {
1035 if slice.len() > lim {
1036 (&slice[..lim], Some((start + lim).to_string()))
1037 } else {
1038 (slice, None)
1039 }
1040 } else {
1041 (slice, None)
1042 };
1043
1044 let mut resp = json!({ "RuleNames": page });
1045 if let Some(token) = new_next_token {
1046 resp["NextToken"] = json!(token);
1047 }
1048
1049 Ok(AwsResponse::ok_json(resp))
1050 }
1051
1052 fn create_partner_event_source(
1055 &self,
1056 req: &AwsRequest,
1057 ) -> Result<AwsResponse, AwsServiceError> {
1058 let body = req.json_body();
1059 validate_required("Name", &body["Name"])?;
1060 let name = body["Name"]
1061 .as_str()
1062 .ok_or_else(|| missing("Name"))?
1063 .to_string();
1064 validate_string_length("name", &name, 1, 256)?;
1065 validate_required("Account", &body["Account"])?;
1066 let account = body["Account"]
1067 .as_str()
1068 .ok_or_else(|| missing("Account"))?
1069 .to_string();
1070 validate_string_length("account", &account, 12, 12)?;
1071
1072 let mut state = self.state.write();
1073 if state.partner_event_sources.contains_key(&name) {
1074 return Err(AwsServiceError::aws_error(
1075 StatusCode::CONFLICT,
1076 "ResourceAlreadyExistsException",
1077 format!("Partner event source {name} already exists."),
1078 ));
1079 }
1080 let arn = format!(
1081 "arn:aws:events:{}::event-source/aws.partner/{}",
1082 state.region, name
1083 );
1084 let now = Utc::now();
1085 let ps = PartnerEventSource {
1086 name: name.clone(),
1087 arn: arn.clone(),
1088 account,
1089 creation_time: now,
1090 expiration_time: None,
1091 state: "ACTIVE".to_string(),
1092 };
1093 state.partner_event_sources.insert(name.clone(), ps);
1094
1095 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1096 }
1097
1098 fn delete_partner_event_source(
1099 &self,
1100 req: &AwsRequest,
1101 ) -> Result<AwsResponse, AwsServiceError> {
1102 let body = req.json_body();
1103 validate_required("Name", &body["Name"])?;
1104 let name = body["Name"]
1105 .as_str()
1106 .ok_or_else(|| missing("Name"))?
1107 .to_string();
1108 validate_required("Account", &body["Account"])?;
1109 let account = body["Account"]
1110 .as_str()
1111 .ok_or_else(|| missing("Account"))?
1112 .to_string();
1113
1114 let mut state = self.state.write();
1115 match state.partner_event_sources.get(&name) {
1116 Some(ps) if ps.account == account => {
1117 state.partner_event_sources.remove(&name);
1118 }
1119 Some(_) => {
1120 return Err(AwsServiceError::aws_error(
1121 StatusCode::NOT_FOUND,
1122 "ResourceNotFoundException",
1123 format!("Partner event source {name} does not exist for account {account}."),
1124 ));
1125 }
1126 None => {
1127 return Err(AwsServiceError::aws_error(
1128 StatusCode::NOT_FOUND,
1129 "ResourceNotFoundException",
1130 format!("Partner event source {name} does not exist."),
1131 ));
1132 }
1133 }
1134
1135 Ok(AwsResponse::ok_json(json!({})))
1136 }
1137
1138 fn describe_partner_event_source(
1139 &self,
1140 req: &AwsRequest,
1141 ) -> Result<AwsResponse, AwsServiceError> {
1142 let body = req.json_body();
1143 validate_required("Name", &body["Name"])?;
1144 let name = body["Name"]
1145 .as_str()
1146 .ok_or_else(|| missing("Name"))?
1147 .to_string();
1148 validate_string_length("name", &name, 1, 256)?;
1149
1150 let state = self.state.read();
1151 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1152 AwsServiceError::aws_error(
1153 StatusCode::NOT_FOUND,
1154 "ResourceNotFoundException",
1155 format!("Partner event source {name} does not exist."),
1156 )
1157 })?;
1158
1159 Ok(AwsResponse::ok_json(json!({
1160 "Arn": ps.arn,
1161 "Name": ps.name,
1162 })))
1163 }
1164
1165 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166 let body = req.json_body();
1167 validate_required("namePrefix", &body["NamePrefix"])?;
1168 let name_prefix = body["NamePrefix"]
1169 .as_str()
1170 .ok_or_else(|| missing("NamePrefix"))?;
1171 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1172 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1173 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1174 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1175
1176 let state = self.state.read();
1177 let all: Vec<Value> = state
1178 .partner_event_sources
1179 .values()
1180 .filter(|ps| ps.name.starts_with(name_prefix))
1181 .map(|ps| {
1182 json!({
1183 "Arn": ps.arn,
1184 "Name": ps.name,
1185 })
1186 })
1187 .collect();
1188
1189 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1190 let mut resp = json!({ "PartnerEventSources": sources });
1191 if let Some(token) = next_token {
1192 resp["NextToken"] = json!(token);
1193 }
1194
1195 Ok(AwsResponse::ok_json(resp))
1196 }
1197
1198 fn list_partner_event_source_accounts(
1199 &self,
1200 req: &AwsRequest,
1201 ) -> Result<AwsResponse, AwsServiceError> {
1202 let body = req.json_body();
1203 validate_required("EventSourceName", &body["EventSourceName"])?;
1204 let event_source_name = body["EventSourceName"]
1205 .as_str()
1206 .ok_or_else(|| missing("EventSourceName"))?;
1207 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1208 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1209 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1210
1211 let state = self.state.read();
1212 let accounts: Vec<Value> = state
1213 .partner_event_sources
1214 .values()
1215 .filter(|ps| ps.name == event_source_name)
1216 .map(|ps| json!({ "Account": ps.account }))
1217 .collect();
1218
1219 Ok(AwsResponse::ok_json(json!({
1220 "PartnerEventSourceAccounts": accounts
1221 })))
1222 }
1223
1224 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1225 let body = req.json_body();
1226 validate_required("Name", &body["Name"])?;
1227 let name = body["Name"]
1228 .as_str()
1229 .ok_or_else(|| missing("Name"))?
1230 .to_string();
1231
1232 let mut state = self.state.write();
1233 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1234 AwsServiceError::aws_error(
1235 StatusCode::NOT_FOUND,
1236 "ResourceNotFoundException",
1237 format!("Event source {name} does not exist."),
1238 )
1239 })?;
1240 ps.state = "ACTIVE".to_string();
1241
1242 Ok(AwsResponse::ok_json(json!({})))
1243 }
1244
1245 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1246 let body = req.json_body();
1247 validate_required("Name", &body["Name"])?;
1248 let name = body["Name"]
1249 .as_str()
1250 .ok_or_else(|| missing("Name"))?
1251 .to_string();
1252
1253 let mut state = self.state.write();
1254 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1255 AwsServiceError::aws_error(
1256 StatusCode::NOT_FOUND,
1257 "ResourceNotFoundException",
1258 format!("Event source {name} does not exist."),
1259 )
1260 })?;
1261 ps.state = "INACTIVE".to_string();
1262
1263 Ok(AwsResponse::ok_json(json!({})))
1264 }
1265
1266 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1267 let body = req.json_body();
1268 validate_required("Name", &body["Name"])?;
1269 let name = body["Name"]
1270 .as_str()
1271 .ok_or_else(|| missing("Name"))?
1272 .to_string();
1273
1274 let state = self.state.read();
1275 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1276 AwsServiceError::aws_error(
1277 StatusCode::NOT_FOUND,
1278 "ResourceNotFoundException",
1279 format!("Event source {name} does not exist."),
1280 )
1281 })?;
1282
1283 Ok(AwsResponse::ok_json(json!({
1284 "Arn": ps.arn,
1285 "Name": ps.name,
1286 "CreatedBy": ps.account,
1287 "CreationTime": ps.creation_time.timestamp() as f64,
1288 "State": ps.state,
1289 })))
1290 }
1291
1292 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1293 let body = req.json_body();
1294 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1295 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1296 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1297 let name_prefix = body["NamePrefix"].as_str();
1298 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1299
1300 let state = self.state.read();
1301 let all: Vec<Value> = state
1302 .partner_event_sources
1303 .values()
1304 .filter(|ps| match name_prefix {
1305 Some(prefix) => ps.name.starts_with(prefix),
1306 None => true,
1307 })
1308 .map(|ps| {
1309 json!({
1310 "Arn": ps.arn,
1311 "Name": ps.name,
1312 "CreatedBy": ps.account,
1313 "CreationTime": ps.creation_time.timestamp() as f64,
1314 "State": ps.state,
1315 })
1316 })
1317 .collect();
1318
1319 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1320 let mut resp = json!({ "EventSources": sources });
1321 if let Some(token) = next_token {
1322 resp["NextToken"] = json!(token);
1323 }
1324
1325 Ok(AwsResponse::ok_json(resp))
1326 }
1327
1328 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1329 let body = req.json_body();
1330 validate_required("Entries", &body["Entries"])?;
1331 let entries = body["Entries"]
1332 .as_array()
1333 .ok_or_else(|| missing("Entries"))?;
1334
1335 let mut result_entries = Vec::new();
1336 for _entry in entries {
1337 let event_id = uuid::Uuid::new_v4().to_string();
1338 result_entries.push(json!({ "EventId": event_id }));
1339 }
1340
1341 Ok(AwsResponse::ok_json(json!({
1342 "FailedEntryCount": 0,
1343 "Entries": result_entries,
1344 })))
1345 }
1346
1347 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1350 let body = req.json_body();
1351 validate_required("EventPattern", &body["EventPattern"])?;
1352 validate_required("Event", &body["Event"])?;
1353 let event_pattern = body["EventPattern"]
1354 .as_str()
1355 .ok_or_else(|| missing("EventPattern"))?;
1356 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1357
1358 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1360 AwsServiceError::aws_error(
1361 StatusCode::BAD_REQUEST,
1362 "InvalidEventPatternException",
1363 "Event is not valid JSON.",
1364 )
1365 })?;
1366
1367 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1369 AwsServiceError::aws_error(
1370 StatusCode::BAD_REQUEST,
1371 "InvalidEventPatternException",
1372 "Event pattern is not valid JSON.",
1373 )
1374 })?;
1375
1376 let source = event["source"].as_str().unwrap_or("");
1377 let detail_type = event["detail-type"].as_str().unwrap_or("");
1378 let detail = event
1379 .get("detail")
1380 .map(|v| serde_json::to_string(v).unwrap_or_default())
1381 .unwrap_or_else(|| "{}".to_string());
1382 let account = event["account"].as_str().unwrap_or("");
1383 let region = event["region"].as_str().unwrap_or("");
1384 let resources: Vec<String> = event["resources"]
1385 .as_array()
1386 .map(|arr| {
1387 arr.iter()
1388 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1389 .collect()
1390 })
1391 .unwrap_or_default();
1392
1393 let result = matches_pattern(
1394 Some(event_pattern),
1395 source,
1396 detail_type,
1397 &detail,
1398 account,
1399 region,
1400 &resources,
1401 );
1402
1403 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1404 }
1405
1406 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1409 let body = req.json_body();
1410 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1411 validate_optional_string_length(
1412 "kmsKeyIdentifier",
1413 body["KmsKeyIdentifier"].as_str(),
1414 0,
1415 2048,
1416 )?;
1417 let name = body["Name"].as_str().unwrap_or("default");
1418
1419 let mut state = self.state.write();
1420 let bus = state.buses.get_mut(name).ok_or_else(|| {
1421 AwsServiceError::aws_error(
1422 StatusCode::BAD_REQUEST,
1423 "ResourceNotFoundException",
1424 format!("Event bus {name} does not exist."),
1425 )
1426 })?;
1427
1428 if let Some(desc) = body["Description"].as_str() {
1429 bus.description = Some(desc.to_string());
1430 }
1431 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1432 bus.kms_key_identifier = Some(kms.to_string());
1433 }
1434 if let Some(dlc) = body.get("DeadLetterConfig") {
1435 bus.dead_letter_config = Some(dlc.clone());
1436 }
1437 bus.last_modified_time = Utc::now();
1438
1439 let arn = bus.arn.clone();
1440 let bus_name = bus.name.clone();
1441
1442 Ok(AwsResponse::ok_json(json!({
1443 "Arn": arn,
1444 "Name": bus_name,
1445 })))
1446 }
1447
1448 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451 let body = req.json_body();
1452 validate_required("Name", &body["Name"])?;
1453 let name = body["Name"]
1454 .as_str()
1455 .ok_or_else(|| missing("Name"))?
1456 .to_string();
1457 validate_string_length("name", &name, 1, 64)?;
1458 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1459 validate_required("EventBuses", &body["EventBuses"])?;
1460
1461 let description = body["Description"].as_str().map(|s| s.to_string());
1462 let routing_config = body["RoutingConfig"].clone();
1463 let replication_config = body.get("ReplicationConfig").cloned();
1464 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1465 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1466
1467 let mut state = self.state.write();
1468 if state.endpoints.contains_key(&name) {
1469 return Err(AwsServiceError::aws_error(
1470 StatusCode::CONFLICT,
1471 "ResourceAlreadyExistsException",
1472 format!("Endpoint {name} already exists."),
1473 ));
1474 }
1475
1476 let endpoint_id = format!("{}.abc123", name);
1477 let arn = format!(
1478 "arn:aws:events:{}:{}:endpoint/{}",
1479 req.region, state.account_id, name
1480 );
1481 let endpoint_url = format!(
1482 "https://{}.endpoint.events.{}.amazonaws.com",
1483 endpoint_id, req.region
1484 );
1485 let now = Utc::now();
1486
1487 let endpoint = Endpoint {
1488 name: name.clone(),
1489 arn: arn.clone(),
1490 endpoint_id: endpoint_id.clone(),
1491 endpoint_url: Some(endpoint_url),
1492 description,
1493 routing_config: routing_config.clone(),
1494 replication_config: replication_config.clone(),
1495 event_buses: event_buses.clone(),
1496 role_arn: role_arn.clone(),
1497 state: "ACTIVE".to_string(),
1498 creation_time: now,
1499 last_modified_time: now,
1500 };
1501 state.endpoints.insert(name.clone(), endpoint);
1502
1503 let mut resp = json!({
1504 "Name": name,
1505 "Arn": arn,
1506 "State": "ACTIVE",
1507 "RoutingConfig": routing_config,
1508 "EventBuses": event_buses,
1509 });
1510 if let Some(ref rc) = replication_config {
1511 resp["ReplicationConfig"] = rc.clone();
1512 }
1513 if let Some(ref ra) = role_arn {
1514 resp["RoleArn"] = json!(ra);
1515 }
1516
1517 Ok(AwsResponse::ok_json(resp))
1518 }
1519
1520 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1521 let body = req.json_body();
1522 validate_required("Name", &body["Name"])?;
1523 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1524
1525 let mut state = self.state.write();
1526 state.endpoints.remove(name).ok_or_else(|| {
1527 AwsServiceError::aws_error(
1528 StatusCode::BAD_REQUEST,
1529 "ResourceNotFoundException",
1530 format!("Endpoint '{name}' does not exist."),
1531 )
1532 })?;
1533
1534 Ok(AwsResponse::ok_json(json!({})))
1535 }
1536
1537 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1538 let body = req.json_body();
1539 validate_required("Name", &body["Name"])?;
1540 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1541
1542 let state = self.state.read();
1543 let ep = state.endpoints.get(name).ok_or_else(|| {
1544 AwsServiceError::aws_error(
1545 StatusCode::BAD_REQUEST,
1546 "ResourceNotFoundException",
1547 format!("Endpoint '{name}' does not exist."),
1548 )
1549 })?;
1550
1551 let mut resp = json!({
1552 "Name": ep.name,
1553 "Arn": ep.arn,
1554 "EndpointId": ep.endpoint_id,
1555 "State": ep.state,
1556 "RoutingConfig": ep.routing_config,
1557 "EventBuses": ep.event_buses,
1558 "CreationTime": ep.creation_time.timestamp() as f64,
1559 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1560 });
1561 if let Some(ref url) = ep.endpoint_url {
1562 resp["EndpointUrl"] = json!(url);
1563 }
1564 if let Some(ref desc) = ep.description {
1565 resp["Description"] = json!(desc);
1566 }
1567 if let Some(ref rc) = ep.replication_config {
1568 resp["ReplicationConfig"] = rc.clone();
1569 }
1570 if let Some(ref ra) = ep.role_arn {
1571 resp["RoleArn"] = json!(ra);
1572 }
1573
1574 Ok(AwsResponse::ok_json(resp))
1575 }
1576
1577 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1578 let body = req.json_body();
1579 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1580 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1581 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1582 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1583 let name_prefix = body["NamePrefix"].as_str();
1584 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1585
1586 let state = self.state.read();
1587 let all: Vec<Value> = state
1588 .endpoints
1589 .values()
1590 .filter(|ep| match name_prefix {
1591 Some(prefix) => ep.name.starts_with(prefix),
1592 None => true,
1593 })
1594 .map(|ep| {
1595 let mut obj = json!({
1596 "Name": ep.name,
1597 "Arn": ep.arn,
1598 "EndpointId": ep.endpoint_id,
1599 "State": ep.state,
1600 "RoutingConfig": ep.routing_config,
1601 "EventBuses": ep.event_buses,
1602 "CreationTime": ep.creation_time.timestamp() as f64,
1603 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604 });
1605 if let Some(ref url) = ep.endpoint_url {
1606 obj["EndpointUrl"] = json!(url);
1607 }
1608 obj
1609 })
1610 .collect();
1611
1612 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1613 let mut resp = json!({ "Endpoints": endpoints });
1614 if let Some(token) = next_token {
1615 resp["NextToken"] = json!(token);
1616 }
1617
1618 Ok(AwsResponse::ok_json(resp))
1619 }
1620
1621 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1622 let body = req.json_body();
1623 validate_required("Name", &body["Name"])?;
1624 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1625
1626 let mut state = self.state.write();
1627 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1628 AwsServiceError::aws_error(
1629 StatusCode::BAD_REQUEST,
1630 "ResourceNotFoundException",
1631 format!("Endpoint '{name}' does not exist."),
1632 )
1633 })?;
1634
1635 if let Some(desc) = body["Description"].as_str() {
1636 ep.description = Some(desc.to_string());
1637 }
1638 if !body["RoutingConfig"].is_null() {
1639 ep.routing_config = body["RoutingConfig"].clone();
1640 }
1641 if let Some(rc) = body.get("ReplicationConfig") {
1642 ep.replication_config = Some(rc.clone());
1643 }
1644 if let Some(buses) = body["EventBuses"].as_array() {
1645 ep.event_buses = buses.clone();
1646 }
1647 if let Some(ra) = body["RoleArn"].as_str() {
1648 ep.role_arn = Some(ra.to_string());
1649 }
1650 ep.last_modified_time = Utc::now();
1651
1652 let resp = json!({
1653 "Name": ep.name,
1654 "Arn": ep.arn,
1655 "EndpointId": ep.endpoint_id,
1656 "State": ep.state,
1657 "RoutingConfig": ep.routing_config,
1658 "EventBuses": ep.event_buses,
1659 });
1660
1661 Ok(AwsResponse::ok_json(resp))
1662 }
1663
1664 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1667 let body = req.json_body();
1668 validate_required("Name", &body["Name"])?;
1669 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1670 validate_string_length("name", name, 1, 64)?;
1671
1672 let mut state = self.state.write();
1673 let conn = state.connections.get_mut(name).ok_or_else(|| {
1674 AwsServiceError::aws_error(
1675 StatusCode::BAD_REQUEST,
1676 "ResourceNotFoundException",
1677 format!("Connection '{name}' does not exist."),
1678 )
1679 })?;
1680
1681 conn.connection_state = "DEAUTHORIZING".to_string();
1682 conn.last_modified_time = Utc::now();
1683
1684 let resp = json!({
1685 "ConnectionArn": conn.arn,
1686 "ConnectionState": conn.connection_state,
1687 "CreationTime": conn.creation_time.timestamp() as f64,
1688 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1689 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1690 });
1691
1692 Ok(AwsResponse::ok_json(resp))
1693 }
1694
1695 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698 let body = req.json_body();
1699 validate_required("Entries", &body["Entries"])?;
1700 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1701 let entries = body["Entries"]
1702 .as_array()
1703 .ok_or_else(|| missing("Entries"))?;
1704
1705 if entries.is_empty() {
1707 return Err(AwsServiceError::aws_error(
1708 StatusCode::BAD_REQUEST,
1709 "ValidationException",
1710 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1711 ));
1712 }
1713 if entries.len() > 10 {
1714 return Err(AwsServiceError::aws_error(
1715 StatusCode::BAD_REQUEST,
1716 "ValidationException",
1717 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1718 ));
1719 }
1720
1721 let mut state = self.state.write();
1722 let mut result_entries = Vec::new();
1723 let mut events_to_deliver = Vec::new();
1724 let mut failed_count = 0;
1725
1726 for entry in entries {
1727 let source = entry["Source"].as_str().unwrap_or("").to_string();
1728 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1729 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1730
1731 if source.is_empty() {
1733 failed_count += 1;
1734 result_entries.push(json!({
1735 "ErrorCode": "InvalidArgument",
1736 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
1737 }));
1738 continue;
1739 }
1740 if detail_type.is_empty() {
1741 failed_count += 1;
1742 result_entries.push(json!({
1743 "ErrorCode": "InvalidArgument",
1744 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
1745 }));
1746 continue;
1747 }
1748 if detail.is_empty() {
1749 failed_count += 1;
1750 result_entries.push(json!({
1751 "ErrorCode": "InvalidArgument",
1752 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
1753 }));
1754 continue;
1755 }
1756
1757 if serde_json::from_str::<Value>(&detail).is_err() {
1759 failed_count += 1;
1760 result_entries.push(json!({
1761 "ErrorCode": "MalformedDetail",
1762 "ErrorMessage": "Detail is malformed.",
1763 }));
1764 continue;
1765 }
1766
1767 let event_id = uuid::Uuid::new_v4().to_string();
1768 let raw_bus = entry["EventBusName"]
1769 .as_str()
1770 .unwrap_or("default")
1771 .to_string();
1772 let event_bus_name = state.resolve_bus_name(&raw_bus);
1773 let time = if let Some(s) = entry["Time"].as_str() {
1774 DateTime::parse_from_rfc3339(s)
1775 .map(|dt| dt.with_timezone(&Utc))
1776 .unwrap_or_else(|_| Utc::now())
1777 } else if let Some(ts) = entry["Time"].as_f64() {
1778 DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
1779 .unwrap_or_else(Utc::now)
1780 } else if let Some(ts) = entry["Time"].as_i64() {
1781 DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
1782 } else {
1783 Utc::now()
1784 };
1785 let resources: Vec<String> = entry["Resources"]
1786 .as_array()
1787 .map(|arr| {
1788 arr.iter()
1789 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1790 .collect()
1791 })
1792 .unwrap_or_default();
1793
1794 let event = PutEvent {
1795 event_id: event_id.clone(),
1796 source: source.clone(),
1797 detail_type: detail_type.clone(),
1798 detail: detail.clone(),
1799 event_bus_name: event_bus_name.clone(),
1800 time,
1801 resources: resources.clone(),
1802 };
1803
1804 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
1806 for akey in archive_keys {
1807 let (archive_bus, archive_pattern, archive_enabled) = {
1808 let a = &state.archives[&akey];
1809 (
1810 state.resolve_bus_name(&a.event_source_arn),
1811 a.event_pattern.clone(),
1812 a.state == "ENABLED",
1813 )
1814 };
1815 if archive_bus == event_bus_name && archive_enabled {
1816 let pattern_matches = matches_pattern(
1818 archive_pattern.as_deref(),
1819 &source,
1820 &detail_type,
1821 &detail,
1822 &req.account_id,
1823 &req.region,
1824 &resources,
1825 );
1826 if pattern_matches {
1827 if let Some(archive) = state.archives.get_mut(&akey) {
1828 archive.event_count += 1;
1829 archive.size_bytes += detail.len() as i64;
1830 archive.events.push(event.clone());
1831 }
1832 }
1833 }
1834 }
1835
1836 state.events.push(event);
1837
1838 let matching_targets: Vec<EventTarget> = state
1840 .rules
1841 .values()
1842 .filter(|r| {
1843 r.event_bus_name == event_bus_name
1844 && r.state == "ENABLED"
1845 && matches_pattern(
1846 r.event_pattern.as_deref(),
1847 &source,
1848 &detail_type,
1849 &detail,
1850 &req.account_id,
1851 &req.region,
1852 &resources,
1853 )
1854 })
1855 .flat_map(|r| r.targets.clone())
1856 .collect();
1857
1858 if !matching_targets.is_empty() {
1859 events_to_deliver.push((
1860 event_id.clone(),
1861 source,
1862 detail_type,
1863 detail,
1864 time,
1865 resources,
1866 matching_targets,
1867 ));
1868 }
1869
1870 result_entries.push(json!({ "EventId": event_id }));
1871 }
1872
1873 drop(state);
1875
1876 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1878 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1879 let event_json = json!({
1880 "version": "0",
1881 "id": event_id,
1882 "source": source,
1883 "account": req.account_id,
1884 "detail-type": detail_type,
1885 "detail": detail_value,
1886 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1887 "region": req.region,
1888 "resources": resources,
1889 });
1890 let event_str = event_json.to_string();
1891
1892 for target in targets {
1893 let arn = &target.arn;
1894 let body_str = if let Some(ref transformer) = target.input_transformer {
1896 apply_input_transformer(transformer, &event_json)
1897 } else if let Some(ref input) = target.input {
1898 input.clone()
1899 } else if let Some(ref input_path) = target.input_path {
1900 resolve_json_path(&event_json, input_path)
1901 .map(|v| v.to_string())
1902 .unwrap_or_else(|| event_str.clone())
1903 } else {
1904 event_str.clone()
1905 };
1906
1907 if arn.contains(":sqs:") {
1908 let group_id = target
1910 .sqs_parameters
1911 .as_ref()
1912 .and_then(|p| p["MessageGroupId"].as_str())
1913 .map(|s| s.to_string());
1914 if group_id.is_some() {
1915 self.delivery.send_to_sqs_with_attrs(
1919 arn,
1920 &body_str,
1921 &HashMap::new(),
1922 group_id.as_deref(),
1923 None,
1924 );
1925 } else {
1926 self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1927 }
1928 } else if arn.contains(":sns:") {
1929 self.delivery
1930 .publish_to_sns(arn, &body_str, Some(&detail_type));
1931 } else if arn.contains(":lambda:") {
1932 tracing::info!(
1933 function_arn = %arn,
1934 payload = %body_str,
1935 "EventBridge delivering to Lambda function"
1936 );
1937 let now = Utc::now();
1938 let mut state = self.state.write();
1939 state
1940 .lambda_invocations
1941 .push(crate::state::LambdaInvocation {
1942 function_arn: arn.clone(),
1943 payload: body_str.clone(),
1944 timestamp: now,
1945 });
1946 drop(state);
1947 if let Some(ref ls) = self.lambda_state {
1949 ls.write().invocations.push(LambdaInvocation {
1950 function_arn: arn.clone(),
1951 payload: body_str.clone(),
1952 timestamp: now,
1953 source: "aws:events".to_string(),
1954 });
1955 }
1956 invoke_lambda_async(
1958 &self.container_runtime,
1959 &self.lambda_state,
1960 arn,
1961 &body_str,
1962 );
1963 } else if arn.contains(":logs:") {
1964 tracing::info!(
1965 log_group_arn = %arn,
1966 payload = %body_str,
1967 "EventBridge delivering to CloudWatch Logs"
1968 );
1969 let now = Utc::now();
1970 let mut state = self.state.write();
1971 state.log_deliveries.push(crate::state::LogDelivery {
1972 log_group_arn: arn.clone(),
1973 payload: body_str.clone(),
1974 timestamp: now,
1975 });
1976 drop(state);
1977 if let Some(ref log_state) = self.logs_state {
1979 deliver_to_logs(log_state, arn, &body_str, now);
1980 }
1981 } else if arn.contains(":kinesis:") {
1982 tracing::info!(
1983 stream_arn = %arn,
1984 "EventBridge delivering to Kinesis stream"
1985 );
1986 self.delivery.send_to_kinesis(arn, &body_str, &event_id);
1988 } else if arn.contains(":states:") {
1989 tracing::info!(
1990 state_machine_arn = %arn,
1991 payload = %body_str,
1992 "EventBridge delivering to Step Functions (stub)"
1993 );
1994 let mut state = self.state.write();
1995 state
1996 .step_function_executions
1997 .push(crate::state::StepFunctionExecution {
1998 state_machine_arn: arn.clone(),
1999 payload: body_str.clone(),
2000 timestamp: Utc::now(),
2001 });
2002 } else if arn.contains(":api-destination/") {
2003 let state = self.state.read();
2005 let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2006 if let Some(dest) = dest {
2007 let url = dest.invocation_endpoint.clone();
2008 let method = dest.http_method.clone();
2009 let conn = state
2010 .connections
2011 .values()
2012 .find(|c| c.arn == dest.connection_arn)
2013 .cloned();
2014 drop(state);
2015
2016 let payload = body_str.clone();
2017 tokio::spawn(async move {
2018 let client = reqwest::Client::new();
2019 let mut req_builder = match method.as_str() {
2020 "GET" => client.get(&url),
2021 "PUT" => client.put(&url),
2022 "DELETE" => client.delete(&url),
2023 "PATCH" => client.patch(&url),
2024 "HEAD" => client.head(&url),
2025 _ => client.post(&url),
2026 };
2027 req_builder = req_builder.header("Content-Type", "application/json");
2028
2029 if let Some(conn) = conn {
2031 req_builder = apply_connection_auth(req_builder, &conn);
2032 }
2033
2034 let result = req_builder.body(payload).send().await;
2035 if let Err(e) = result {
2036 tracing::warn!(
2037 endpoint = %url,
2038 error = %e,
2039 "EventBridge ApiDestination delivery failed"
2040 );
2041 }
2042 });
2043 }
2044 } else if arn.starts_with("https://") || arn.starts_with("http://") {
2045 let url = arn.clone();
2047 let payload = body_str.clone();
2048 tokio::spawn(async move {
2049 let client = reqwest::Client::new();
2050 let result = client
2051 .post(&url)
2052 .header("Content-Type", "application/json")
2053 .body(payload)
2054 .send()
2055 .await;
2056 if let Err(e) = result {
2057 tracing::warn!(
2058 endpoint = %url,
2059 error = %e,
2060 "EventBridge HTTP target delivery failed"
2061 );
2062 }
2063 });
2064 }
2065 }
2066 }
2067
2068 let resp = json!({
2069 "FailedEntryCount": failed_count,
2070 "Entries": result_entries,
2071 });
2072
2073 Ok(AwsResponse::ok_json(resp))
2074 }
2075
2076 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079 let body = req.json_body();
2080 validate_required("ResourceARN", &body["ResourceARN"])?;
2081 let arn = body["ResourceARN"]
2082 .as_str()
2083 .ok_or_else(|| missing("ResourceARN"))?;
2084 validate_string_length("resourceARN", arn, 1, 1600)?;
2085 validate_required("Tags", &body["Tags"])?;
2086
2087 let mut state = self.state.write();
2088 let tag_map = find_tags_mut(&mut state, arn)?;
2089
2090 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2091 AwsServiceError::aws_error(
2092 StatusCode::BAD_REQUEST,
2093 "ValidationException",
2094 format!("{f} must be a list"),
2095 )
2096 })?;
2097
2098 Ok(AwsResponse::ok_json(json!({})))
2099 }
2100
2101 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2102 let body = req.json_body();
2103 validate_required("ResourceARN", &body["ResourceARN"])?;
2104 let arn = body["ResourceARN"]
2105 .as_str()
2106 .ok_or_else(|| missing("ResourceARN"))?;
2107 validate_string_length("resourceARN", arn, 1, 1600)?;
2108 validate_required("TagKeys", &body["TagKeys"])?;
2109
2110 let mut state = self.state.write();
2111 let tag_map = find_tags_mut(&mut state, arn)?;
2112
2113 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2114 AwsServiceError::aws_error(
2115 StatusCode::BAD_REQUEST,
2116 "ValidationException",
2117 format!("{f} must be a list"),
2118 )
2119 })?;
2120
2121 Ok(AwsResponse::ok_json(json!({})))
2122 }
2123
2124 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2125 let body = req.json_body();
2126 validate_required("ResourceARN", &body["ResourceARN"])?;
2127 let arn = body["ResourceARN"]
2128 .as_str()
2129 .ok_or_else(|| missing("ResourceARN"))?;
2130 validate_string_length("resourceARN", arn, 1, 1600)?;
2131
2132 let state = self.state.read();
2133 let tag_map = find_tags(&state, arn)?;
2134
2135 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2136
2137 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2138 }
2139
2140 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2143 let body = req.json_body();
2144 validate_required("ArchiveName", &body["ArchiveName"])?;
2145 let name = body["ArchiveName"]
2146 .as_str()
2147 .ok_or_else(|| missing("ArchiveName"))?
2148 .to_string();
2149 validate_string_length("archiveName", &name, 1, 48)?;
2150 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2151 let event_source_arn = body["EventSourceArn"]
2152 .as_str()
2153 .ok_or_else(|| missing("EventSourceArn"))?
2154 .to_string();
2155 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2156 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2157 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2158 if let Some(rd) = body["RetentionDays"].as_i64() {
2159 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2160 }
2161 let description = body["Description"].as_str().map(|s| s.to_string());
2162 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2163 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2164
2165 if let Some(ref pattern) = event_pattern {
2167 validate_event_pattern(pattern)?;
2168 }
2169
2170 let mut state = self.state.write();
2171
2172 let bus_name = state.resolve_bus_name(&event_source_arn);
2174 if !state.buses.contains_key(&bus_name) {
2175 return Err(AwsServiceError::aws_error(
2176 StatusCode::BAD_REQUEST,
2177 "ResourceNotFoundException",
2178 format!("Event bus {bus_name} does not exist."),
2179 ));
2180 }
2181
2182 if state.archives.contains_key(&name) {
2184 return Err(AwsServiceError::aws_error(
2185 StatusCode::BAD_REQUEST,
2186 "ResourceAlreadyExistsException",
2187 format!("Archive {name} already exists."),
2188 ));
2189 }
2190
2191 let now = Utc::now();
2192 let arn = format!(
2193 "arn:aws:events:{}:{}:archive/{}",
2194 req.region, state.account_id, name
2195 );
2196
2197 let archive = Archive {
2198 name: name.clone(),
2199 arn: arn.clone(),
2200 event_source_arn: event_source_arn.clone(),
2201 description,
2202 event_pattern: event_pattern.clone(),
2203 retention_days,
2204 state: "ENABLED".to_string(),
2205 creation_time: now,
2206 event_count: 0,
2207 size_bytes: 0,
2208 events: Vec::new(),
2209 };
2210 state.archives.insert(name.clone(), archive);
2211
2212 let rule_name = format!("Events-Archive-{name}");
2214 let rule_arn = format!(
2215 "arn:aws:events:{}:{}:rule/{}",
2216 req.region, state.account_id, rule_name
2217 );
2218 let rule_event_pattern = {
2220 let mut merged = if let Some(ref ep) = event_pattern {
2221 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2222 } else {
2223 json!({})
2224 };
2225 if let Some(obj) = merged.as_object_mut() {
2226 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2227 }
2228 serde_json::to_string(&merged).unwrap_or_default()
2229 };
2230
2231 let archive_target = EventTarget {
2233 id: name.clone(),
2234 arn: format!("arn:aws:events:{}:::", req.region),
2235 input: None,
2236 input_path: None,
2237 input_transformer: Some(json!({
2238 "InputPathsMap": {},
2239 "InputTemplate": format!(
2240 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2241 arn
2242 )
2243 })),
2244 sqs_parameters: None,
2245 };
2246
2247 let archive_rule = EventRule {
2248 name: rule_name.clone(),
2249 arn: rule_arn,
2250 event_bus_name: bus_name.clone(),
2251 event_pattern: Some(rule_event_pattern),
2252 schedule_expression: None,
2253 state: "ENABLED".to_string(),
2254 description: None,
2255 role_arn: None,
2256 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2257 created_by: Some(state.account_id.clone()),
2258 targets: vec![archive_target],
2259 tags: HashMap::new(),
2260 last_fired: None,
2261 };
2262 let key = (bus_name, rule_name);
2263 state.rules.insert(key, archive_rule);
2264
2265 Ok(AwsResponse::ok_json(json!({
2266 "ArchiveArn": arn,
2267 "CreationTime": now.timestamp() as f64,
2268 "State": "ENABLED",
2269 })))
2270 }
2271
2272 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2273 let body = req.json_body();
2274 validate_required("ArchiveName", &body["ArchiveName"])?;
2275 let name = body["ArchiveName"]
2276 .as_str()
2277 .ok_or_else(|| missing("ArchiveName"))?;
2278 validate_string_length("archiveName", name, 1, 48)?;
2279
2280 let state = self.state.read();
2281 let archive = state.archives.get(name).ok_or_else(|| {
2282 AwsServiceError::aws_error(
2283 StatusCode::BAD_REQUEST,
2284 "ResourceNotFoundException",
2285 format!("Archive {name} does not exist."),
2286 )
2287 })?;
2288
2289 let mut resp = json!({
2290 "ArchiveArn": archive.arn,
2291 "ArchiveName": archive.name,
2292 "CreationTime": archive.creation_time.timestamp() as f64,
2293 "EventCount": archive.event_count,
2294 "EventSourceArn": archive.event_source_arn,
2295 "RetentionDays": archive.retention_days,
2296 "SizeBytes": archive.size_bytes,
2297 "State": archive.state,
2298 });
2299 if let Some(ref desc) = archive.description {
2300 resp["Description"] = json!(desc);
2301 }
2302 if let Some(ref ep) = archive.event_pattern {
2303 resp["EventPattern"] = json!(ep);
2304 }
2305
2306 Ok(AwsResponse::ok_json(resp))
2307 }
2308
2309 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2310 let body = req.json_body();
2311 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2312 validate_optional_string_length(
2313 "eventSourceArn",
2314 body["EventSourceArn"].as_str(),
2315 1,
2316 1600,
2317 )?;
2318 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2319 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2320 let name_prefix = body["NamePrefix"].as_str();
2321 let source_arn = body["EventSourceArn"].as_str();
2322 let archive_state = body["State"].as_str();
2323
2324 let filter_count = [
2326 name_prefix.is_some(),
2327 source_arn.is_some(),
2328 archive_state.is_some(),
2329 ]
2330 .iter()
2331 .filter(|&&x| x)
2332 .count();
2333 if filter_count > 1 {
2334 return Err(AwsServiceError::aws_error(
2335 StatusCode::BAD_REQUEST,
2336 "ValidationException",
2337 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2338 ));
2339 }
2340
2341 if let Some(s) = archive_state {
2343 let valid = [
2344 "ENABLED",
2345 "DISABLED",
2346 "CREATING",
2347 "UPDATING",
2348 "CREATE_FAILED",
2349 "UPDATE_FAILED",
2350 ];
2351 if !valid.contains(&s) {
2352 return Err(AwsServiceError::aws_error(
2353 StatusCode::BAD_REQUEST,
2354 "ValidationException",
2355 format!(
2356 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2357 s
2358 ),
2359 ));
2360 }
2361 }
2362
2363 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2364
2365 let state = self.state.read();
2366 let all: Vec<Value> = state
2367 .archives
2368 .values()
2369 .filter(|a| {
2370 if let Some(prefix) = name_prefix {
2371 a.name.starts_with(prefix)
2372 } else if let Some(arn) = source_arn {
2373 a.event_source_arn == arn
2374 } else if let Some(s) = archive_state {
2375 a.state == s
2376 } else {
2377 true
2378 }
2379 })
2380 .map(|a| {
2381 json!({
2382 "ArchiveName": a.name,
2383 "CreationTime": a.creation_time.timestamp() as f64,
2384 "EventCount": a.event_count,
2385 "EventSourceArn": a.event_source_arn,
2386 "RetentionDays": a.retention_days,
2387 "SizeBytes": a.size_bytes,
2388 "State": a.state,
2389 })
2390 })
2391 .collect();
2392
2393 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2394 let mut resp = json!({ "Archives": archives });
2395 if let Some(token) = next_token {
2396 resp["NextToken"] = json!(token);
2397 }
2398
2399 Ok(AwsResponse::ok_json(resp))
2400 }
2401
2402 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2403 let body = req.json_body();
2404 validate_required("ArchiveName", &body["ArchiveName"])?;
2405 let name = body["ArchiveName"]
2406 .as_str()
2407 .ok_or_else(|| missing("ArchiveName"))?;
2408 validate_string_length("archiveName", name, 1, 48)?;
2409 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2410 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2411 if let Some(rd) = body["RetentionDays"].as_i64() {
2412 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2413 }
2414
2415 if let Some(pattern) = body["EventPattern"].as_str() {
2417 validate_event_pattern(pattern)?;
2418 }
2419
2420 let mut state = self.state.write();
2421 let archive = state.archives.get_mut(name).ok_or_else(|| {
2422 AwsServiceError::aws_error(
2423 StatusCode::BAD_REQUEST,
2424 "ResourceNotFoundException",
2425 format!("Archive {name} does not exist."),
2426 )
2427 })?;
2428
2429 if let Some(desc) = body["Description"].as_str() {
2430 archive.description = Some(desc.to_string());
2431 }
2432 if let Some(pattern) = body["EventPattern"].as_str() {
2433 archive.event_pattern = Some(pattern.to_string());
2434 }
2435 if let Some(days) = body["RetentionDays"].as_i64() {
2436 archive.retention_days = days;
2437 }
2438
2439 Ok(AwsResponse::ok_json(json!({
2440 "ArchiveArn": archive.arn,
2441 "CreationTime": archive.creation_time.timestamp() as f64,
2442 "State": archive.state,
2443 })))
2444 }
2445
2446 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2447 let body = req.json_body();
2448 validate_required("ArchiveName", &body["ArchiveName"])?;
2449 let name = body["ArchiveName"]
2450 .as_str()
2451 .ok_or_else(|| missing("ArchiveName"))?;
2452 validate_string_length("archiveName", name, 1, 48)?;
2453
2454 let mut state = self.state.write();
2455 if !state.archives.contains_key(name) {
2456 return Err(AwsServiceError::aws_error(
2457 StatusCode::BAD_REQUEST,
2458 "ResourceNotFoundException",
2459 format!("Archive {name} does not exist."),
2460 ));
2461 }
2462
2463 state.archives.remove(name);
2464
2465 let rule_name = format!("Events-Archive-{name}");
2467 state.rules.retain(|k, _| k.1 != rule_name);
2468
2469 Ok(AwsResponse::ok_json(json!({})))
2470 }
2471
2472 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2475 let body = req.json_body();
2476 validate_required("Name", &body["Name"])?;
2477 let name = body["Name"]
2478 .as_str()
2479 .ok_or_else(|| missing("Name"))?
2480 .to_string();
2481 validate_string_length("name", &name, 1, 64)?;
2482 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2483 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2484 let description = body["Description"].as_str().map(|s| s.to_string());
2485 let auth_type = body["AuthorizationType"]
2486 .as_str()
2487 .ok_or_else(|| missing("AuthorizationType"))?
2488 .to_string();
2489 validate_enum(
2490 "authorizationType",
2491 &auth_type,
2492 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2493 )?;
2494 validate_optional_string_length(
2495 "kmsKeyIdentifier",
2496 body["KmsKeyIdentifier"].as_str(),
2497 0,
2498 2048,
2499 )?;
2500 validate_required("AuthParameters", &body["AuthParameters"])?;
2501 let auth_params = body["AuthParameters"].clone();
2502
2503 let mut state = self.state.write();
2504 let now = Utc::now();
2505 let conn_uuid = uuid::Uuid::new_v4();
2506 let arn = format!(
2507 "arn:aws:events:{}:{}:connection/{}/{}",
2508 req.region, state.account_id, name, conn_uuid
2509 );
2510 let secret_arn = format!(
2511 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2512 req.region, state.account_id, name, conn_uuid
2513 );
2514
2515 let conn = Connection {
2516 name: name.clone(),
2517 arn: arn.clone(),
2518 description,
2519 authorization_type: auth_type.clone(),
2520 auth_parameters: auth_params,
2521 connection_state: "AUTHORIZED".to_string(),
2522 secret_arn: secret_arn.clone(),
2523 creation_time: now,
2524 last_modified_time: now,
2525 last_authorized_time: now,
2526 };
2527 state.connections.insert(name, conn);
2528
2529 Ok(AwsResponse::ok_json(json!({
2530 "ConnectionArn": arn,
2531 "ConnectionState": "AUTHORIZED",
2532 "CreationTime": now.timestamp() as f64,
2533 "LastModifiedTime": now.timestamp() as f64,
2534 })))
2535 }
2536
2537 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2538 let body = req.json_body();
2539 validate_required("Name", &body["Name"])?;
2540 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2541 validate_string_length("name", name, 1, 64)?;
2542
2543 let state = self.state.read();
2544 let conn = state.connections.get(name).ok_or_else(|| {
2545 AwsServiceError::aws_error(
2546 StatusCode::BAD_REQUEST,
2547 "ResourceNotFoundException",
2548 format!("Connection '{name}' does not exist."),
2549 )
2550 })?;
2551
2552 let auth_params_response =
2554 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2555
2556 let mut resp = json!({
2557 "ConnectionArn": conn.arn,
2558 "Name": conn.name,
2559 "AuthorizationType": conn.authorization_type,
2560 "AuthParameters": auth_params_response,
2561 "ConnectionState": conn.connection_state,
2562 "SecretArn": conn.secret_arn,
2563 "CreationTime": conn.creation_time.timestamp() as f64,
2564 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2565 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2566 });
2567 if let Some(ref desc) = conn.description {
2568 resp["Description"] = json!(desc);
2569 }
2570
2571 Ok(AwsResponse::ok_json(resp))
2572 }
2573
2574 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2575 let body = req.json_body();
2576 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2577 validate_optional_enum(
2578 "connectionState",
2579 body["ConnectionState"].as_str(),
2580 &[
2581 "CREATING",
2582 "UPDATING",
2583 "DELETING",
2584 "AUTHORIZED",
2585 "DEAUTHORIZED",
2586 "AUTHORIZING",
2587 "DEAUTHORIZING",
2588 "ACTIVE",
2589 "FAILED_CONNECTIVITY",
2590 ],
2591 )?;
2592 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2593 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2594
2595 let name_prefix = body["NamePrefix"].as_str();
2596 let connection_state = body["ConnectionState"].as_str();
2597 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2598
2599 let state = self.state.read();
2600 let all: Vec<Value> = state
2601 .connections
2602 .values()
2603 .filter(|c| {
2604 if let Some(prefix) = name_prefix {
2605 if !c.name.starts_with(prefix) {
2606 return false;
2607 }
2608 }
2609 if let Some(cs) = connection_state {
2610 if c.connection_state != cs {
2611 return false;
2612 }
2613 }
2614 true
2615 })
2616 .map(|c| {
2617 json!({
2618 "ConnectionArn": c.arn,
2619 "Name": c.name,
2620 "AuthorizationType": c.authorization_type,
2621 "ConnectionState": c.connection_state,
2622 "CreationTime": c.creation_time.timestamp() as f64,
2623 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2624 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2625 })
2626 })
2627 .collect();
2628
2629 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2630 let mut resp = json!({ "Connections": conns });
2631 if let Some(token) = next_token {
2632 resp["NextToken"] = json!(token);
2633 }
2634
2635 Ok(AwsResponse::ok_json(resp))
2636 }
2637
2638 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2639 let body = req.json_body();
2640 validate_required("Name", &body["Name"])?;
2641 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2642 validate_string_length("name", name, 1, 64)?;
2643 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2644 validate_optional_enum(
2645 "authorizationType",
2646 body["AuthorizationType"].as_str(),
2647 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2648 )?;
2649
2650 let mut state = self.state.write();
2651 let conn = state.connections.get_mut(name).ok_or_else(|| {
2652 AwsServiceError::aws_error(
2653 StatusCode::BAD_REQUEST,
2654 "ResourceNotFoundException",
2655 format!("Connection '{name}' does not exist."),
2656 )
2657 })?;
2658
2659 if let Some(desc) = body["Description"].as_str() {
2660 conn.description = Some(desc.to_string());
2661 }
2662 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2663 conn.authorization_type = auth_type.to_string();
2664 }
2665 if body.get("AuthParameters").is_some() {
2666 conn.auth_parameters = body["AuthParameters"].clone();
2667 }
2668 conn.last_modified_time = Utc::now();
2669
2670 Ok(AwsResponse::ok_json(json!({
2671 "ConnectionArn": conn.arn,
2672 "ConnectionState": conn.connection_state,
2673 "CreationTime": conn.creation_time.timestamp() as f64,
2674 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2675 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2676 })))
2677 }
2678
2679 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2680 let body = req.json_body();
2681 validate_required("Name", &body["Name"])?;
2682 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2683 validate_string_length("name", name, 1, 64)?;
2684
2685 let mut state = self.state.write();
2686 let conn = state.connections.remove(name).ok_or_else(|| {
2687 AwsServiceError::aws_error(
2688 StatusCode::BAD_REQUEST,
2689 "ResourceNotFoundException",
2690 format!("Connection '{name}' does not exist."),
2691 )
2692 })?;
2693
2694 Ok(AwsResponse::ok_json(json!({
2695 "ConnectionArn": conn.arn,
2696 "ConnectionState": conn.connection_state,
2697 "CreationTime": conn.creation_time.timestamp() as f64,
2698 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2699 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2700 })))
2701 }
2702
2703 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2706 let body = req.json_body();
2707 validate_required("Name", &body["Name"])?;
2708 let name = body["Name"]
2709 .as_str()
2710 .ok_or_else(|| missing("Name"))?
2711 .to_string();
2712 validate_string_length("name", &name, 1, 64)?;
2713 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2714 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2715 let description = body["Description"].as_str().map(|s| s.to_string());
2716 let connection_arn = body["ConnectionArn"]
2717 .as_str()
2718 .ok_or_else(|| missing("ConnectionArn"))?
2719 .to_string();
2720 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2721 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2722 let endpoint = body["InvocationEndpoint"]
2723 .as_str()
2724 .ok_or_else(|| missing("InvocationEndpoint"))?
2725 .to_string();
2726 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2727 validate_required("HttpMethod", &body["HttpMethod"])?;
2728 let http_method = body["HttpMethod"]
2729 .as_str()
2730 .ok_or_else(|| missing("HttpMethod"))?
2731 .to_string();
2732 validate_enum(
2733 "httpMethod",
2734 &http_method,
2735 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2736 )?;
2737 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2738 if let Some(r) = rate_limit {
2739 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2740 }
2741
2742 let mut state = self.state.write();
2743 let now = Utc::now();
2744 let dest_uuid = uuid::Uuid::new_v4();
2745 let arn = format!(
2746 "arn:aws:events:{}:{}:api-destination/{}/{}",
2747 req.region, state.account_id, name, dest_uuid
2748 );
2749
2750 let dest = ApiDestination {
2751 name: name.clone(),
2752 arn: arn.clone(),
2753 description,
2754 connection_arn,
2755 invocation_endpoint: endpoint,
2756 http_method,
2757 invocation_rate_limit_per_second: rate_limit,
2758 state: "ACTIVE".to_string(),
2759 creation_time: now,
2760 last_modified_time: now,
2761 };
2762 state.api_destinations.insert(name, dest);
2763
2764 Ok(AwsResponse::ok_json(json!({
2765 "ApiDestinationArn": arn,
2766 "ApiDestinationState": "ACTIVE",
2767 "CreationTime": now.timestamp() as f64,
2768 "LastModifiedTime": now.timestamp() as f64,
2769 })))
2770 }
2771
2772 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2773 let body = req.json_body();
2774 validate_required("Name", &body["Name"])?;
2775 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2776 validate_string_length("name", name, 1, 64)?;
2777
2778 let state = self.state.read();
2779 let dest = state.api_destinations.get(name).ok_or_else(|| {
2780 AwsServiceError::aws_error(
2781 StatusCode::BAD_REQUEST,
2782 "ResourceNotFoundException",
2783 format!("An api-destination '{name}' does not exist."),
2784 )
2785 })?;
2786
2787 let mut resp = json!({
2788 "ApiDestinationArn": dest.arn,
2789 "Name": dest.name,
2790 "ConnectionArn": dest.connection_arn,
2791 "InvocationEndpoint": dest.invocation_endpoint,
2792 "HttpMethod": dest.http_method,
2793 "ApiDestinationState": dest.state,
2794 "CreationTime": dest.creation_time.timestamp() as f64,
2795 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2796 });
2797 if let Some(ref desc) = dest.description {
2798 resp["Description"] = json!(desc);
2799 }
2800 if let Some(rate) = dest.invocation_rate_limit_per_second {
2801 resp["InvocationRateLimitPerSecond"] = json!(rate);
2802 }
2803
2804 Ok(AwsResponse::ok_json(resp))
2805 }
2806
2807 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2808 let body = req.json_body();
2809 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2810 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2811 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2812 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2813
2814 let name_prefix = body["NamePrefix"].as_str();
2815 let connection_arn = body["ConnectionArn"].as_str();
2816 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2817
2818 let state = self.state.read();
2819 let all: Vec<Value> = state
2820 .api_destinations
2821 .values()
2822 .filter(|d| {
2823 if let Some(prefix) = name_prefix {
2824 if !d.name.starts_with(prefix) {
2825 return false;
2826 }
2827 }
2828 if let Some(arn) = connection_arn {
2829 if d.connection_arn != arn {
2830 return false;
2831 }
2832 }
2833 true
2834 })
2835 .map(|d| {
2836 let mut obj = json!({
2837 "ApiDestinationArn": d.arn,
2838 "Name": d.name,
2839 "ConnectionArn": d.connection_arn,
2840 "InvocationEndpoint": d.invocation_endpoint,
2841 "HttpMethod": d.http_method,
2842 "ApiDestinationState": d.state,
2843 "CreationTime": d.creation_time.timestamp() as f64,
2844 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2845 });
2846 if let Some(rate) = d.invocation_rate_limit_per_second {
2847 obj["InvocationRateLimitPerSecond"] = json!(rate);
2848 }
2849 obj
2850 })
2851 .collect();
2852
2853 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2854 let mut resp = json!({ "ApiDestinations": dests });
2855 if let Some(token) = next_token {
2856 resp["NextToken"] = json!(token);
2857 }
2858
2859 Ok(AwsResponse::ok_json(resp))
2860 }
2861
2862 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2863 let body = req.json_body();
2864 validate_required("Name", &body["Name"])?;
2865 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2866 validate_string_length("name", name, 1, 64)?;
2867 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2868 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2869 validate_optional_string_length(
2870 "invocationEndpoint",
2871 body["InvocationEndpoint"].as_str(),
2872 1,
2873 2048,
2874 )?;
2875 validate_optional_enum(
2876 "httpMethod",
2877 body["HttpMethod"].as_str(),
2878 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2879 )?;
2880 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2881 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2882 }
2883
2884 let mut state = self.state.write();
2885 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2886 AwsServiceError::aws_error(
2887 StatusCode::BAD_REQUEST,
2888 "ResourceNotFoundException",
2889 format!("An api-destination '{name}' does not exist."),
2890 )
2891 })?;
2892
2893 if let Some(desc) = body["Description"].as_str() {
2894 dest.description = Some(desc.to_string());
2895 }
2896 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2897 dest.invocation_endpoint = endpoint.to_string();
2898 }
2899 if let Some(method) = body["HttpMethod"].as_str() {
2900 dest.http_method = method.to_string();
2901 }
2902 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2903 dest.invocation_rate_limit_per_second = Some(rate);
2904 }
2905 if let Some(conn) = body["ConnectionArn"].as_str() {
2906 dest.connection_arn = conn.to_string();
2907 }
2908 dest.last_modified_time = Utc::now();
2909
2910 Ok(AwsResponse::ok_json(json!({
2911 "ApiDestinationArn": dest.arn,
2912 "ApiDestinationState": dest.state,
2913 "CreationTime": dest.creation_time.timestamp() as f64,
2914 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2915 })))
2916 }
2917
2918 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2919 let body = req.json_body();
2920 validate_required("Name", &body["Name"])?;
2921 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2922 validate_string_length("name", name, 1, 64)?;
2923
2924 let mut state = self.state.write();
2925 if !state.api_destinations.contains_key(name) {
2926 return Err(AwsServiceError::aws_error(
2927 StatusCode::BAD_REQUEST,
2928 "ResourceNotFoundException",
2929 format!("An api-destination '{name}' does not exist."),
2930 ));
2931 }
2932 state.api_destinations.remove(name);
2933
2934 Ok(AwsResponse::ok_json(json!({})))
2935 }
2936
2937 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2940 let body = req.json_body();
2941 validate_required("ReplayName", &body["ReplayName"])?;
2942 let name = body["ReplayName"]
2943 .as_str()
2944 .ok_or_else(|| missing("ReplayName"))?
2945 .to_string();
2946 validate_string_length("replayName", &name, 1, 64)?;
2947 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2948 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2949 let description = body["Description"].as_str().map(|s| s.to_string());
2950 let event_source_arn = body["EventSourceArn"]
2951 .as_str()
2952 .ok_or_else(|| missing("EventSourceArn"))?
2953 .to_string();
2954 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2955 validate_required("EventStartTime", &body["EventStartTime"])?;
2956 validate_required("EventEndTime", &body["EventEndTime"])?;
2957 validate_required("Destination", &body["Destination"])?;
2958 let destination = body["Destination"].clone();
2959 let event_start_time_f = body["EventStartTime"].as_f64();
2960 let event_end_time_f = body["EventEndTime"].as_f64();
2961
2962 let event_start_time = event_start_time_f
2963 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2964 .unwrap_or_else(Utc::now);
2965 let event_end_time = event_end_time_f
2966 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2967 .unwrap_or_else(Utc::now);
2968
2969 let dest_arn = destination["Arn"].as_str().unwrap_or("");
2971 if !dest_arn.contains(":event-bus/") {
2972 return Err(AwsServiceError::aws_error(
2973 StatusCode::BAD_REQUEST,
2974 "ValidationException",
2975 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2976 ));
2977 }
2978
2979 let mut state = self.state.write();
2980
2981 let bus_name = state.resolve_bus_name(dest_arn);
2983 if !state.buses.contains_key(&bus_name) {
2984 return Err(AwsServiceError::aws_error(
2985 StatusCode::BAD_REQUEST,
2986 "ResourceNotFoundException",
2987 format!("Event bus {bus_name} does not exist."),
2988 ));
2989 }
2990
2991 let archive_name = event_source_arn
2993 .rsplit_once("archive/")
2994 .map(|(_, n)| n.to_string())
2995 .unwrap_or_default();
2996 if !state.archives.contains_key(&archive_name) {
2997 return Err(AwsServiceError::aws_error(
2998 StatusCode::BAD_REQUEST,
2999 "ValidationException",
3000 format!(
3001 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3002 ),
3003 ));
3004 }
3005
3006 let archive = state.archives.get(&archive_name).unwrap();
3008 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3009 if archive_bus != bus_name {
3010 return Err(AwsServiceError::aws_error(
3011 StatusCode::BAD_REQUEST,
3012 "ValidationException",
3013 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3014 ));
3015 }
3016
3017 if event_end_time <= event_start_time {
3019 return Err(AwsServiceError::aws_error(
3020 StatusCode::BAD_REQUEST,
3021 "ValidationException",
3022 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3023 ));
3024 }
3025
3026 if state.replays.contains_key(&name) {
3028 return Err(AwsServiceError::aws_error(
3029 StatusCode::BAD_REQUEST,
3030 "ResourceAlreadyExistsException",
3031 format!("Replay {name} already exists."),
3032 ));
3033 }
3034
3035 let now = Utc::now();
3036 let arn = format!(
3037 "arn:aws:events:{}:{}:replay/{}",
3038 req.region, state.account_id, name
3039 );
3040
3041 let archive = state.archives.get(&archive_name).unwrap();
3043 let replay_events: Vec<PutEvent> = archive
3044 .events
3045 .iter()
3046 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3047 .cloned()
3048 .collect();
3049
3050 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3052
3053 for event in &replay_events {
3054 let matching_targets: Vec<EventTarget> = state
3055 .rules
3056 .values()
3057 .filter(|r| {
3058 r.event_bus_name == bus_name
3059 && r.state == "ENABLED"
3060 && matches_pattern(
3061 r.event_pattern.as_deref(),
3062 &event.source,
3063 &event.detail_type,
3064 &event.detail,
3065 &req.account_id,
3066 &req.region,
3067 &event.resources,
3068 )
3069 })
3070 .flat_map(|r| r.targets.clone())
3071 .collect();
3072
3073 if !matching_targets.is_empty() {
3074 events_to_deliver.push((event.clone(), matching_targets));
3075 }
3076 }
3077
3078 let replay = Replay {
3079 name: name.clone(),
3080 arn: arn.clone(),
3081 description,
3082 event_source_arn,
3083 destination,
3084 event_start_time,
3085 event_end_time,
3086 state: "COMPLETED".to_string(),
3087 replay_start_time: now,
3088 replay_end_time: Some(now),
3089 };
3090 state.replays.insert(name, replay);
3091
3092 drop(state);
3094
3095 for (event, targets) in events_to_deliver {
3097 let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3098 let event_json = json!({
3099 "version": "0",
3100 "id": event.event_id,
3101 "source": event.source,
3102 "account": req.account_id,
3103 "detail-type": event.detail_type,
3104 "detail": detail_value,
3105 "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3106 "region": req.region,
3107 "resources": event.resources,
3108 "replay-name": arn,
3109 });
3110 let event_str = event_json.to_string();
3111
3112 for target in targets {
3113 let target_arn = &target.arn;
3114 let body_str = if let Some(ref transformer) = target.input_transformer {
3115 apply_input_transformer(transformer, &event_json)
3116 } else if let Some(ref input) = target.input {
3117 input.clone()
3118 } else if let Some(ref input_path) = target.input_path {
3119 resolve_json_path(&event_json, input_path)
3120 .map(|v| v.to_string())
3121 .unwrap_or_else(|| event_str.clone())
3122 } else {
3123 event_str.clone()
3124 };
3125
3126 if target_arn.contains(":sqs:") {
3127 let group_id = target
3128 .sqs_parameters
3129 .as_ref()
3130 .and_then(|p| p["MessageGroupId"].as_str())
3131 .map(|s| s.to_string());
3132 if group_id.is_some() {
3133 self.delivery.send_to_sqs_with_attrs(
3134 target_arn,
3135 &body_str,
3136 &HashMap::new(),
3137 group_id.as_deref(),
3138 None,
3139 );
3140 } else {
3141 self.delivery
3142 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3143 }
3144 } else if target_arn.contains(":sns:") {
3145 self.delivery
3146 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3147 } else if target_arn.contains(":lambda:") {
3148 let mut state = self.state.write();
3149 state
3150 .lambda_invocations
3151 .push(crate::state::LambdaInvocation {
3152 function_arn: target_arn.clone(),
3153 payload: body_str.clone(),
3154 timestamp: Utc::now(),
3155 });
3156 drop(state);
3157 if let Some(ref ls) = self.lambda_state {
3158 ls.write().invocations.push(LambdaInvocation {
3159 function_arn: target_arn.clone(),
3160 payload: body_str.clone(),
3161 timestamp: Utc::now(),
3162 source: "aws:events".to_string(),
3163 });
3164 }
3165 invoke_lambda_async(
3166 &self.container_runtime,
3167 &self.lambda_state,
3168 target_arn,
3169 &body_str,
3170 );
3171 } else if target_arn.contains(":logs:") {
3172 let mut state = self.state.write();
3173 state.log_deliveries.push(crate::state::LogDelivery {
3174 log_group_arn: target_arn.clone(),
3175 payload: body_str.clone(),
3176 timestamp: Utc::now(),
3177 });
3178 drop(state);
3179 if let Some(ref log_state) = self.logs_state {
3180 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3181 }
3182 } else if target_arn.contains(":states:") {
3183 let mut state = self.state.write();
3184 state
3185 .step_function_executions
3186 .push(crate::state::StepFunctionExecution {
3187 state_machine_arn: target_arn.clone(),
3188 payload: body_str.clone(),
3189 timestamp: Utc::now(),
3190 });
3191 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3192 let url = target_arn.clone();
3193 let payload = body_str.clone();
3194 tokio::spawn(async move {
3195 let client = reqwest::Client::new();
3196 let _ = client
3197 .post(&url)
3198 .header("Content-Type", "application/json")
3199 .body(payload)
3200 .send()
3201 .await;
3202 });
3203 }
3204 }
3205 }
3206
3207 Ok(AwsResponse::ok_json(json!({
3208 "ReplayArn": arn,
3209 "ReplayStartTime": now.timestamp() as f64,
3210 "State": "STARTING",
3211 })))
3212 }
3213
3214 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3215 let body = req.json_body();
3216 validate_required("ReplayName", &body["ReplayName"])?;
3217 let name = body["ReplayName"]
3218 .as_str()
3219 .ok_or_else(|| missing("ReplayName"))?;
3220 validate_string_length("replayName", name, 1, 64)?;
3221
3222 let state = self.state.read();
3223 let replay = state.replays.get(name).ok_or_else(|| {
3224 AwsServiceError::aws_error(
3225 StatusCode::BAD_REQUEST,
3226 "ResourceNotFoundException",
3227 format!("Replay {name} does not exist."),
3228 )
3229 })?;
3230
3231 let mut resp = json!({
3232 "Destination": replay.destination,
3233 "EventSourceArn": replay.event_source_arn,
3234 "EventStartTime": replay.event_start_time.timestamp() as f64,
3235 "EventEndTime": replay.event_end_time.timestamp() as f64,
3236 "ReplayArn": replay.arn,
3237 "ReplayName": replay.name,
3238 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3239 "State": replay.state,
3240 });
3241 if let Some(ref desc) = replay.description {
3242 resp["Description"] = json!(desc);
3243 }
3244 if let Some(ref end) = replay.replay_end_time {
3245 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3246 }
3247
3248 Ok(AwsResponse::ok_json(resp))
3249 }
3250
3251 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3252 let body = req.json_body();
3253 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3254 validate_optional_string_length(
3255 "eventSourceArn",
3256 body["EventSourceArn"].as_str(),
3257 1,
3258 1600,
3259 )?;
3260 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3261 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3262 let name_prefix = body["NamePrefix"].as_str();
3263 let source_arn = body["EventSourceArn"].as_str();
3264 let replay_state = body["State"].as_str();
3265
3266 let filter_count = [
3268 name_prefix.is_some(),
3269 source_arn.is_some(),
3270 replay_state.is_some(),
3271 ]
3272 .iter()
3273 .filter(|&&x| x)
3274 .count();
3275 if filter_count > 1 {
3276 return Err(AwsServiceError::aws_error(
3277 StatusCode::BAD_REQUEST,
3278 "ValidationException",
3279 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3280 ));
3281 }
3282
3283 if let Some(s) = replay_state {
3285 let valid = [
3286 "CANCELLED",
3287 "CANCELLING",
3288 "COMPLETED",
3289 "FAILED",
3290 "RUNNING",
3291 "STARTING",
3292 ];
3293 if !valid.contains(&s) {
3294 return Err(AwsServiceError::aws_error(
3295 StatusCode::BAD_REQUEST,
3296 "ValidationException",
3297 format!(
3298 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3299 s
3300 ),
3301 ));
3302 }
3303 }
3304
3305 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3306
3307 let state = self.state.read();
3308 let all: Vec<Value> = state
3309 .replays
3310 .values()
3311 .filter(|r| {
3312 if let Some(prefix) = name_prefix {
3313 r.name.starts_with(prefix)
3314 } else if let Some(arn) = source_arn {
3315 r.event_source_arn == arn
3316 } else if let Some(s) = replay_state {
3317 r.state == s
3318 } else {
3319 true
3320 }
3321 })
3322 .map(|r| {
3323 let mut obj = json!({
3324 "EventSourceArn": r.event_source_arn,
3325 "EventStartTime": r.event_start_time.timestamp() as f64,
3326 "EventEndTime": r.event_end_time.timestamp() as f64,
3327 "ReplayName": r.name,
3328 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3329 "State": r.state,
3330 });
3331 if let Some(ref end) = r.replay_end_time {
3332 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3333 }
3334 obj
3335 })
3336 .collect();
3337
3338 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3339 let mut resp = json!({ "Replays": replays });
3340 if let Some(token) = next_token {
3341 resp["NextToken"] = json!(token);
3342 }
3343
3344 Ok(AwsResponse::ok_json(resp))
3345 }
3346
3347 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3348 let body = req.json_body();
3349 validate_required("ReplayName", &body["ReplayName"])?;
3350 let name = body["ReplayName"]
3351 .as_str()
3352 .ok_or_else(|| missing("ReplayName"))?;
3353 validate_string_length("replayName", name, 1, 64)?;
3354
3355 let mut state = self.state.write();
3356 let replay = state.replays.get_mut(name).ok_or_else(|| {
3357 AwsServiceError::aws_error(
3358 StatusCode::BAD_REQUEST,
3359 "ResourceNotFoundException",
3360 format!("Replay {name} does not exist."),
3361 )
3362 })?;
3363
3364 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3366 return Err(AwsServiceError::aws_error(
3367 StatusCode::BAD_REQUEST,
3368 "IllegalStatusException",
3369 format!("Replay {name} is not in a valid state for this operation."),
3370 ));
3371 }
3372
3373 let arn = replay.arn.clone();
3374 replay.state = "CANCELLED".to_string();
3375
3376 Ok(AwsResponse::ok_json(json!({
3377 "ReplayArn": arn,
3378 "State": "CANCELLING",
3379 })))
3380 }
3381}
3382
3383fn find_tags_mut<'a>(
3386 state: &'a mut crate::state::EventBridgeState,
3387 arn: &str,
3388) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3389 for bus in state.buses.values_mut() {
3391 if bus.arn == arn {
3392 return Ok(&mut bus.tags);
3393 }
3394 }
3395 for rule in state.rules.values_mut() {
3397 if rule.arn == arn {
3398 return Ok(&mut rule.tags);
3399 }
3400 }
3401
3402 let error_msg = if arn.contains(":rule/") {
3404 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3406 if let Some(rule_path) = parts.first() {
3407 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3408 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3409 } else {
3410 format!("Rule {} does not exist on EventBus default.", rule_path)
3411 }
3412 } else {
3413 format!("Resource {arn} not found.")
3414 }
3415 } else {
3416 format!("Resource {arn} not found.")
3417 };
3418
3419 Err(AwsServiceError::aws_error(
3420 StatusCode::BAD_REQUEST,
3421 "ResourceNotFoundException",
3422 error_msg,
3423 ))
3424}
3425
3426fn find_tags<'a>(
3427 state: &'a crate::state::EventBridgeState,
3428 arn: &str,
3429) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3430 for bus in state.buses.values() {
3431 if bus.arn == arn {
3432 return Ok(&bus.tags);
3433 }
3434 }
3435 for rule in state.rules.values() {
3436 if rule.arn == arn {
3437 return Ok(&rule.tags);
3438 }
3439 }
3440
3441 let error_msg = if arn.contains(":rule/") {
3442 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3443 if let Some(rule_path) = parts.first() {
3444 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3445 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3446 } else {
3447 format!("Rule {} does not exist on EventBus default.", rule_path)
3448 }
3449 } else {
3450 format!("Resource {arn} not found.")
3451 }
3452 } else {
3453 format!("Resource {arn} not found.")
3454 };
3455
3456 Err(AwsServiceError::aws_error(
3457 StatusCode::BAD_REQUEST,
3458 "ResourceNotFoundException",
3459 error_msg,
3460 ))
3461}
3462
3463fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3466 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3467 AwsServiceError::aws_error(
3468 StatusCode::BAD_REQUEST,
3469 "InvalidEventPatternException",
3470 "Event pattern is not valid. Reason: Invalid JSON",
3471 )
3472 })?;
3473
3474 validate_pattern_values(&parsed, "")?;
3475 Ok(())
3476}
3477
3478fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3479 match value {
3480 Value::Object(obj) => {
3481 for (key, val) in obj {
3482 let new_path = if path.is_empty() {
3483 key.clone()
3484 } else {
3485 format!("{path}.{key}")
3486 };
3487 match val {
3488 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3489 Value::Array(_) => {} _ => {
3491 return Err(AwsServiceError::aws_error(
3492 StatusCode::BAD_REQUEST,
3493 "InvalidEventPatternException",
3494 format!(
3495 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3496 key
3497 ),
3498 ));
3499 }
3500 }
3501 }
3502 Ok(())
3503 }
3504 _ => Ok(()),
3505 }
3506}
3507
3508fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3511 match auth_type {
3512 "API_KEY" => {
3513 let mut resp = json!({});
3514 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3515 resp["ApiKeyAuthParameters"] = json!({
3516 "ApiKeyName": api_key["ApiKeyName"],
3517 });
3518 }
3519 resp
3520 }
3521 "BASIC" => {
3522 let mut resp = json!({});
3523 if let Some(basic) = params.get("BasicAuthParameters") {
3524 resp["BasicAuthParameters"] = json!({
3525 "Username": basic["Username"],
3526 });
3527 }
3528 resp
3529 }
3530 "OAUTH_CLIENT_CREDENTIALS" => {
3531 let mut resp = json!({});
3532 if let Some(oauth) = params.get("OAuthParameters") {
3533 resp["OAuthParameters"] = json!({
3534 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3535 "HttpMethod": oauth["HttpMethod"],
3536 "ClientParameters": {
3537 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3538 },
3539 });
3540 }
3541 resp
3542 }
3543 _ => params.clone(),
3544 }
3545}
3546
3547pub fn matches_pattern(
3551 pattern_json: Option<&str>,
3552 source: &str,
3553 detail_type: &str,
3554 detail: &str,
3555 account: &str,
3556 region: &str,
3557 resources: &[String],
3558) -> bool {
3559 let pattern_json = match pattern_json {
3560 Some(p) => p,
3561 None => return true,
3562 };
3563
3564 let pattern: Value = match serde_json::from_str(pattern_json) {
3565 Ok(v) => v,
3566 Err(_) => return false,
3567 };
3568
3569 let pattern_obj = match pattern.as_object() {
3570 Some(o) => o,
3571 None => return false,
3572 };
3573
3574 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3575 let event = json!({
3576 "source": source,
3577 "detail-type": detail_type,
3578 "detail": detail_value,
3579 "account": account,
3580 "region": region,
3581 "resources": resources,
3582 });
3583
3584 for (key, pattern_value) in pattern_obj {
3585 let event_value = &event[key];
3586 if !matches_value(pattern_value, event_value) {
3587 return false;
3588 }
3589 }
3590
3591 true
3592}
3593
3594fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3595 match pattern {
3596 Value::Object(obj) => {
3597 for (key, sub_pattern) in obj {
3598 let sub_value = &event_value[key];
3599 if !matches_value(sub_pattern, sub_value) {
3600 return false;
3601 }
3602 }
3603 true
3604 }
3605 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3606 _ => false,
3607 }
3608}
3609
3610fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3611 match pattern_elem {
3612 Value::Object(obj) => {
3613 if let Some(prefix_val) = obj.get("prefix") {
3614 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3615 return actual.starts_with(prefix);
3616 }
3617 return false;
3618 }
3619 if let Some(exists_val) = obj.get("exists") {
3620 let should_exist = exists_val.as_bool().unwrap_or(true);
3621 let does_exist = !event_value.is_null();
3622 return should_exist == does_exist;
3623 }
3624 if let Some(anything_but_val) = obj.get("anything-but") {
3625 return match anything_but_val {
3626 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3627 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3628 Value::Number(_) => event_value != anything_but_val,
3629 _ => true,
3630 };
3631 }
3632 if let Some(numeric_val) = obj.get("numeric") {
3633 return matches_numeric(numeric_val, event_value);
3634 }
3635 false
3636 }
3637 _ => values_equal(pattern_elem, event_value),
3638 }
3639}
3640
3641fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3642 let arr = match numeric_arr.as_array() {
3643 Some(a) => a,
3644 None => return false,
3645 };
3646 let actual = match event_value.as_f64() {
3647 Some(n) => n,
3648 None => return false,
3649 };
3650 let mut i = 0;
3651 while i + 1 < arr.len() {
3652 let op = match arr[i].as_str() {
3653 Some(s) => s,
3654 None => return false,
3655 };
3656 let threshold = match arr[i + 1].as_f64() {
3657 Some(n) => n,
3658 None => return false,
3659 };
3660 let ok = match op {
3661 ">" => actual > threshold,
3662 ">=" => actual >= threshold,
3663 "<" => actual < threshold,
3664 "<=" => actual <= threshold,
3665 "=" => (actual - threshold).abs() < f64::EPSILON,
3666 _ => return false,
3667 };
3668 if !ok {
3669 return false;
3670 }
3671 i += 2;
3672 }
3673 true
3674}
3675
3676fn values_equal(a: &Value, b: &Value) -> bool {
3677 a == b
3678}
3679
3680fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3682 let path = path.strip_prefix('$').unwrap_or(path);
3683 let mut current = event;
3684 for segment in path.split('.') {
3685 if segment.is_empty() {
3686 continue;
3687 }
3688 current = current.get(segment)?;
3689 }
3690 Some(current.clone())
3691}
3692
3693fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3695 let input_paths_map = transformer
3696 .get("InputPathsMap")
3697 .and_then(|v| v.as_object())
3698 .cloned()
3699 .unwrap_or_default();
3700 let template = transformer
3701 .get("InputTemplate")
3702 .and_then(|v| v.as_str())
3703 .unwrap_or("")
3704 .to_string();
3705
3706 let mut resolved: HashMap<String, Value> = HashMap::new();
3708 for (var_name, path_val) in &input_paths_map {
3709 if let Some(path_str) = path_val.as_str() {
3710 if let Some(val) = resolve_json_path(event, path_str) {
3711 resolved.insert(var_name.clone(), val);
3712 }
3713 }
3714 }
3715
3716 let mut result = template;
3718 for (var_name, val) in &resolved {
3719 let placeholder = format!("<{var_name}>");
3720 let replacement = match val {
3721 Value::String(s) => s.clone(),
3722 other => other.to_string(),
3723 };
3724 result = result.replace(&placeholder, &replacement);
3725 }
3726
3727 result
3728}
3729
3730fn missing(name: &str) -> AwsServiceError {
3731 AwsServiceError::aws_error(
3732 StatusCode::BAD_REQUEST,
3733 "ValidationException",
3734 format!("The request must contain the parameter {name}"),
3735 )
3736}
3737
/// Extracts the function name from a Lambda function ARN.
///
/// The input is split on `:`; when the sixth field is `function` and a
/// seventh field exists, that seventh field is returned (any further fields,
/// such as a qualifier, are ignored). Every other input is returned
/// unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // `nth(5)` consumes the first six fields, so `next()` yields the seventh.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
3750
3751pub fn invoke_lambda_async(
3754 container_runtime: &Option<Arc<ContainerRuntime>>,
3755 lambda_state: &Option<SharedLambdaState>,
3756 function_arn: &str,
3757 payload: &str,
3758) {
3759 let runtime = match container_runtime {
3760 Some(rt) => rt.clone(),
3761 None => return,
3762 };
3763 let lambda_state = match lambda_state {
3764 Some(ls) => ls.clone(),
3765 None => return,
3766 };
3767 let func_name = function_name_from_arn(function_arn).to_string();
3768 let payload = payload.as_bytes().to_vec();
3769
3770 tokio::spawn(async move {
3771 let func = {
3772 let state = lambda_state.read();
3773 state.functions.get(&func_name).cloned()
3774 };
3775 let func = match func {
3776 Some(f) => f,
3777 None => {
3778 tracing::warn!(
3779 function = %func_name,
3780 "EventBridge Lambda target not found, skipping invocation"
3781 );
3782 return;
3783 }
3784 };
3785 match runtime.invoke(&func, &payload).await {
3786 Ok(_) => {
3787 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3788 }
3789 Err(e) => {
3790 tracing::warn!(
3791 function = %func_name,
3792 error = %e,
3793 "EventBridge Lambda invocation failed"
3794 );
3795 }
3796 }
3797 });
3798}
3799
3800pub fn deliver_to_logs(
3803 logs_state: &SharedLogsState,
3804 log_group_arn: &str,
3805 payload: &str,
3806 timestamp: chrono::DateTime<chrono::Utc>,
3807) {
3808 let group_name = if log_group_arn.contains(":log-group:") {
3811 log_group_arn
3812 .split(":log-group:")
3813 .nth(1)
3814 .unwrap_or(log_group_arn)
3815 .trim_end_matches(":*")
3816 } else {
3817 log_group_arn
3818 };
3819
3820 let stream_name = "events".to_string();
3821 let ts_millis = timestamp.timestamp_millis();
3822
3823 let mut state = logs_state.write();
3824 let region = state.region.clone();
3825 let account_id = state.account_id.clone();
3826
3827 let group = state
3829 .log_groups
3830 .entry(group_name.to_string())
3831 .or_insert_with(|| fakecloud_logs::state::LogGroup {
3832 name: group_name.to_string(),
3833 arn: Arn::new(
3834 "logs",
3835 ®ion,
3836 &account_id,
3837 &format!("log-group:{group_name}"),
3838 )
3839 .to_string(),
3840 creation_time: ts_millis,
3841 retention_in_days: None,
3842 kms_key_id: None,
3843 tags: HashMap::new(),
3844 log_streams: HashMap::new(),
3845 stored_bytes: 0,
3846 subscription_filters: Vec::new(),
3847 data_protection_policy: None,
3848 index_policies: Vec::new(),
3849 transformer: None,
3850 deletion_protection: false,
3851 });
3852
3853 let stream = group
3854 .log_streams
3855 .entry(stream_name.clone())
3856 .or_insert_with(|| fakecloud_logs::state::LogStream {
3857 name: stream_name,
3858 arn: format!("{}:log-stream:events", group.arn),
3859 creation_time: ts_millis,
3860 first_event_timestamp: None,
3861 last_event_timestamp: None,
3862 last_ingestion_time: None,
3863 upload_sequence_token: "1".to_string(),
3864 events: Vec::new(),
3865 });
3866
3867 stream.events.push(fakecloud_logs::state::LogEvent {
3868 timestamp: ts_millis,
3869 message: payload.to_string(),
3870 ingestion_time: ts_millis,
3871 });
3872 stream.last_event_timestamp = Some(ts_millis);
3873 stream.last_ingestion_time = Some(ts_millis);
3874 if stream.first_event_timestamp.is_none() {
3875 stream.first_event_timestamp = Some(ts_millis);
3876 }
3877}
3878
3879fn apply_connection_auth(
3881 mut builder: reqwest::RequestBuilder,
3882 conn: &Connection,
3883) -> reqwest::RequestBuilder {
3884 match conn.authorization_type.as_str() {
3885 "API_KEY" => {
3886 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3887 if let (Some(name), Some(value)) = (
3888 params["ApiKeyName"].as_str(),
3889 params["ApiKeyValue"].as_str(),
3890 ) {
3891 builder = builder.header(name, value);
3892 }
3893 }
3894 }
3895 "BASIC" => {
3896 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3897 if let (Some(user), Some(pass)) =
3898 (params["Username"].as_str(), params["Password"].as_str())
3899 {
3900 builder = builder.basic_auth(user, Some(pass));
3901 }
3902 }
3903 }
3904 "OAUTH_CLIENT_CREDENTIALS" => {
3905 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
3908 if let (Some(client_id), Some(client_secret)) = (
3909 params["ClientParameters"]["ClientID"].as_str(),
3910 params["ClientParameters"]["ClientSecret"].as_str(),
3911 ) {
3912 builder = builder.basic_auth(client_id, Some(client_secret));
3913 }
3914 }
3915 }
3916 _ => {}
3917 }
3918 builder
3919}
3920
3921#[cfg(test)]
3922mod tests {
3923 use super::*;
3924
3925 fn test_matches(
3927 pattern_json: Option<&str>,
3928 source: &str,
3929 detail_type: &str,
3930 detail: &str,
3931 ) -> bool {
3932 matches_pattern(
3933 pattern_json,
3934 source,
3935 detail_type,
3936 detail,
3937 "123456789012",
3938 "us-east-1",
3939 &[],
3940 )
3941 }
3942
3943 #[test]
3944 fn pattern_matches_source() {
3945 assert!(test_matches(
3946 Some(r#"{"source": ["my.app"]}"#),
3947 "my.app",
3948 "OrderPlaced",
3949 "{}"
3950 ));
3951 assert!(!test_matches(
3952 Some(r#"{"source": ["other.app"]}"#),
3953 "my.app",
3954 "OrderPlaced",
3955 "{}"
3956 ));
3957 }
3958
3959 #[test]
3960 fn pattern_matches_detail_type() {
3961 assert!(test_matches(
3962 Some(r#"{"detail-type": ["OrderPlaced"]}"#),
3963 "my.app",
3964 "OrderPlaced",
3965 "{}"
3966 ));
3967 assert!(!test_matches(
3968 Some(r#"{"detail-type": ["OrderShipped"]}"#),
3969 "my.app",
3970 "OrderPlaced",
3971 "{}"
3972 ));
3973 }
3974
3975 #[test]
3976 fn pattern_matches_detail_field() {
3977 assert!(test_matches(
3978 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3979 "my.app",
3980 "StatusChange",
3981 r#"{"status": "ACTIVE"}"#
3982 ));
3983 assert!(!test_matches(
3984 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3985 "my.app",
3986 "StatusChange",
3987 r#"{"status": "INACTIVE"}"#
3988 ));
3989 }
3990
3991 #[test]
3992 fn no_pattern_matches_everything() {
3993 assert!(test_matches(None, "any", "any", "{}"));
3994 }
3995
3996 #[test]
3997 fn combined_pattern() {
3998 let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
3999 assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4000 assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4001 assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4002 }
4003
4004 #[test]
4005 fn nested_detail_pattern() {
4006 let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4007 assert!(test_matches(
4008 Some(pattern),
4009 "my.app",
4010 "OrderEvent",
4011 r#"{"order": {"status": "PLACED", "id": "123"}}"#
4012 ));
4013 assert!(!test_matches(
4014 Some(pattern),
4015 "my.app",
4016 "OrderEvent",
4017 r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4018 ));
4019 assert!(!test_matches(
4020 Some(pattern),
4021 "my.app",
4022 "OrderEvent",
4023 r#"{"order": {"id": "123"}}"#
4024 ));
4025 }
4026
4027 #[test]
4028 fn deeply_nested_detail_pattern() {
4029 let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4030 assert!(test_matches(
4031 Some(pattern),
4032 "src",
4033 "type",
4034 r#"{"a": {"b": {"c": "deep"}}}"#
4035 ));
4036 assert!(!test_matches(
4037 Some(pattern),
4038 "src",
4039 "type",
4040 r#"{"a": {"b": {"c": "shallow"}}}"#
4041 ));
4042 }
4043
4044 #[test]
4045 fn prefix_matcher() {
4046 let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4047 assert!(test_matches(
4048 Some(pattern),
4049 "com.myapp.orders",
4050 "OrderPlaced",
4051 "{}"
4052 ));
4053 assert!(test_matches(
4054 Some(pattern),
4055 "com.myapp",
4056 "OrderPlaced",
4057 "{}"
4058 ));
4059 assert!(!test_matches(
4060 Some(pattern),
4061 "com.other",
4062 "OrderPlaced",
4063 "{}"
4064 ));
4065 }
4066
4067 #[test]
4068 fn prefix_matcher_in_detail() {
4069 let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4070 assert!(test_matches(
4071 Some(pattern),
4072 "src",
4073 "type",
4074 r#"{"region": "us-east-1"}"#
4075 ));
4076 assert!(!test_matches(
4077 Some(pattern),
4078 "src",
4079 "type",
4080 r#"{"region": "eu-west-1"}"#
4081 ));
4082 }
4083
4084 #[test]
4085 fn exists_matcher() {
4086 let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4087 assert!(test_matches(
4088 Some(pattern),
4089 "src",
4090 "type",
4091 r#"{"error": "something broke"}"#
4092 ));
4093 assert!(!test_matches(
4094 Some(pattern),
4095 "src",
4096 "type",
4097 r#"{"status": "ok"}"#
4098 ));
4099
4100 let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4101 assert!(test_matches(
4102 Some(pattern),
4103 "src",
4104 "type",
4105 r#"{"status": "ok"}"#
4106 ));
4107 assert!(!test_matches(
4108 Some(pattern),
4109 "src",
4110 "type",
4111 r#"{"error": "something broke"}"#
4112 ));
4113 }
4114
4115 #[test]
4116 fn anything_but_matcher() {
4117 let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4118 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4119 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4120
4121 let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4122 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4123 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4124 assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4125 }
4126
4127 #[test]
4128 fn anything_but_in_detail() {
4129 let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4130 assert!(test_matches(
4131 Some(pattern),
4132 "src",
4133 "type",
4134 r#"{"env": "staging"}"#
4135 ));
4136 assert!(!test_matches(
4137 Some(pattern),
4138 "src",
4139 "type",
4140 r#"{"env": "prod"}"#
4141 ));
4142 }
4143
4144 #[test]
4145 fn numeric_greater_than() {
4146 let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4147 assert!(test_matches(
4148 Some(pattern),
4149 "src",
4150 "type",
4151 r#"{"count": 150}"#
4152 ));
4153 assert!(!test_matches(
4154 Some(pattern),
4155 "src",
4156 "type",
4157 r#"{"count": 100}"#
4158 ));
4159 assert!(!test_matches(
4160 Some(pattern),
4161 "src",
4162 "type",
4163 r#"{"count": 50}"#
4164 ));
4165 }
4166
4167 #[test]
4168 fn numeric_less_than() {
4169 let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4170 assert!(test_matches(
4171 Some(pattern),
4172 "src",
4173 "type",
4174 r#"{"count": 5}"#
4175 ));
4176 assert!(!test_matches(
4177 Some(pattern),
4178 "src",
4179 "type",
4180 r#"{"count": 10}"#
4181 ));
4182 assert!(!test_matches(
4183 Some(pattern),
4184 "src",
4185 "type",
4186 r#"{"count": 15}"#
4187 ));
4188 }
4189
4190 #[test]
4191 fn numeric_range() {
4192 let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4193 assert!(test_matches(
4194 Some(pattern),
4195 "src",
4196 "type",
4197 r#"{"count": 50}"#
4198 ));
4199 assert!(test_matches(
4200 Some(pattern),
4201 "src",
4202 "type",
4203 r#"{"count": 100}"#
4204 ));
4205 assert!(!test_matches(
4206 Some(pattern),
4207 "src",
4208 "type",
4209 r#"{"count": 200}"#
4210 ));
4211 assert!(!test_matches(
4212 Some(pattern),
4213 "src",
4214 "type",
4215 r#"{"count": 49}"#
4216 ));
4217 }
4218
4219 #[test]
4220 fn mixed_matchers_and_literals() {
4221 let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4222 assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4223 assert!(test_matches(
4224 Some(pattern),
4225 "com.myapp.orders",
4226 "Event",
4227 "{}"
4228 ));
4229 assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4230 }
4231
4232 use crate::state::EventBridgeState;
4235 use fakecloud_core::delivery::DeliveryBus;
4236 use parking_lot::RwLock;
4237
4238 fn make_service() -> EventBridgeService {
4239 let state = Arc::new(RwLock::new(EventBridgeState::new(
4240 "123456789012",
4241 "us-east-1",
4242 )));
4243 let delivery = Arc::new(DeliveryBus::new());
4244 EventBridgeService::new(state, delivery)
4245 }
4246
4247 fn make_request(action: &str, body: Value) -> AwsRequest {
4248 AwsRequest {
4249 service: "events".to_string(),
4250 action: action.to_string(),
4251 region: "us-east-1".to_string(),
4252 account_id: "123456789012".to_string(),
4253 request_id: "test-id".to_string(),
4254 headers: http::HeaderMap::new(),
4255 query_params: HashMap::new(),
4256 body: serde_json::to_vec(&body).unwrap().into(),
4257 path_segments: vec![],
4258 raw_path: "/".to_string(),
4259 raw_query: String::new(),
4260 method: http::Method::POST,
4261 is_query_protocol: false,
4262 access_key_id: None,
4263 }
4264 }
4265
4266 fn create_connection(svc: &EventBridgeService, name: &str) {
4267 let req = make_request(
4268 "CreateConnection",
4269 json!({
4270 "Name": name,
4271 "AuthorizationType": "API_KEY",
4272 "AuthParameters": {
4273 "ApiKeyAuthParameters": {
4274 "ApiKeyName": "x-api-key",
4275 "ApiKeyValue": "secret"
4276 }
4277 }
4278 }),
4279 );
4280 svc.create_connection(&req).unwrap();
4281 }
4282
4283 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4284 let conn_arn_field = {
4285 let state = svc.state.read();
4286 state.connections.get(conn_name).unwrap().arn.clone()
4287 };
4288 let req = make_request(
4289 "CreateApiDestination",
4290 json!({
4291 "Name": name,
4292 "ConnectionArn": conn_arn_field,
4293 "InvocationEndpoint": "https://example.com",
4294 "HttpMethod": "POST"
4295 }),
4296 );
4297 svc.create_api_destination(&req).unwrap();
4298 }
4299
/// `ListConnections` with an empty request returns every connection and no
/// `NextToken`.
#[test]
fn list_connections_returns_all_by_default() {
    let svc = make_service();
    for name in ["conn-alpha", "conn-beta", "conn-gamma"] {
        create_connection(&svc, name);
    }

    let request = make_request("ListConnections", json!({}));
    let response = svc.list_connections(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let connections = listing["Connections"].as_array().unwrap();
    assert_eq!(connections.len(), 3);
    assert!(listing["NextToken"].is_null());
}
4315
/// `ListConnections` with `NamePrefix` returns only connections whose name
/// starts with that prefix.
#[test]
fn list_connections_name_prefix_filter() {
    let svc = make_service();
    for name in ["prod-conn-1", "prod-conn-2", "dev-conn-1"] {
        create_connection(&svc, name);
    }

    let request = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
    let response = svc.list_connections(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in listing["Connections"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    assert!(names.iter().all(|n| n.starts_with("prod-")));
}
4335
/// `ListConnections` with `ConnectionState` returns only connections in that
/// state.
#[test]
fn list_connections_state_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");

    // Change one connection's state directly in the shared state; the inner
    // scope releases the write guard before the service is called again.
    {
        let mut guard = svc.state.write();
        let conn_b = guard.connections.get_mut("conn-b").unwrap();
        conn_b.connection_state = String::from("DEAUTHORIZED");
    }

    let params = json!({ "ConnectionState": "AUTHORIZED" });
    let request = make_request("ListConnections", params);
    let response = svc.list_connections(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let authorized = listing["Connections"].as_array().unwrap();
    assert_eq!(authorized.len(), 1);
    assert_eq!(authorized[0]["Name"].as_str().unwrap(), "conn-a");
}
4362
/// Walks five connections with `Limit: 2`, feeding each returned `NextToken`
/// into the following request: pages of 2, 2 and 1 items, with tokens `"2"`,
/// `"4"` and then none.
#[test]
fn list_connections_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_connection(&svc, &format!("conn-{i:02}"));
    }

    // (items expected on the page, token expected back); `None` marks the last page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (expected_len, expected_token) in expected_pages {
        let params = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListConnections", params);
        let response = svc.list_connections(&request).unwrap();
        let page: Value = serde_json::from_slice(&response.body).unwrap();

        assert_eq!(page["Connections"].as_array().unwrap().len(), expected_len);
        match expected_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4393
/// Combining `NamePrefix` with `Limit` yields a full first page and a
/// `NextToken` when more matching connections remain.
#[test]
fn list_connections_pagination_with_filter() {
    let svc = make_service();
    for i in 0..4 {
        create_connection(&svc, &format!("prod-{i:02}"));
    }
    create_connection(&svc, "dev-00");

    let params = json!({ "NamePrefix": "prod-", "Limit": 2 });
    let request = make_request("ListConnections", params);
    let response = svc.list_connections(&request).unwrap();
    let first_page: Value = serde_json::from_slice(&response.body).unwrap();

    let connections = first_page["Connections"].as_array().unwrap();
    assert_eq!(connections.len(), 2);
    assert!(first_page["NextToken"].as_str().is_some());
}
4411
/// `ListApiDestinations` with an empty request returns every destination and
/// no `NextToken`.
#[test]
fn list_api_destinations_returns_all_by_default() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for dest in ["dest-alpha", "dest-beta"] {
        create_api_destination(&svc, dest, "my-conn");
    }

    let request = make_request("ListApiDestinations", json!({}));
    let response = svc.list_api_destinations(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let destinations = listing["ApiDestinations"].as_array().unwrap();
    assert_eq!(destinations.len(), 2);
    assert!(listing["NextToken"].is_null());
}
4427
/// `ListApiDestinations` with `NamePrefix` returns only destinations whose
/// name starts with that prefix.
#[test]
fn list_api_destinations_name_prefix_filter() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for dest in ["prod-dest-1", "prod-dest-2", "dev-dest-1"] {
        create_api_destination(&svc, dest, "my-conn");
    }

    let request = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
    let response = svc.list_api_destinations(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in listing["ApiDestinations"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    assert!(names.iter().all(|n| n.starts_with("prod-")));
}
4448
/// `ListApiDestinations` with `ConnectionArn` returns only the destinations
/// attached to that connection.
#[test]
fn list_api_destinations_connection_arn_filter() {
    let svc = make_service();
    for conn in ["conn-a", "conn-b"] {
        create_connection(&svc, conn);
    }
    let attachments = [
        ("dest-1", "conn-a"),
        ("dest-2", "conn-b"),
        ("dest-3", "conn-a"),
    ];
    for (dest, conn) in attachments {
        create_api_destination(&svc, dest, conn);
    }

    // The read guard is a temporary and is gone before the list call.
    let conn_a_arn = svc
        .state
        .read()
        .connections
        .get("conn-a")
        .unwrap()
        .arn
        .clone();

    let params = json!({ "ConnectionArn": conn_a_arn });
    let request = make_request("ListApiDestinations", params);
    let response = svc.list_api_destinations(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in listing["ApiDestinations"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    for expected in ["dest-1", "dest-3"] {
        assert!(names.contains(&expected));
    }
}
4479
/// Walks five API destinations with `Limit: 2`, feeding each returned
/// `NextToken` into the following request: pages of 2, 2 and 1 items, with
/// tokens `"2"`, `"4"` and then none.
#[test]
fn list_api_destinations_pagination() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for i in 0..5 {
        create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
    }

    // (items expected on the page, token expected back); `None` marks the last page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (expected_len, expected_token) in expected_pages {
        let params = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListApiDestinations", params);
        let response = svc.list_api_destinations(&request).unwrap();
        let page: Value = serde_json::from_slice(&response.body).unwrap();

        assert_eq!(
            page["ApiDestinations"].as_array().unwrap().len(),
            expected_len
        );
        match expected_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4517
4518 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4521 let req = make_request("CreateEventBus", json!({ "Name": name }));
4522 svc.create_event_bus(&req).unwrap();
4523 }
4524
/// Walks the event buses with `Limit: 2`, feeding each returned `NextToken`
/// into the following request: pages of 2, 2 and 1 items, with tokens `"2"`,
/// `"4"` and then none.
///
/// Only four buses are created here but five are listed in total; presumably
/// the fresh state already contains a `default` bus (TODO confirm against
/// `EventBridgeState::new`).
#[test]
fn list_event_buses_pagination() {
    let svc = make_service();
    for i in 0..4 {
        create_event_bus(&svc, &format!("bus-{i:02}"));
    }

    // (items expected on the page, token expected back); `None` marks the last page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (expected_len, expected_token) in expected_pages {
        let params = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListEventBuses", params);
        let response = svc.list_event_buses(&request).unwrap();
        let page: Value = serde_json::from_slice(&response.body).unwrap();

        assert_eq!(page["EventBuses"].as_array().unwrap().len(), expected_len);
        match expected_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4556
/// `ListEventBuses` without `Limit` returns every bus and no `NextToken`.
///
/// Two buses are created here but three are expected; presumably the fresh
/// state already contains a `default` bus (TODO confirm against
/// `EventBridgeState::new`).
#[test]
fn list_event_buses_no_pagination_returns_all() {
    let svc = make_service();
    for name in ["bus-alpha", "bus-beta"] {
        create_event_bus(&svc, name);
    }

    let request = make_request("ListEventBuses", json!({}));
    let response = svc.list_event_buses(&request).unwrap();
    let listing: Value = serde_json::from_slice(&response.body).unwrap();

    let buses = listing["EventBuses"].as_array().unwrap();
    assert_eq!(buses.len(), 3);
    assert!(listing["NextToken"].is_null());
}
4570
/// Even when the `PutEvents` request carries an `EndpointId`, the response
/// body must not echo it back, and the single entry must be accepted.
#[test]
fn put_events_never_includes_endpoint_id_in_response() {
    let svc = make_service();

    let entry = json!({
        "Source": "my.source",
        "DetailType": "MyType",
        "Detail": "{}",
        "EventBusName": "default"
    });
    let params = json!({
        "EndpointId": "my-endpoint.abc123",
        "Entries": [entry]
    });
    let request = make_request("PutEvents", params);
    let response = svc.put_events(&request).unwrap();
    let body: Value = serde_json::from_slice(&response.body).unwrap();

    let top_level = body.as_object().unwrap();
    assert!(
        top_level.get("EndpointId").is_none(),
        "EndpointId should never be in the PutEvents response"
    );
    assert_eq!(body["FailedEntryCount"], 0);
}
4597
4598 fn create_archive(svc: &EventBridgeService, name: &str) {
4601 let req = make_request(
4602 "CreateArchive",
4603 json!({
4604 "ArchiveName": name,
4605 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4606 }),
4607 );
4608 svc.create_archive(&req).unwrap();
4609 }
4610
/// Walks five archives with `Limit: 2`, feeding each returned `NextToken`
/// into the following request: pages of 2, 2 and 1 items, with tokens `"2"`,
/// `"4"` and then none.
#[test]
fn list_archives_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_archive(&svc, &format!("archive-{i:02}"));
    }

    // (items expected on the page, token expected back); `None` marks the last page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (expected_len, expected_token) in expected_pages {
        let params = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListArchives", params);
        let response = svc.list_archives(&request).unwrap();
        let page: Value = serde_json::from_slice(&response.body).unwrap();

        assert_eq!(page["Archives"].as_array().unwrap().len(), expected_len);
        match expected_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4641
4642 fn create_replay(svc: &EventBridgeService, name: &str) {
4645 let archive_arn = {
4647 let state = svc.state.read();
4648 if state.archives.contains_key("replay-archive") {
4649 state.archives["replay-archive"].arn.clone()
4650 } else {
4651 drop(state);
4652 create_archive(svc, "replay-archive");
4653 svc.state.read().archives["replay-archive"].arn.clone()
4654 }
4655 };
4656 let req = make_request(
4657 "StartReplay",
4658 json!({
4659 "ReplayName": name,
4660 "EventSourceArn": archive_arn,
4661 "EventStartTime": 1000000.0,
4662 "EventEndTime": 2000000.0,
4663 "Destination": {
4664 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4665 }
4666 }),
4667 );
4668 svc.start_replay(&req).unwrap();
4669 }
4670
/// Walks five replays with `Limit: 2`, feeding each returned `NextToken` into
/// the following request: pages of 2, 2 and 1 items, with tokens `"2"`, `"4"`
/// and then none.
#[test]
fn list_replays_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_replay(&svc, &format!("replay-{i:02}"));
    }

    // (items expected on the page, token expected back); `None` marks the last page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;
    for (expected_len, expected_token) in expected_pages {
        let params = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListReplays", params);
        let response = svc.list_replays(&request).unwrap();
        let page: Value = serde_json::from_slice(&response.body).unwrap();

        assert_eq!(page["Replays"].as_array().unwrap().len(), expected_len);
        match expected_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4701
/// A `NextToken` that is not numeric must make `ListEventBuses` return an
/// error.
#[test]
fn list_event_buses_invalid_next_token_returns_error() {
    let svc = make_service();

    let params = json!({ "NextToken": "not-a-number" });
    let outcome = svc.list_event_buses(&make_request("ListEventBuses", params));
    assert!(
        outcome.is_err(),
        "non-numeric NextToken should return an error"
    );
}
4713
/// `TestEventPattern` reports `Result: true` when the event's source is
/// listed in the pattern.
#[test]
fn test_event_pattern_match() {
    let svc = make_service();

    let params = json!({
        "EventPattern": r#"{"source": ["my.app"]}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
    });
    let request = make_request("TestEventPattern", params);
    let response = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(&response.body).unwrap();

    assert_eq!(verdict["Result"], true);
}
4730
/// `TestEventPattern` reports `Result: false` when the event's source is not
/// listed in the pattern.
#[test]
fn test_event_pattern_no_match() {
    let svc = make_service();

    let params = json!({
        "EventPattern": r#"{"source": ["other.app"]}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
    });
    let request = make_request("TestEventPattern", params);
    let response = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(&response.body).unwrap();

    assert_eq!(verdict["Result"], false);
}
4745
/// `TestEventPattern` reports `Result: true` when a pattern on a `detail`
/// field matches, even though the event's detail has additional fields.
#[test]
fn test_event_pattern_detail_match() {
    let svc = make_service();

    let params = json!({
        "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
        "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
    });
    let request = make_request("TestEventPattern", params);
    let response = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(&response.body).unwrap();

    assert_eq!(verdict["Result"], true);
}
4760
/// `UpdateEventBus` echoes the bus name, and the new description is visible
/// through a later `DescribeEventBus`.
#[test]
fn update_event_bus_description() {
    let svc = make_service();
    create_event_bus(&svc, "my-bus");

    let update = make_request(
        "UpdateEventBus",
        json!({ "Name": "my-bus", "Description": "Updated desc" }),
    );
    let update_resp = svc.update_event_bus(&update).unwrap();
    let updated: Value = serde_json::from_slice(&update_resp.body).unwrap();
    assert_eq!(updated["Name"], "my-bus");

    // Read the bus back to confirm the description was stored.
    let describe = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
    let describe_resp = svc.describe_event_bus(&describe).unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["Description"], "Updated desc");
}
4782
/// `UpdateEventBus` on a bus that was never created returns an error.
#[test]
fn update_event_bus_not_found() {
    let svc = make_service();

    let params = json!({ "Name": "ghost-bus", "Description": "nope" });
    let outcome = svc.update_event_bus(&make_request("UpdateEventBus", params));
    assert!(outcome.is_err());
}
4792
4793 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4796 let req = make_request(
4797 "CreateEndpoint",
4798 json!({
4799 "Name": name,
4800 "RoutingConfig": {
4801 "FailoverConfig": {
4802 "Primary": { "HealthCheck": "" },
4803 "Secondary": { "Route": "us-west-2" }
4804 }
4805 },
4806 "EventBuses": [
4807 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4808 ]
4809 }),
4810 );
4811 svc.create_endpoint(&req).unwrap();
4812 }
4813
/// An endpoint can be created, described (name, `ACTIVE` state, and an
/// `EndpointId` containing the name) and deleted, after which describing it
/// fails.
#[test]
fn endpoint_create_describe_delete() {
    let svc = make_service();
    create_endpoint_helper(&svc, "my-endpoint");

    // The same describe request is needed before and after the delete.
    let describe_request =
        || make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));

    let response = svc.describe_endpoint(&describe_request()).unwrap();
    let described: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(described["Name"], "my-endpoint");
    assert_eq!(described["State"], "ACTIVE");
    let endpoint_id = described["EndpointId"].as_str().unwrap();
    assert!(endpoint_id.contains("my-endpoint"));

    let delete = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
    svc.delete_endpoint(&delete).unwrap();

    assert!(svc.describe_endpoint(&describe_request()).is_err());
}
4835
/// `ListEndpoints` returns both created endpoints, `UpdateEndpoint` echoes the
/// endpoint name, and the new description is visible through
/// `DescribeEndpoint`.
#[test]
fn endpoint_list_and_update() {
    let svc = make_service();
    for name in ["ep-alpha", "ep-beta"] {
        create_endpoint_helper(&svc, name);
    }

    let list = make_request("ListEndpoints", json!({}));
    let list_resp = svc.list_endpoints(&list).unwrap();
    let listing: Value = serde_json::from_slice(&list_resp.body).unwrap();
    assert_eq!(listing["Endpoints"].as_array().unwrap().len(), 2);

    let update = make_request(
        "UpdateEndpoint",
        json!({ "Name": "ep-alpha", "Description": "updated" }),
    );
    let update_resp = svc.update_endpoint(&update).unwrap();
    let updated: Value = serde_json::from_slice(&update_resp.body).unwrap();
    assert_eq!(updated["Name"], "ep-alpha");

    // Read the endpoint back to confirm the description was stored.
    let describe = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
    let describe_resp = svc.describe_endpoint(&describe).unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["Description"], "updated");
}
4863
/// Creating a second endpoint with an already-used name returns an error.
#[test]
fn endpoint_duplicate_fails() {
    let svc = make_service();
    create_endpoint_helper(&svc, "dup-ep");

    let params = json!({
        "Name": "dup-ep",
        "RoutingConfig": {},
        "EventBuses": []
    });
    let duplicate = make_request("CreateEndpoint", params);
    assert!(svc.create_endpoint(&duplicate).is_err());
}
4878
/// `DeauthorizeConnection` reports the `DEAUTHORIZING` state and the
/// connection's ARN, and the same state is then returned by
/// `DescribeConnection`.
#[test]
fn deauthorize_connection_sets_state() {
    let svc = make_service();
    create_connection(&svc, "deauth-conn");

    let deauthorize = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
    let deauth_resp = svc.deauthorize_connection(&deauthorize).unwrap();
    let deauthorized: Value = serde_json::from_slice(&deauth_resp.body).unwrap();
    assert_eq!(deauthorized["ConnectionState"], "DEAUTHORIZING");
    let connection_arn = deauthorized["ConnectionArn"].as_str().unwrap();
    assert!(connection_arn.contains("deauth-conn"));

    // The stored connection must report the same state.
    let describe = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
    let describe_resp = svc.describe_connection(&describe).unwrap();
    let described: Value = serde_json::from_slice(&describe_resp.body).unwrap();
    assert_eq!(described["ConnectionState"], "DEAUTHORIZING");
}
4901
/// `DeauthorizeConnection` on a connection that was never created returns an
/// error.
#[test]
fn deauthorize_connection_not_found() {
    let svc = make_service();

    let params = json!({ "Name": "ghost-conn" });
    let outcome = svc.deauthorize_connection(&make_request("DeauthorizeConnection", params));
    assert!(outcome.is_err());
}
4908
/// End-to-end walk through the partner event source operations: create,
/// describe, list (sources and accounts), the account-side
/// `DescribeEventSource` / `ListEventSources` views, delete, and a final
/// describe that must fail.
#[test]
fn partner_event_source_crud() {
    let svc = make_service();

    // Create the partner event source used by every step below.
    let req = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&req).unwrap();

    // Describe returns the source under the name it was created with.
    let req = make_request(
        "DescribePartnerEventSource",
        json!({ "Name": "partner/test" }),
    );
    let resp = svc.describe_partner_event_source(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["Name"], "partner/test");

    // Listing by name prefix finds exactly that one source.
    let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
    let resp = svc.list_partner_event_sources(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);

    // Exactly one account is associated with the source.
    let req = make_request(
        "ListPartnerEventSourceAccounts",
        json!({ "EventSourceName": "partner/test" }),
    );
    let resp = svc.list_partner_event_source_accounts(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(
        body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
        1
    );

    // The same source is visible through DescribeEventSource, in ACTIVE state.
    let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
    let resp = svc.describe_event_source(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["Name"], "partner/test");
    assert_eq!(body["State"], "ACTIVE");

    // ...and through ListEventSources.
    let req = make_request("ListEventSources", json!({}));
    let resp = svc.list_event_sources(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);

    // Delete the source with the same account it was created for.
    let req = make_request(
        "DeletePartnerEventSource",
        json!({ "Name": "partner/test", "Account": "123456789012" }),
    );
    svc.delete_partner_event_source(&req).unwrap();

    // Once deleted, describing the source must fail.
    let req = make_request(
        "DescribePartnerEventSource",
        json!({ "Name": "partner/test" }),
    );
    assert!(svc.describe_partner_event_source(&req).is_err());
}
4976
/// `DeactivateEventSource` and `ActivateEventSource` flip the stored state of
/// a partner event source between `INACTIVE` and `ACTIVE`, and both return an
/// error for a source that does not exist.
#[test]
fn activate_deactivate_event_source() {
    let svc = make_service();

    let create = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create).unwrap();

    let deactivate = make_request(
        "DeactivateEventSource",
        json!({ "Name": "aws.partner/test" }),
    );
    svc.deactivate_event_source(&deactivate).unwrap();
    {
        // Inspect the stored state directly; the guard is released at the
        // end of this scope, before the next service call.
        let guard = svc.state.read();
        let source = &guard.partner_event_sources["aws.partner/test"];
        assert_eq!(source.state, "INACTIVE");
    }

    let activate = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
    svc.activate_event_source(&activate).unwrap();
    {
        let guard = svc.state.read();
        let source = &guard.partner_event_sources["aws.partner/test"];
        assert_eq!(source.state, "ACTIVE");
    }

    // Both operations reject an unknown source name.
    let activate_missing = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
    assert!(svc.activate_event_source(&activate_missing).is_err());

    let deactivate_missing =
        make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
    assert!(svc.deactivate_event_source(&deactivate_missing).is_err());
}
5020
/// `DeletePartnerEventSource` fails and leaves the source in place when the
/// account does not match, succeeds with the matching account, and fails
/// again once the source is gone.
#[test]
fn delete_partner_event_source_verifies_account() {
    let svc = make_service();

    let create = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create).unwrap();

    // Issues a delete for the source on behalf of `account`.
    let delete_as = |account: &str| {
        let request = make_request(
            "DeletePartnerEventSource",
            json!({ "Name": "aws.partner/test", "Account": account }),
        );
        svc.delete_partner_event_source(&request)
    };
    // Whether the source is still present in the shared state.
    let source_exists = || {
        svc.state
            .read()
            .partner_event_sources
            .contains_key("aws.partner/test")
    };

    // Wrong account: rejected, and the source stays.
    assert!(delete_as("999999999999").is_err());
    assert!(source_exists());

    // Matching account: the source is removed.
    delete_as("123456789012").unwrap();
    assert!(!source_exists());

    // Deleting a source that no longer exists is an error.
    assert!(delete_as("123456789012").is_err());
}
5064
/// `PutPartnerEvents` with one entry reports no failures and returns one
/// result entry carrying an `EventId` string.
#[test]
fn put_partner_events() {
    let svc = make_service();

    let params = json!({
        "Entries": [
            { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
        ]
    });
    let request = make_request("PutPartnerEvents", params);
    let response = svc.put_partner_events(&request).unwrap();
    let result: Value = serde_json::from_slice(&response.body).unwrap();

    assert_eq!(result["FailedEntryCount"], 0);
    assert_eq!(result["Entries"].as_array().unwrap().len(), 1);
    let event_id = result["Entries"][0]["EventId"].as_str();
    assert!(event_id.is_some());
}
5082
/// Builds an `EventBridgeService` whose delivery bus has a recording SQS
/// delivery attached.
///
/// Returns the service together with a shared list of
/// `(queue_arn, message_body)` pairs, one per call made to the SQS delivery,
/// so tests can inspect what would have been sent to a queue.
// The returned tuple spells out the full recorder type, which is what
// clippy's `type_complexity` lint would otherwise flag.
#[allow(clippy::type_complexity)]
fn make_service_with_sqs_recorder() -> (
    EventBridgeService,
    Arc<parking_lot::Mutex<Vec<(String, String)>>>,
) {
    use fakecloud_core::delivery::SqsDelivery;

    /// `SqsDelivery` implementation that stores each delivery instead of
    /// sending it anywhere.
    struct RecordingSqsDelivery {
        messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
    }

    impl SqsDelivery for RecordingSqsDelivery {
        /// Appends `(queue_arn, message_body)` to the shared list; the
        /// message attributes are ignored.
        fn deliver_to_queue(
            &self,
            queue_arn: &str,
            message_body: &str,
            _attributes: &HashMap<String, String>,
        ) {
            self.messages
                .lock()
                .push((queue_arn.to_string(), message_body.to_string()));
        }
    }

    let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
        Arc::new(parking_lot::Mutex::new(Vec::new()));
    let state = Arc::new(RwLock::new(EventBridgeState::new(
        "123456789012",
        "us-east-1",
    )));
    // The recorder gets a clone of the same `Arc`, so the caller's handle sees
    // every message it pushes.
    let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
        messages: messages.clone(),
    })));
    let svc = EventBridgeService::new(state, delivery);
    (svc, messages)
}
5122
/// Full replay scenario: events put on the `default` bus are captured by an
/// archive, and `StartReplay` then delivers them to the SQS target of a
/// matching rule, tagged with `replay-name`.
#[test]
fn start_replay_delivers_archived_events_to_sqs_target() {
    let (svc, messages) = make_service_with_sqs_recorder();
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";

    // Rule matching events whose source is `my.app`.
    let req = make_request(
        "PutRule",
        json!({
            "Name": "replay-test-rule",
            "EventPattern": r#"{"source": ["my.app"]}"#,
            "State": "ENABLED"
        }),
    );
    svc.put_rule(&req).unwrap();

    // Point the rule at the recording SQS queue.
    let req = make_request(
        "PutTargets",
        json!({
            "Rule": "replay-test-rule",
            "Targets": [{
                "Id": "sqs-target",
                "Arn": queue_arn
            }]
        }),
    );
    svc.put_targets(&req).unwrap();

    // Archive sourced from the default bus, created before any event is put.
    let req = make_request(
        "CreateArchive",
        json!({
            "ArchiveName": "test-archive",
            "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
        }),
    );
    svc.create_archive(&req).unwrap();

    // Put two matching events on the default bus.
    let req = make_request(
        "PutEvents",
        json!({
            "Entries": [
                {
                    "Source": "my.app",
                    "DetailType": "OrderCreated",
                    "Detail": "{\"orderId\": \"1\"}",
                    "EventBusName": "default"
                },
                {
                    "Source": "my.app",
                    "DetailType": "OrderShipped",
                    "Detail": "{\"orderId\": \"2\"}",
                    "EventBusName": "default"
                }
            ]
        }),
    );
    let resp = svc.put_events(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["FailedEntryCount"], 0);

    {
        // Both events must have been captured by the archive. The read guard
        // is scoped so it is released before the next service call.
        let state = svc.state.read();
        let archive = state.archives.get("test-archive").unwrap();
        assert_eq!(archive.events.len(), 2);
        assert_eq!(archive.event_count, 2);
    }

    // Reset the recorder so the assertions below only see messages recorded
    // from this point on.
    messages.lock().clear();

    let archive_arn = {
        let state = svc.state.read();
        state.archives.get("test-archive").unwrap().arn.clone()
    };

    // Replay window: from timestamp 0 up to one hour past the current time.
    let start_ts = 0.0_f64;
    let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;

    let req = make_request(
        "StartReplay",
        json!({
            "ReplayName": "my-replay",
            "EventSourceArn": archive_arn,
            "Destination": {
                "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
            },
            "EventStartTime": start_ts,
            "EventEndTime": end_ts
        }),
    );
    let resp = svc.start_replay(&req).unwrap();
    let body: Value = serde_json::from_slice(&resp.body).unwrap();
    // The response reports STARTING even though the stored replay is already
    // COMPLETED by the time it is inspected below.
    assert_eq!(body["State"], "STARTING");

    // Both archived events must have reached the queue, each carrying a
    // `replay-name` field.
    let delivered = messages.lock();
    assert_eq!(
        delivered.len(),
        2,
        "expected 2 replayed events delivered to SQS"
    );
    for (arn, msg) in delivered.iter() {
        assert_eq!(arn, queue_arn);
        let event: Value = serde_json::from_str(msg).unwrap();
        assert_eq!(event["source"], "my.app");
        assert!(event["replay-name"].as_str().is_some());
    }

    // The stored replay record is marked COMPLETED.
    let state = svc.state.read();
    let replay = state.replays.get("my-replay").unwrap();
    assert_eq!(replay.state, "COMPLETED");
}
5242
/// For an `API_KEY` connection, `apply_connection_auth` adds a header named
/// after `ApiKeyName` whose value is `ApiKeyValue`.
#[test]
fn apply_connection_auth_api_key() {
    let api_key_params = json!({
        "ApiKeyAuthParameters": {
            "ApiKeyName": "x-api-key",
            "ApiKeyValue": "my-secret"
        }
    });
    let conn = Connection {
        name: String::from("test-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid"),
        description: None,
        authorization_type: String::from("API_KEY"),
        auth_parameters: api_key_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    let http_client = reqwest::Client::new();
    let unauthenticated = http_client
        .post("http://localhost:12345/test")
        .header("Content-Type", "application/json");
    let authenticated = apply_connection_auth(unauthenticated, &conn);

    // Build the request without sending it, then inspect its headers.
    let request = authenticated.body("{}").build().unwrap();
    let api_key_header = request
        .headers()
        .get("x-api-key")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(api_key_header, "my-secret");
}
5281
/// For a `BASIC` connection, `apply_connection_auth` adds an `authorization`
/// header that starts with `Basic `.
#[test]
fn apply_connection_auth_basic() {
    let basic_params = json!({
        "BasicAuthParameters": {
            "Username": "user",
            "Password": "pass"
        }
    });
    let conn = Connection {
        name: String::from("basic-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid"),
        description: None,
        authorization_type: String::from("BASIC"),
        auth_parameters: basic_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    let http_client = reqwest::Client::new();
    let unauthenticated = http_client.post("http://localhost:12345/test");
    let authenticated = apply_connection_auth(unauthenticated, &conn);

    // Build the request without sending it, then inspect its headers.
    let request = authenticated.body("{}").build().unwrap();
    let auth_header = request
        .headers()
        .get("authorization")
        .unwrap()
        .to_str()
        .unwrap();
    assert!(
        auth_header.starts_with("Basic "),
        "Expected Basic auth header, got: {auth_header}"
    );
}
5318
5319 #[tokio::test]
5320 async fn put_events_with_api_destination_target_resolves_destination() {
5321 let state = Arc::new(RwLock::new(EventBridgeState::new(
5325 "123456789012",
5326 "us-east-1",
5327 )));
5328 let delivery = Arc::new(DeliveryBus::new());
5329 let svc = EventBridgeService::new(state, delivery);
5330
5331 create_connection(&svc, "my-conn");
5333 let conn_arn = {
5334 let state = svc.state.read();
5335 state.connections.get("my-conn").unwrap().arn.clone()
5336 };
5337 let req = make_request(
5338 "CreateApiDestination",
5339 json!({
5340 "Name": "my-dest",
5341 "ConnectionArn": conn_arn,
5342 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5343 "HttpMethod": "POST"
5344 }),
5345 );
5346 svc.create_api_destination(&req).unwrap();
5347
5348 let dest_arn = {
5349 let state = svc.state.read();
5350 state.api_destinations.get("my-dest").unwrap().arn.clone()
5351 };
5352
5353 let req = make_request(
5355 "PutRule",
5356 json!({
5357 "Name": "api-dest-rule",
5358 "EventPattern": r#"{"source":["test.app"]}"#,
5359 "State": "ENABLED"
5360 }),
5361 );
5362 svc.put_rule(&req).unwrap();
5363
5364 let req = make_request(
5365 "PutTargets",
5366 json!({
5367 "Rule": "api-dest-rule",
5368 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5369 }),
5370 );
5371 svc.put_targets(&req).unwrap();
5372
5373 let req = make_request(
5375 "PutEvents",
5376 json!({
5377 "Entries": [{
5378 "Source": "test.app",
5379 "DetailType": "TestEvent",
5380 "Detail": r#"{"key":"value"}"#
5381 }]
5382 }),
5383 );
5384 let resp = svc.put_events(&req).unwrap();
5385 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5386 assert_eq!(body["FailedEntryCount"], 0);
5387 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5388 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5389 }
5390
/// `function_name_from_arn` must yield the bare function name for a full Lambda ARN, for an
/// ARN with a trailing qualifier segment (`:prod`, `:42`), and for a plain name.
#[test]
fn test_function_name_from_arn() {
    // (input, expected function name)
    let cases = [
        ("arn:aws:lambda:us-east-1:123456789012:function:my-func", "my-func"),
        ("arn:aws:lambda:us-east-1:123456789012:function:my-func:prod", "my-func"),
        ("arn:aws:lambda:us-east-1:123456789012:function:my-func:42", "my-func"),
        ("my-func", "my-func"),
    ];

    for &(input, expected) in cases.iter() {
        assert_eq!(super::function_name_from_arn(input), expected);
    }
}
5415}