1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20 ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21 PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
29 if source.is_empty() {
30 return Err(json!({
31 "ErrorCode": "InvalidArgument",
32 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
33 }));
34 }
35 if detail_type.is_empty() {
36 return Err(json!({
37 "ErrorCode": "InvalidArgument",
38 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
39 }));
40 }
41 if detail.is_empty() {
42 return Err(json!({
43 "ErrorCode": "InvalidArgument",
44 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
45 }));
46 }
47 if serde_json::from_str::<Value>(detail).is_err() {
48 return Err(json!({
49 "ErrorCode": "MalformedDetail",
50 "ErrorMessage": "Detail is malformed.",
51 }));
52 }
53 Ok(())
54}
55
56fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
61 if let Some(s) = raw.as_str() {
62 return DateTime::parse_from_rfc3339(s)
63 .map(|dt| dt.with_timezone(&Utc))
64 .unwrap_or_else(|_| Utc::now());
65 }
66 if let Some(ts) = raw.as_f64() {
67 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
68 .unwrap_or_else(Utc::now);
69 }
70 if let Some(ts) = raw.as_i64() {
71 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
72 }
73 Utc::now()
74}
75
76pub struct EventBridgeService {
77 state: SharedEventBridgeState,
78 delivery: Arc<DeliveryBus>,
79 lambda_state: Option<SharedLambdaState>,
80 logs_state: Option<SharedLogsState>,
81 container_runtime: Option<Arc<ContainerRuntime>>,
82}
83
84impl EventBridgeService {
85 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
86 Self {
87 state,
88 delivery,
89 lambda_state: None,
90 logs_state: None,
91 container_runtime: None,
92 }
93 }
94
95 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
96 self.lambda_state = Some(lambda_state);
97 self
98 }
99
100 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
101 self.logs_state = Some(logs_state);
102 self
103 }
104
105 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
106 self.container_runtime = Some(runtime);
107 self
108 }
109}
110
111#[async_trait]
112impl AwsService for EventBridgeService {
113 fn service_name(&self) -> &str {
114 "events"
115 }
116
117 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
118 match req.action.as_str() {
119 "CreateEventBus" => self.create_event_bus(&req),
120 "DeleteEventBus" => self.delete_event_bus(&req),
121 "ListEventBuses" => self.list_event_buses(&req),
122 "DescribeEventBus" => self.describe_event_bus(&req),
123 "PutRule" => self.put_rule(&req),
124 "DeleteRule" => self.delete_rule(&req),
125 "ListRules" => self.list_rules(&req),
126 "DescribeRule" => self.describe_rule(&req),
127 "EnableRule" => self.enable_rule(&req),
128 "DisableRule" => self.disable_rule(&req),
129 "PutTargets" => self.put_targets(&req),
130 "RemoveTargets" => self.remove_targets(&req),
131 "ListTargetsByRule" => self.list_targets_by_rule(&req),
132 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
133 "PutEvents" => self.put_events(&req),
134 "PutPermission" => self.put_permission(&req),
135 "RemovePermission" => self.remove_permission(&req),
136 "TagResource" => self.tag_resource(&req),
137 "UntagResource" => self.untag_resource(&req),
138 "ListTagsForResource" => self.list_tags_for_resource(&req),
139 "CreateArchive" => self.create_archive(&req),
140 "DescribeArchive" => self.describe_archive(&req),
141 "ListArchives" => self.list_archives(&req),
142 "UpdateArchive" => self.update_archive(&req),
143 "DeleteArchive" => self.delete_archive(&req),
144 "CreateConnection" => self.create_connection(&req),
145 "DescribeConnection" => self.describe_connection(&req),
146 "ListConnections" => self.list_connections(&req),
147 "UpdateConnection" => self.update_connection(&req),
148 "DeleteConnection" => self.delete_connection(&req),
149 "CreateApiDestination" => self.create_api_destination(&req),
150 "DescribeApiDestination" => self.describe_api_destination(&req),
151 "ListApiDestinations" => self.list_api_destinations(&req),
152 "UpdateApiDestination" => self.update_api_destination(&req),
153 "DeleteApiDestination" => self.delete_api_destination(&req),
154 "StartReplay" => self.start_replay(&req),
155 "DescribeReplay" => self.describe_replay(&req),
156 "ListReplays" => self.list_replays(&req),
157 "CancelReplay" => self.cancel_replay(&req),
158 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
159 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
160 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
161 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
162 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
163 "ActivateEventSource" => self.activate_event_source(&req),
164 "DeactivateEventSource" => self.deactivate_event_source(&req),
165 "DescribeEventSource" => self.describe_event_source(&req),
166 "ListEventSources" => self.list_event_sources(&req),
167 "PutPartnerEvents" => self.put_partner_events(&req),
168 "TestEventPattern" => self.test_event_pattern(&req),
169 "UpdateEventBus" => self.update_event_bus(&req),
170 "CreateEndpoint" => self.create_endpoint(&req),
171 "DeleteEndpoint" => self.delete_endpoint(&req),
172 "DescribeEndpoint" => self.describe_endpoint(&req),
173 "ListEndpoints" => self.list_endpoints(&req),
174 "UpdateEndpoint" => self.update_endpoint(&req),
175 "DeauthorizeConnection" => self.deauthorize_connection(&req),
176 _ => Err(AwsServiceError::action_not_implemented(
177 "events",
178 &req.action,
179 )),
180 }
181 }
182
183 fn supported_actions(&self) -> &[&str] {
184 &[
185 "CreateEventBus",
186 "DeleteEventBus",
187 "ListEventBuses",
188 "DescribeEventBus",
189 "PutRule",
190 "DeleteRule",
191 "ListRules",
192 "DescribeRule",
193 "EnableRule",
194 "DisableRule",
195 "PutTargets",
196 "RemoveTargets",
197 "ListTargetsByRule",
198 "ListRuleNamesByTarget",
199 "PutEvents",
200 "PutPermission",
201 "RemovePermission",
202 "TagResource",
203 "UntagResource",
204 "ListTagsForResource",
205 "CreateArchive",
206 "DescribeArchive",
207 "ListArchives",
208 "UpdateArchive",
209 "DeleteArchive",
210 "CreateConnection",
211 "DescribeConnection",
212 "ListConnections",
213 "UpdateConnection",
214 "DeleteConnection",
215 "CreateApiDestination",
216 "DescribeApiDestination",
217 "ListApiDestinations",
218 "UpdateApiDestination",
219 "DeleteApiDestination",
220 "StartReplay",
221 "DescribeReplay",
222 "ListReplays",
223 "CancelReplay",
224 "CreatePartnerEventSource",
225 "DeletePartnerEventSource",
226 "DescribePartnerEventSource",
227 "ListPartnerEventSources",
228 "ListPartnerEventSourceAccounts",
229 "ActivateEventSource",
230 "DeactivateEventSource",
231 "DescribeEventSource",
232 "ListEventSources",
233 "PutPartnerEvents",
234 "TestEventPattern",
235 "UpdateEventBus",
236 "CreateEndpoint",
237 "DeleteEndpoint",
238 "DescribeEndpoint",
239 "ListEndpoints",
240 "UpdateEndpoint",
241 "DeauthorizeConnection",
242 ]
243 }
244}
245
246fn parse_tags(body: &Value) -> HashMap<String, String> {
247 let mut tags = HashMap::new();
248 if let Some(arr) = body["Tags"].as_array() {
249 for tag in arr {
250 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
251 tags.insert(key.to_string(), val.to_string());
252 }
253 }
254 }
255 tags
256}
257
258fn parse_target(target: &Value) -> EventTarget {
259 EventTarget {
260 id: target["Id"].as_str().unwrap_or("").to_string(),
261 arn: target["Arn"].as_str().unwrap_or("").to_string(),
262 input: target["Input"].as_str().map(|s| s.to_string()),
263 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
264 input_transformer: target.get("InputTransformer").cloned(),
265 sqs_parameters: target.get("SqsParameters").cloned(),
266 }
267}
268
269fn target_to_json(t: &EventTarget) -> Value {
270 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
271 if let Some(ref input) = t.input {
272 obj["Input"] = json!(input);
273 }
274 if let Some(ref input_path) = t.input_path {
275 obj["InputPath"] = json!(input_path);
276 }
277 if let Some(ref it) = t.input_transformer {
278 obj["InputTransformer"] = it.clone();
279 }
280 if let Some(ref sp) = t.sqs_parameters {
281 obj["SqsParameters"] = sp.clone();
282 }
283 obj
284}
285
286impl EventBridgeService {
288 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
289 let body = req.json_body();
290 validate_required("Name", &body["Name"])?;
291 let name = body["Name"]
292 .as_str()
293 .ok_or_else(|| missing("Name"))?
294 .to_string();
295 validate_string_length("name", &name, 1, 256)?;
296 validate_optional_string_length(
297 "eventSourceName",
298 body["EventSourceName"].as_str(),
299 1,
300 256,
301 )?;
302 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
303 validate_optional_string_length(
304 "kmsKeyIdentifier",
305 body["KmsKeyIdentifier"].as_str(),
306 0,
307 2048,
308 )?;
309
310 if name.contains('/') && !name.starts_with("aws.partner/") {
312 return Err(AwsServiceError::aws_error(
313 StatusCode::BAD_REQUEST,
314 "ValidationException",
315 "Event bus name must not contain '/'.",
316 ));
317 }
318
319 if name.starts_with("aws.partner/") {
321 let event_source = body["EventSourceName"].as_str().unwrap_or("");
322 let state_r = self.state.read();
323 let has_source = state_r.partner_event_sources.contains_key(event_source);
324 drop(state_r);
325 if !has_source {
326 return Err(AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "ResourceNotFoundException",
329 format!("Event source {event_source} does not exist."),
330 ));
331 }
332 }
333
334 let mut state = self.state.write();
335
336 if state.buses.contains_key(&name) {
337 return Err(AwsServiceError::aws_error(
338 StatusCode::BAD_REQUEST,
339 "ResourceAlreadyExistsException",
340 format!("Event bus {name} already exists."),
341 ));
342 }
343
344 let arn = format!(
345 "arn:aws:events:{}:{}:event-bus/{}",
346 req.region, state.account_id, name
347 );
348 let now = Utc::now();
349 let description = body["Description"].as_str().map(|s| s.to_string());
350 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
351 let dead_letter_config = body.get("DeadLetterConfig").cloned();
352
353 let tags = parse_tags(&body);
354
355 let bus = EventBus {
356 name: name.clone(),
357 arn: arn.clone(),
358 tags,
359 policy: None,
360 description,
361 kms_key_identifier,
362 dead_letter_config,
363 creation_time: now,
364 last_modified_time: now,
365 };
366 state.buses.insert(name, bus);
367
368 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
369 }
370
371 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372 let body = req.json_body();
373 validate_required("Name", &body["Name"])?;
374 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
375 validate_string_length("name", name, 1, 256)?;
376
377 if name == "default" {
378 return Err(AwsServiceError::aws_error(
379 StatusCode::BAD_REQUEST,
380 "ValidationException",
381 format!("Cannot delete event bus {name}."),
382 ));
383 }
384
385 let mut state = self.state.write();
386 state.buses.remove(name);
387 state.rules.retain(|k, _| k.0 != name);
388
389 Ok(AwsResponse::ok_json(json!({})))
390 }
391
392 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
393 let body = req.json_body();
394 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
395 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
396 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
397 let name_prefix = body["NamePrefix"].as_str();
398 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
399 if let Some(t) = body["NextToken"].as_str() {
400 t.parse::<usize>().map_err(|_| {
401 AwsServiceError::aws_error(
402 StatusCode::BAD_REQUEST,
403 "InvalidNextTokenException",
404 format!("Invalid NextToken value: '{t}'"),
405 )
406 })?;
407 }
408
409 let state = self.state.read();
410 let filtered: Vec<&_> = state
411 .buses
412 .values()
413 .filter(|b| match name_prefix {
414 Some(prefix) => b.name.starts_with(prefix),
415 None => true,
416 })
417 .collect();
418
419 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
420 let buses: Vec<Value> = page
421 .iter()
422 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
423 .collect();
424 let mut resp = json!({ "EventBuses": buses });
425 if let Some(token) = next_token {
426 resp["NextToken"] = json!(token);
427 }
428
429 Ok(AwsResponse::ok_json(resp))
430 }
431
432 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
433 let body = req.json_body();
434 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
435 let name = body["Name"].as_str().unwrap_or("default");
436
437 let state = self.state.read();
438 let bus = state.buses.get(name).ok_or_else(|| {
439 AwsServiceError::aws_error(
440 StatusCode::BAD_REQUEST,
441 "ResourceNotFoundException",
442 format!("Event bus {name} does not exist."),
443 )
444 })?;
445
446 let mut resp = json!({
447 "Name": bus.name,
448 "Arn": bus.arn,
449 "CreationTime": bus.creation_time.timestamp() as f64,
450 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
451 });
452
453 if let Some(ref policy) = bus.policy {
454 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
455 }
456 if let Some(ref desc) = bus.description {
457 resp["Description"] = json!(desc);
458 }
459 if let Some(ref kms) = bus.kms_key_identifier {
460 resp["KmsKeyIdentifier"] = json!(kms);
461 }
462 if let Some(ref dlc) = bus.dead_letter_config {
463 resp["DeadLetterConfig"] = dlc.clone();
464 }
465
466 Ok(AwsResponse::ok_json(resp))
467 }
468
469 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
472 let body = req.json_body();
473 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
474 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
475 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
476 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
477 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479 let mut state = self.state.write();
480
481 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
482 AwsServiceError::aws_error(
483 StatusCode::BAD_REQUEST,
484 "ResourceNotFoundException",
485 format!("Event bus {event_bus_name} does not exist."),
486 )
487 })?;
488
489 if let Some(policy_str) = body["Policy"].as_str() {
491 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
492 bus.policy = Some(policy);
493 return Ok(AwsResponse::ok_json(json!({})));
494 }
495 }
496
497 let action = body["Action"].as_str().unwrap_or("");
499 let principal = body["Principal"].as_str().unwrap_or("");
500 let statement_id = body["StatementId"].as_str().unwrap_or("");
501
502 if action != "events:PutEvents" {
504 return Err(AwsServiceError::aws_error(
505 StatusCode::BAD_REQUEST,
506 "ValidationException",
507 "Provided value in parameter 'action' is not supported.",
508 ));
509 }
510
511 let statement = json!({
512 "Sid": statement_id,
513 "Effect": "Allow",
514 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
515 "Action": action,
516 "Resource": bus.arn,
517 });
518
519 let policy = bus.policy.get_or_insert_with(|| {
520 json!({
521 "Version": "2012-10-17",
522 "Statement": [],
523 })
524 });
525
526 if let Some(stmts) = policy["Statement"].as_array_mut() {
527 stmts.push(statement);
528 }
529
530 Ok(AwsResponse::ok_json(json!({})))
531 }
532
533 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
534 let body = req.json_body();
535 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
536 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
537 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
538 let statement_id = body["StatementId"].as_str().unwrap_or("");
539 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
540
541 let mut state = self.state.write();
542
543 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
544 AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "ResourceNotFoundException",
547 format!("Event bus {event_bus_name} does not exist."),
548 )
549 })?;
550
551 if remove_all {
552 bus.policy = None;
553 return Ok(AwsResponse::ok_json(json!({})));
554 }
555
556 let policy = bus.policy.as_mut().ok_or_else(|| {
557 AwsServiceError::aws_error(
558 StatusCode::BAD_REQUEST,
559 "ResourceNotFoundException",
560 "EventBus does not have a policy.",
561 )
562 })?;
563
564 if let Some(stmts) = policy["Statement"].as_array_mut() {
565 let before = stmts.len();
566 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
567 if stmts.len() == before {
568 return Err(AwsServiceError::aws_error(
569 StatusCode::BAD_REQUEST,
570 "ResourceNotFoundException",
571 "Statement with the provided id does not exist.",
572 ));
573 }
574 if stmts.is_empty() {
575 bus.policy = None;
576 }
577 }
578
579 Ok(AwsResponse::ok_json(json!({})))
580 }
581
582 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
585 let body = req.json_body();
586 validate_required("Name", &body["Name"])?;
587 let name = body["Name"]
588 .as_str()
589 .ok_or_else(|| missing("Name"))?
590 .to_string();
591 validate_string_length("name", &name, 1, 64)?;
592 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
593 validate_optional_string_length(
594 "scheduleExpression",
595 body["ScheduleExpression"].as_str(),
596 0,
597 256,
598 )?;
599 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
600 validate_optional_enum(
601 "state",
602 body["State"].as_str(),
603 &[
604 "ENABLED",
605 "DISABLED",
606 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
607 ],
608 )?;
609 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
610 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
611
612 let raw_bus = body["EventBusName"]
613 .as_str()
614 .unwrap_or("default")
615 .to_string();
616
617 let mut state = self.state.write();
618 let event_bus_name = state.resolve_bus_name(&raw_bus);
619
620 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
621 if s.is_empty() {
622 None
623 } else {
624 Some(s.to_string())
625 }
626 });
627 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
628 if s.is_empty() {
629 None
630 } else {
631 Some(s.to_string())
632 }
633 });
634 let description = body["Description"].as_str().map(|s| s.to_string());
635 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
636 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
637
638 if schedule_expression.is_some() && event_bus_name != "default" {
640 return Err(AwsServiceError::aws_error(
641 StatusCode::BAD_REQUEST,
642 "ValidationException",
643 "ScheduleExpression is supported only on the default event bus.",
644 ));
645 }
646
647 if !state.buses.contains_key(&event_bus_name) {
648 return Err(AwsServiceError::aws_error(
649 StatusCode::BAD_REQUEST,
650 "ResourceNotFoundException",
651 format!("Event bus {event_bus_name} does not exist."),
652 ));
653 }
654
655 let arn = if event_bus_name == "default" {
656 format!(
657 "arn:aws:events:{}:{}:rule/{}",
658 req.region, state.account_id, name
659 )
660 } else {
661 format!(
662 "arn:aws:events:{}:{}:rule/{}/{}",
663 req.region, state.account_id, event_bus_name, name
664 )
665 };
666
667 let key = (event_bus_name.clone(), name.clone());
668 let targets = state
669 .rules
670 .get(&key)
671 .map(|r| r.targets.clone())
672 .unwrap_or_default();
673
674 let tags = parse_tags(&body);
675
676 let rule = EventRule {
677 name: name.clone(),
678 arn: arn.clone(),
679 event_bus_name,
680 event_pattern,
681 schedule_expression,
682 state: rule_state,
683 description,
684 role_arn,
685 managed_by: None,
686 created_by: None,
687 targets,
688 tags,
689 last_fired: None,
690 };
691
692 state.rules.insert(key, rule);
693 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
694 }
695
696 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
697 let body = req.json_body();
698 validate_required("Name", &body["Name"])?;
699 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
700 validate_string_length("name", name, 1, 64)?;
701 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
703
704 let mut state = self.state.write();
705 let bus_name = state.resolve_bus_name(event_bus_name);
706 let key = (bus_name, name.to_string());
707
708 if let Some(rule) = state.rules.get(&key) {
710 if !rule.targets.is_empty() {
711 return Err(AwsServiceError::aws_error(
712 StatusCode::BAD_REQUEST,
713 "ValidationException",
714 "Rule can't be deleted since it has targets.",
715 ));
716 }
717 }
718
719 state.rules.remove(&key);
720 Ok(AwsResponse::ok_json(json!({})))
721 }
722
723 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
724 let body = req.json_body();
725 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
726 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
727 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
728 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
729 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
730 let name_prefix = body["NamePrefix"].as_str();
731 let limit = body["Limit"].as_u64().map(|n| n as usize);
732 let next_token = body["NextToken"].as_str();
733
734 let state = self.state.read();
735 let bus_name = state.resolve_bus_name(event_bus_name);
736
737 let mut rules: Vec<&EventRule> = state
738 .rules
739 .values()
740 .filter(|r| r.event_bus_name == bus_name)
741 .filter(|r| match name_prefix {
742 Some(prefix) => r.name.starts_with(prefix),
743 None => true,
744 })
745 .collect();
746 rules.sort_by(|a, b| a.name.cmp(&b.name));
747
748 let start = next_token
750 .and_then(|t| t.parse::<usize>().ok())
751 .unwrap_or(0)
752 .min(rules.len());
753 let rules_slice = &rules[start..];
754
755 let (page, new_next_token) = if let Some(lim) = limit {
756 if rules_slice.len() > lim {
757 (&rules_slice[..lim], Some((start + lim).to_string()))
758 } else {
759 (rules_slice, None)
760 }
761 } else {
762 (rules_slice, None)
763 };
764
765 let rules_json: Vec<Value> = page
766 .iter()
767 .map(|r| {
768 let mut obj = json!({
769 "Name": r.name,
770 "Arn": r.arn,
771 "EventBusName": r.event_bus_name,
772 "State": r.state,
773 });
774 if let Some(ref desc) = r.description {
775 obj["Description"] = json!(desc);
776 }
777 if let Some(ref ep) = r.event_pattern {
778 obj["EventPattern"] = json!(ep);
779 }
780 if let Some(ref se) = r.schedule_expression {
781 obj["ScheduleExpression"] = json!(se);
782 }
783 if let Some(ref mb) = r.managed_by {
784 obj["ManagedBy"] = json!(mb);
785 }
786 obj
787 })
788 .collect();
789
790 let mut resp = json!({ "Rules": rules_json });
791 if let Some(token) = new_next_token {
792 resp["NextToken"] = json!(token);
793 }
794
795 Ok(AwsResponse::ok_json(resp))
796 }
797
798 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799 let body = req.json_body();
800 validate_required("Name", &body["Name"])?;
801 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802 validate_string_length("name", name, 1, 64)?;
803 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806 let state = self.state.read();
807 let bus_name = state.resolve_bus_name(event_bus_name);
808 let key = (bus_name.clone(), name.to_string());
809
810 let rule = state.rules.get(&key).ok_or_else(|| {
811 AwsServiceError::aws_error(
812 StatusCode::BAD_REQUEST,
813 "ResourceNotFoundException",
814 format!("Rule {name} does not exist."),
815 )
816 })?;
817
818 let mut resp = json!({
819 "Name": rule.name,
820 "Arn": rule.arn,
821 "EventBusName": rule.event_bus_name,
822 "State": rule.state,
823 });
824
825 if let Some(ref desc) = rule.description {
826 resp["Description"] = json!(desc);
827 }
828 if let Some(ref ep) = rule.event_pattern {
829 resp["EventPattern"] = json!(ep);
830 }
831 if let Some(ref se) = rule.schedule_expression {
832 resp["ScheduleExpression"] = json!(se);
833 }
834 if let Some(ref role) = rule.role_arn {
835 resp["RoleArn"] = json!(role);
836 }
837 if let Some(ref mb) = rule.managed_by {
838 resp["ManagedBy"] = json!(mb);
839 }
840 if let Some(ref cb) = rule.created_by {
841 resp["CreatedBy"] = json!(cb);
842 }
843 if rule.event_bus_name != "default" && rule.created_by.is_none() {
845 resp["CreatedBy"] = json!(state.account_id);
846 }
847
848 Ok(AwsResponse::ok_json(resp))
849 }
850
851 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
852 let body = req.json_body();
853 validate_required("Name", &body["Name"])?;
854 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
855 validate_string_length("name", name, 1, 64)?;
856 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
857 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
858
859 let mut state = self.state.write();
860 let bus_name = state.resolve_bus_name(event_bus_name);
861 let key = (bus_name, name.to_string());
862
863 let rule = state.rules.get_mut(&key).ok_or_else(|| {
864 AwsServiceError::aws_error(
865 StatusCode::BAD_REQUEST,
866 "ResourceNotFoundException",
867 format!("Rule {name} does not exist."),
868 )
869 })?;
870
871 rule.state = "ENABLED".to_string();
872 Ok(AwsResponse::ok_json(json!({})))
873 }
874
875 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
876 let body = req.json_body();
877 validate_required("Name", &body["Name"])?;
878 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
879 validate_string_length("name", name, 1, 64)?;
880 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
881 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882
883 let mut state = self.state.write();
884 let bus_name = state.resolve_bus_name(event_bus_name);
885 let key = (bus_name, name.to_string());
886
887 let rule = state.rules.get_mut(&key).ok_or_else(|| {
888 AwsServiceError::aws_error(
889 StatusCode::BAD_REQUEST,
890 "ResourceNotFoundException",
891 format!("Rule {name} does not exist."),
892 )
893 })?;
894
895 rule.state = "DISABLED".to_string();
896 Ok(AwsResponse::ok_json(json!({})))
897 }
898
899 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
902 let body = req.json_body();
903 validate_required("Rule", &body["Rule"])?;
904 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
905 validate_string_length("rule", rule_name, 1, 64)?;
906 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
907 validate_required("Targets", &body["Targets"])?;
908 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
909 let targets = body["Targets"]
910 .as_array()
911 .ok_or_else(|| missing("Targets"))?;
912
913 for target in targets {
915 let target_id = target["Id"].as_str().unwrap_or("");
916 let target_arn = target["Arn"].as_str().unwrap_or("");
917
918 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
919 return Err(AwsServiceError::aws_error(
920 StatusCode::BAD_REQUEST,
921 "ValidationException",
922 format!(
923 "Parameter(s) SqsParameters must be specified for target: {target_id}."
924 ),
925 ));
926 }
927
928 if !target_arn.starts_with("arn:") {
930 return Err(AwsServiceError::aws_error(
931 StatusCode::BAD_REQUEST,
932 "ValidationException",
933 format!(
934 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
935 ),
936 ));
937 }
938 }
939
940 let mut state = self.state.write();
941 let bus_name = state.resolve_bus_name(event_bus_name);
942 let key = (bus_name.clone(), rule_name.to_string());
943
944 let rule = state.rules.get_mut(&key).ok_or_else(|| {
945 AwsServiceError::aws_error(
946 StatusCode::BAD_REQUEST,
947 "ResourceNotFoundException",
948 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
949 )
950 })?;
951
952 for target in targets {
953 let et = parse_target(target);
954 rule.targets.retain(|t| t.id != et.id);
956 rule.targets.push(et);
957 }
958
959 Ok(AwsResponse::ok_json(json!({
960 "FailedEntryCount": 0,
961 "FailedEntries": [],
962 })))
963 }
964
965 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
966 let body = req.json_body();
967 validate_required("Rule", &body["Rule"])?;
968 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
969 validate_string_length("rule", rule_name, 1, 64)?;
970 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
971 validate_required("Ids", &body["Ids"])?;
972 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
973 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
974
975 let target_ids: Vec<String> = ids
976 .iter()
977 .filter_map(|v| v.as_str().map(|s| s.to_string()))
978 .collect();
979
980 let mut state = self.state.write();
981 let bus_name = state.resolve_bus_name(event_bus_name);
982 let key = (bus_name.clone(), rule_name.to_string());
983
984 let rule = state.rules.get_mut(&key).ok_or_else(|| {
985 AwsServiceError::aws_error(
986 StatusCode::BAD_REQUEST,
987 "ResourceNotFoundException",
988 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
989 )
990 })?;
991
992 rule.targets.retain(|t| !target_ids.contains(&t.id));
993
994 Ok(AwsResponse::ok_json(json!({
995 "FailedEntryCount": 0,
996 "FailedEntries": [],
997 })))
998 }
999
1000 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001 let body = req.json_body();
1002 validate_required("Rule", &body["Rule"])?;
1003 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1004 validate_string_length("rule", rule_name, 1, 64)?;
1005 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009 let limit = body["Limit"].as_u64().map(|n| n as usize);
1010 let next_token = body["NextToken"].as_str();
1011
1012 let state = self.state.read();
1013 let bus_name = state.resolve_bus_name(event_bus_name);
1014 let key = (bus_name, rule_name.to_string());
1015
1016 let rule = state.rules.get(&key).ok_or_else(|| {
1017 AwsServiceError::aws_error(
1018 StatusCode::BAD_REQUEST,
1019 "ResourceNotFoundException",
1020 format!("Rule {rule_name} does not exist."),
1021 )
1022 })?;
1023
1024 let all_targets = &rule.targets;
1025 let start = next_token
1026 .and_then(|t| t.parse::<usize>().ok())
1027 .unwrap_or(0)
1028 .min(all_targets.len());
1029 let slice = &all_targets[start..];
1030
1031 let (page, new_next_token) = if let Some(lim) = limit {
1032 if slice.len() > lim {
1033 (&slice[..lim], Some((start + lim).to_string()))
1034 } else {
1035 (slice, None)
1036 }
1037 } else {
1038 (slice, None)
1039 };
1040
1041 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1042
1043 let mut resp = json!({ "Targets": targets });
1044 if let Some(token) = new_next_token {
1045 resp["NextToken"] = json!(token);
1046 }
1047
1048 Ok(AwsResponse::ok_json(resp))
1049 }
1050
1051 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052 let body = req.json_body();
1053 validate_required("TargetArn", &body["TargetArn"])?;
1054 let target_arn = body["TargetArn"]
1055 .as_str()
1056 .ok_or_else(|| missing("TargetArn"))?;
1057 validate_string_length("targetArn", target_arn, 1, 1600)?;
1058 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1059 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1060 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1061 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1062 let limit = body["Limit"].as_u64().map(|n| n as usize);
1063 let next_token = body["NextToken"].as_str();
1064
1065 let state = self.state.read();
1066 let bus_name = state.resolve_bus_name(event_bus_name);
1067
1068 let mut rule_names: Vec<String> = Vec::new();
1070 for rule in state.rules.values() {
1071 if rule.event_bus_name == bus_name
1072 && rule.targets.iter().any(|t| t.arn == target_arn)
1073 && !rule_names.contains(&rule.name)
1074 {
1075 rule_names.push(rule.name.clone());
1076 }
1077 }
1078 rule_names.sort();
1079
1080 let start = next_token
1081 .and_then(|t| t.parse::<usize>().ok())
1082 .unwrap_or(0)
1083 .min(rule_names.len());
1084 let slice = &rule_names[start..];
1085
1086 let (page, new_next_token) = if let Some(lim) = limit {
1087 if slice.len() > lim {
1088 (&slice[..lim], Some((start + lim).to_string()))
1089 } else {
1090 (slice, None)
1091 }
1092 } else {
1093 (slice, None)
1094 };
1095
1096 let mut resp = json!({ "RuleNames": page });
1097 if let Some(token) = new_next_token {
1098 resp["NextToken"] = json!(token);
1099 }
1100
1101 Ok(AwsResponse::ok_json(resp))
1102 }
1103
1104 fn create_partner_event_source(
1107 &self,
1108 req: &AwsRequest,
1109 ) -> Result<AwsResponse, AwsServiceError> {
1110 let body = req.json_body();
1111 validate_required("Name", &body["Name"])?;
1112 let name = body["Name"]
1113 .as_str()
1114 .ok_or_else(|| missing("Name"))?
1115 .to_string();
1116 validate_string_length("name", &name, 1, 256)?;
1117 validate_required("Account", &body["Account"])?;
1118 let account = body["Account"]
1119 .as_str()
1120 .ok_or_else(|| missing("Account"))?
1121 .to_string();
1122 validate_string_length("account", &account, 12, 12)?;
1123
1124 let mut state = self.state.write();
1125 if state.partner_event_sources.contains_key(&name) {
1126 return Err(AwsServiceError::aws_error(
1127 StatusCode::CONFLICT,
1128 "ResourceAlreadyExistsException",
1129 format!("Partner event source {name} already exists."),
1130 ));
1131 }
1132 let arn = format!(
1133 "arn:aws:events:{}::event-source/aws.partner/{}",
1134 state.region, name
1135 );
1136 let now = Utc::now();
1137 let ps = PartnerEventSource {
1138 name: name.clone(),
1139 arn: arn.clone(),
1140 account,
1141 creation_time: now,
1142 expiration_time: None,
1143 state: "ACTIVE".to_string(),
1144 };
1145 state.partner_event_sources.insert(name.clone(), ps);
1146
1147 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1148 }
1149
1150 fn delete_partner_event_source(
1151 &self,
1152 req: &AwsRequest,
1153 ) -> Result<AwsResponse, AwsServiceError> {
1154 let body = req.json_body();
1155 validate_required("Name", &body["Name"])?;
1156 let name = body["Name"]
1157 .as_str()
1158 .ok_or_else(|| missing("Name"))?
1159 .to_string();
1160 validate_required("Account", &body["Account"])?;
1161 let account = body["Account"]
1162 .as_str()
1163 .ok_or_else(|| missing("Account"))?
1164 .to_string();
1165
1166 let mut state = self.state.write();
1167 match state.partner_event_sources.get(&name) {
1168 Some(ps) if ps.account == account => {
1169 state.partner_event_sources.remove(&name);
1170 }
1171 Some(_) => {
1172 return Err(AwsServiceError::aws_error(
1173 StatusCode::NOT_FOUND,
1174 "ResourceNotFoundException",
1175 format!("Partner event source {name} does not exist for account {account}."),
1176 ));
1177 }
1178 None => {
1179 return Err(AwsServiceError::aws_error(
1180 StatusCode::NOT_FOUND,
1181 "ResourceNotFoundException",
1182 format!("Partner event source {name} does not exist."),
1183 ));
1184 }
1185 }
1186
1187 Ok(AwsResponse::ok_json(json!({})))
1188 }
1189
1190 fn describe_partner_event_source(
1191 &self,
1192 req: &AwsRequest,
1193 ) -> Result<AwsResponse, AwsServiceError> {
1194 let body = req.json_body();
1195 validate_required("Name", &body["Name"])?;
1196 let name = body["Name"]
1197 .as_str()
1198 .ok_or_else(|| missing("Name"))?
1199 .to_string();
1200 validate_string_length("name", &name, 1, 256)?;
1201
1202 let state = self.state.read();
1203 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1204 AwsServiceError::aws_error(
1205 StatusCode::NOT_FOUND,
1206 "ResourceNotFoundException",
1207 format!("Partner event source {name} does not exist."),
1208 )
1209 })?;
1210
1211 Ok(AwsResponse::ok_json(json!({
1212 "Arn": ps.arn,
1213 "Name": ps.name,
1214 })))
1215 }
1216
1217 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1218 let body = req.json_body();
1219 validate_required("namePrefix", &body["NamePrefix"])?;
1220 let name_prefix = body["NamePrefix"]
1221 .as_str()
1222 .ok_or_else(|| missing("NamePrefix"))?;
1223 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1224 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1225 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1226 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1227
1228 let state = self.state.read();
1229 let all: Vec<Value> = state
1230 .partner_event_sources
1231 .values()
1232 .filter(|ps| ps.name.starts_with(name_prefix))
1233 .map(|ps| {
1234 json!({
1235 "Arn": ps.arn,
1236 "Name": ps.name,
1237 })
1238 })
1239 .collect();
1240
1241 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1242 let mut resp = json!({ "PartnerEventSources": sources });
1243 if let Some(token) = next_token {
1244 resp["NextToken"] = json!(token);
1245 }
1246
1247 Ok(AwsResponse::ok_json(resp))
1248 }
1249
1250 fn list_partner_event_source_accounts(
1251 &self,
1252 req: &AwsRequest,
1253 ) -> Result<AwsResponse, AwsServiceError> {
1254 let body = req.json_body();
1255 validate_required("EventSourceName", &body["EventSourceName"])?;
1256 let event_source_name = body["EventSourceName"]
1257 .as_str()
1258 .ok_or_else(|| missing("EventSourceName"))?;
1259 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1260 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1261 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1262
1263 let state = self.state.read();
1264 let accounts: Vec<Value> = state
1265 .partner_event_sources
1266 .values()
1267 .filter(|ps| ps.name == event_source_name)
1268 .map(|ps| json!({ "Account": ps.account }))
1269 .collect();
1270
1271 Ok(AwsResponse::ok_json(json!({
1272 "PartnerEventSourceAccounts": accounts
1273 })))
1274 }
1275
1276 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277 let body = req.json_body();
1278 validate_required("Name", &body["Name"])?;
1279 let name = body["Name"]
1280 .as_str()
1281 .ok_or_else(|| missing("Name"))?
1282 .to_string();
1283
1284 let mut state = self.state.write();
1285 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1286 AwsServiceError::aws_error(
1287 StatusCode::NOT_FOUND,
1288 "ResourceNotFoundException",
1289 format!("Event source {name} does not exist."),
1290 )
1291 })?;
1292 ps.state = "ACTIVE".to_string();
1293
1294 Ok(AwsResponse::ok_json(json!({})))
1295 }
1296
1297 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1298 let body = req.json_body();
1299 validate_required("Name", &body["Name"])?;
1300 let name = body["Name"]
1301 .as_str()
1302 .ok_or_else(|| missing("Name"))?
1303 .to_string();
1304
1305 let mut state = self.state.write();
1306 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1307 AwsServiceError::aws_error(
1308 StatusCode::NOT_FOUND,
1309 "ResourceNotFoundException",
1310 format!("Event source {name} does not exist."),
1311 )
1312 })?;
1313 ps.state = "INACTIVE".to_string();
1314
1315 Ok(AwsResponse::ok_json(json!({})))
1316 }
1317
1318 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1319 let body = req.json_body();
1320 validate_required("Name", &body["Name"])?;
1321 let name = body["Name"]
1322 .as_str()
1323 .ok_or_else(|| missing("Name"))?
1324 .to_string();
1325
1326 let state = self.state.read();
1327 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1328 AwsServiceError::aws_error(
1329 StatusCode::NOT_FOUND,
1330 "ResourceNotFoundException",
1331 format!("Event source {name} does not exist."),
1332 )
1333 })?;
1334
1335 Ok(AwsResponse::ok_json(json!({
1336 "Arn": ps.arn,
1337 "Name": ps.name,
1338 "CreatedBy": ps.account,
1339 "CreationTime": ps.creation_time.timestamp() as f64,
1340 "State": ps.state,
1341 })))
1342 }
1343
1344 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1345 let body = req.json_body();
1346 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1347 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1348 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1349 let name_prefix = body["NamePrefix"].as_str();
1350 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1351
1352 let state = self.state.read();
1353 let all: Vec<Value> = state
1354 .partner_event_sources
1355 .values()
1356 .filter(|ps| match name_prefix {
1357 Some(prefix) => ps.name.starts_with(prefix),
1358 None => true,
1359 })
1360 .map(|ps| {
1361 json!({
1362 "Arn": ps.arn,
1363 "Name": ps.name,
1364 "CreatedBy": ps.account,
1365 "CreationTime": ps.creation_time.timestamp() as f64,
1366 "State": ps.state,
1367 })
1368 })
1369 .collect();
1370
1371 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1372 let mut resp = json!({ "EventSources": sources });
1373 if let Some(token) = next_token {
1374 resp["NextToken"] = json!(token);
1375 }
1376
1377 Ok(AwsResponse::ok_json(resp))
1378 }
1379
1380 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381 let body = req.json_body();
1382 validate_required("Entries", &body["Entries"])?;
1383 let entries = body["Entries"]
1384 .as_array()
1385 .ok_or_else(|| missing("Entries"))?;
1386
1387 let mut result_entries = Vec::new();
1388 for _entry in entries {
1389 let event_id = uuid::Uuid::new_v4().to_string();
1390 result_entries.push(json!({ "EventId": event_id }));
1391 }
1392
1393 Ok(AwsResponse::ok_json(json!({
1394 "FailedEntryCount": 0,
1395 "Entries": result_entries,
1396 })))
1397 }
1398
1399 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1402 let body = req.json_body();
1403 validate_required("EventPattern", &body["EventPattern"])?;
1404 validate_required("Event", &body["Event"])?;
1405 let event_pattern = body["EventPattern"]
1406 .as_str()
1407 .ok_or_else(|| missing("EventPattern"))?;
1408 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1409
1410 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1412 AwsServiceError::aws_error(
1413 StatusCode::BAD_REQUEST,
1414 "InvalidEventPatternException",
1415 "Event is not valid JSON.",
1416 )
1417 })?;
1418
1419 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1421 AwsServiceError::aws_error(
1422 StatusCode::BAD_REQUEST,
1423 "InvalidEventPatternException",
1424 "Event pattern is not valid JSON.",
1425 )
1426 })?;
1427
1428 let source = event["source"].as_str().unwrap_or("");
1429 let detail_type = event["detail-type"].as_str().unwrap_or("");
1430 let detail = event
1431 .get("detail")
1432 .map(|v| serde_json::to_string(v).unwrap_or_default())
1433 .unwrap_or_else(|| "{}".to_string());
1434 let account = event["account"].as_str().unwrap_or("");
1435 let region = event["region"].as_str().unwrap_or("");
1436 let resources: Vec<String> = event["resources"]
1437 .as_array()
1438 .map(|arr| {
1439 arr.iter()
1440 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1441 .collect()
1442 })
1443 .unwrap_or_default();
1444
1445 let result = matches_pattern(
1446 Some(event_pattern),
1447 source,
1448 detail_type,
1449 &detail,
1450 account,
1451 region,
1452 &resources,
1453 );
1454
1455 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1456 }
1457
1458 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461 let body = req.json_body();
1462 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1463 validate_optional_string_length(
1464 "kmsKeyIdentifier",
1465 body["KmsKeyIdentifier"].as_str(),
1466 0,
1467 2048,
1468 )?;
1469 let name = body["Name"].as_str().unwrap_or("default");
1470
1471 let mut state = self.state.write();
1472 let bus = state.buses.get_mut(name).ok_or_else(|| {
1473 AwsServiceError::aws_error(
1474 StatusCode::BAD_REQUEST,
1475 "ResourceNotFoundException",
1476 format!("Event bus {name} does not exist."),
1477 )
1478 })?;
1479
1480 if let Some(desc) = body["Description"].as_str() {
1481 bus.description = Some(desc.to_string());
1482 }
1483 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1484 bus.kms_key_identifier = Some(kms.to_string());
1485 }
1486 if let Some(dlc) = body.get("DeadLetterConfig") {
1487 bus.dead_letter_config = Some(dlc.clone());
1488 }
1489 bus.last_modified_time = Utc::now();
1490
1491 let arn = bus.arn.clone();
1492 let bus_name = bus.name.clone();
1493
1494 Ok(AwsResponse::ok_json(json!({
1495 "Arn": arn,
1496 "Name": bus_name,
1497 })))
1498 }
1499
1500 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503 let body = req.json_body();
1504 validate_required("Name", &body["Name"])?;
1505 let name = body["Name"]
1506 .as_str()
1507 .ok_or_else(|| missing("Name"))?
1508 .to_string();
1509 validate_string_length("name", &name, 1, 64)?;
1510 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1511 validate_required("EventBuses", &body["EventBuses"])?;
1512
1513 let description = body["Description"].as_str().map(|s| s.to_string());
1514 let routing_config = body["RoutingConfig"].clone();
1515 let replication_config = body.get("ReplicationConfig").cloned();
1516 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1517 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1518
1519 let mut state = self.state.write();
1520 if state.endpoints.contains_key(&name) {
1521 return Err(AwsServiceError::aws_error(
1522 StatusCode::CONFLICT,
1523 "ResourceAlreadyExistsException",
1524 format!("Endpoint {name} already exists."),
1525 ));
1526 }
1527
1528 let endpoint_id = format!("{}.abc123", name);
1529 let arn = format!(
1530 "arn:aws:events:{}:{}:endpoint/{}",
1531 req.region, state.account_id, name
1532 );
1533 let endpoint_url = format!(
1534 "https://{}.endpoint.events.{}.amazonaws.com",
1535 endpoint_id, req.region
1536 );
1537 let now = Utc::now();
1538
1539 let endpoint = Endpoint {
1540 name: name.clone(),
1541 arn: arn.clone(),
1542 endpoint_id: endpoint_id.clone(),
1543 endpoint_url: Some(endpoint_url),
1544 description,
1545 routing_config: routing_config.clone(),
1546 replication_config: replication_config.clone(),
1547 event_buses: event_buses.clone(),
1548 role_arn: role_arn.clone(),
1549 state: "ACTIVE".to_string(),
1550 creation_time: now,
1551 last_modified_time: now,
1552 };
1553 state.endpoints.insert(name.clone(), endpoint);
1554
1555 let mut resp = json!({
1556 "Name": name,
1557 "Arn": arn,
1558 "State": "ACTIVE",
1559 "RoutingConfig": routing_config,
1560 "EventBuses": event_buses,
1561 });
1562 if let Some(ref rc) = replication_config {
1563 resp["ReplicationConfig"] = rc.clone();
1564 }
1565 if let Some(ref ra) = role_arn {
1566 resp["RoleArn"] = json!(ra);
1567 }
1568
1569 Ok(AwsResponse::ok_json(resp))
1570 }
1571
1572 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1573 let body = req.json_body();
1574 validate_required("Name", &body["Name"])?;
1575 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1576
1577 let mut state = self.state.write();
1578 state.endpoints.remove(name).ok_or_else(|| {
1579 AwsServiceError::aws_error(
1580 StatusCode::BAD_REQUEST,
1581 "ResourceNotFoundException",
1582 format!("Endpoint '{name}' does not exist."),
1583 )
1584 })?;
1585
1586 Ok(AwsResponse::ok_json(json!({})))
1587 }
1588
1589 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1590 let body = req.json_body();
1591 validate_required("Name", &body["Name"])?;
1592 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1593
1594 let state = self.state.read();
1595 let ep = state.endpoints.get(name).ok_or_else(|| {
1596 AwsServiceError::aws_error(
1597 StatusCode::BAD_REQUEST,
1598 "ResourceNotFoundException",
1599 format!("Endpoint '{name}' does not exist."),
1600 )
1601 })?;
1602
1603 let mut resp = json!({
1604 "Name": ep.name,
1605 "Arn": ep.arn,
1606 "EndpointId": ep.endpoint_id,
1607 "State": ep.state,
1608 "RoutingConfig": ep.routing_config,
1609 "EventBuses": ep.event_buses,
1610 "CreationTime": ep.creation_time.timestamp() as f64,
1611 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1612 });
1613 if let Some(ref url) = ep.endpoint_url {
1614 resp["EndpointUrl"] = json!(url);
1615 }
1616 if let Some(ref desc) = ep.description {
1617 resp["Description"] = json!(desc);
1618 }
1619 if let Some(ref rc) = ep.replication_config {
1620 resp["ReplicationConfig"] = rc.clone();
1621 }
1622 if let Some(ref ra) = ep.role_arn {
1623 resp["RoleArn"] = json!(ra);
1624 }
1625
1626 Ok(AwsResponse::ok_json(resp))
1627 }
1628
1629 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1630 let body = req.json_body();
1631 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1632 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1633 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1634 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1635 let name_prefix = body["NamePrefix"].as_str();
1636 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1637
1638 let state = self.state.read();
1639 let all: Vec<Value> = state
1640 .endpoints
1641 .values()
1642 .filter(|ep| match name_prefix {
1643 Some(prefix) => ep.name.starts_with(prefix),
1644 None => true,
1645 })
1646 .map(|ep| {
1647 let mut obj = json!({
1648 "Name": ep.name,
1649 "Arn": ep.arn,
1650 "EndpointId": ep.endpoint_id,
1651 "State": ep.state,
1652 "RoutingConfig": ep.routing_config,
1653 "EventBuses": ep.event_buses,
1654 "CreationTime": ep.creation_time.timestamp() as f64,
1655 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1656 });
1657 if let Some(ref url) = ep.endpoint_url {
1658 obj["EndpointUrl"] = json!(url);
1659 }
1660 obj
1661 })
1662 .collect();
1663
1664 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1665 let mut resp = json!({ "Endpoints": endpoints });
1666 if let Some(token) = next_token {
1667 resp["NextToken"] = json!(token);
1668 }
1669
1670 Ok(AwsResponse::ok_json(resp))
1671 }
1672
1673 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1674 let body = req.json_body();
1675 validate_required("Name", &body["Name"])?;
1676 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1677
1678 let mut state = self.state.write();
1679 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1680 AwsServiceError::aws_error(
1681 StatusCode::BAD_REQUEST,
1682 "ResourceNotFoundException",
1683 format!("Endpoint '{name}' does not exist."),
1684 )
1685 })?;
1686
1687 if let Some(desc) = body["Description"].as_str() {
1688 ep.description = Some(desc.to_string());
1689 }
1690 if !body["RoutingConfig"].is_null() {
1691 ep.routing_config = body["RoutingConfig"].clone();
1692 }
1693 if let Some(rc) = body.get("ReplicationConfig") {
1694 ep.replication_config = Some(rc.clone());
1695 }
1696 if let Some(buses) = body["EventBuses"].as_array() {
1697 ep.event_buses = buses.clone();
1698 }
1699 if let Some(ra) = body["RoleArn"].as_str() {
1700 ep.role_arn = Some(ra.to_string());
1701 }
1702 ep.last_modified_time = Utc::now();
1703
1704 let resp = json!({
1705 "Name": ep.name,
1706 "Arn": ep.arn,
1707 "EndpointId": ep.endpoint_id,
1708 "State": ep.state,
1709 "RoutingConfig": ep.routing_config,
1710 "EventBuses": ep.event_buses,
1711 });
1712
1713 Ok(AwsResponse::ok_json(resp))
1714 }
1715
1716 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1719 let body = req.json_body();
1720 validate_required("Name", &body["Name"])?;
1721 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1722 validate_string_length("name", name, 1, 64)?;
1723
1724 let mut state = self.state.write();
1725 let conn = state.connections.get_mut(name).ok_or_else(|| {
1726 AwsServiceError::aws_error(
1727 StatusCode::BAD_REQUEST,
1728 "ResourceNotFoundException",
1729 format!("Connection '{name}' does not exist."),
1730 )
1731 })?;
1732
1733 conn.connection_state = "DEAUTHORIZING".to_string();
1734 conn.last_modified_time = Utc::now();
1735
1736 let resp = json!({
1737 "ConnectionArn": conn.arn,
1738 "ConnectionState": conn.connection_state,
1739 "CreationTime": conn.creation_time.timestamp() as f64,
1740 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1741 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1742 });
1743
1744 Ok(AwsResponse::ok_json(resp))
1745 }
1746
1747 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1750 let body = req.json_body();
1751 validate_required("Entries", &body["Entries"])?;
1752 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1753 let entries = body["Entries"]
1754 .as_array()
1755 .ok_or_else(|| missing("Entries"))?;
1756
1757 if entries.is_empty() {
1759 return Err(AwsServiceError::aws_error(
1760 StatusCode::BAD_REQUEST,
1761 "ValidationException",
1762 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1763 ));
1764 }
1765 if entries.len() > 10 {
1766 return Err(AwsServiceError::aws_error(
1767 StatusCode::BAD_REQUEST,
1768 "ValidationException",
1769 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1770 ));
1771 }
1772
1773 let mut state = self.state.write();
1774 let mut result_entries = Vec::new();
1775 let mut events_to_deliver = Vec::new();
1776 let mut failed_count = 0;
1777
1778 for entry in entries {
1779 let source = entry["Source"].as_str().unwrap_or("").to_string();
1780 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1781 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1782
1783 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1784 failed_count += 1;
1785 result_entries.push(error);
1786 continue;
1787 }
1788
1789 let event_id = uuid::Uuid::new_v4().to_string();
1790 let raw_bus = entry["EventBusName"]
1791 .as_str()
1792 .unwrap_or("default")
1793 .to_string();
1794 let event_bus_name = state.resolve_bus_name(&raw_bus);
1795 let time = parse_put_events_time(&entry["Time"]);
1796 let resources: Vec<String> = entry["Resources"]
1797 .as_array()
1798 .map(|arr| {
1799 arr.iter()
1800 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1801 .collect()
1802 })
1803 .unwrap_or_default();
1804
1805 let event = PutEvent {
1806 event_id: event_id.clone(),
1807 source: source.clone(),
1808 detail_type: detail_type.clone(),
1809 detail: detail.clone(),
1810 event_bus_name: event_bus_name.clone(),
1811 time,
1812 resources: resources.clone(),
1813 };
1814
1815 archive_matching_event(
1816 &mut state,
1817 &event,
1818 &event_bus_name,
1819 &source,
1820 &detail_type,
1821 &detail,
1822 &req.account_id,
1823 &req.region,
1824 &resources,
1825 );
1826
1827 state.events.push(event);
1828
1829 let matching_targets: Vec<EventTarget> = state
1831 .rules
1832 .values()
1833 .filter(|r| {
1834 r.event_bus_name == event_bus_name
1835 && r.state == "ENABLED"
1836 && matches_pattern(
1837 r.event_pattern.as_deref(),
1838 &source,
1839 &detail_type,
1840 &detail,
1841 &req.account_id,
1842 &req.region,
1843 &resources,
1844 )
1845 })
1846 .flat_map(|r| r.targets.clone())
1847 .collect();
1848
1849 if !matching_targets.is_empty() {
1850 events_to_deliver.push((
1851 event_id.clone(),
1852 source,
1853 detail_type,
1854 detail,
1855 time,
1856 resources,
1857 matching_targets,
1858 ));
1859 }
1860
1861 result_entries.push(json!({ "EventId": event_id }));
1862 }
1863
1864 drop(state);
1866
1867 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1869 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1870 let event_json = json!({
1871 "version": "0",
1872 "id": event_id,
1873 "source": source,
1874 "account": req.account_id,
1875 "detail-type": detail_type,
1876 "detail": detail_value,
1877 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1878 "region": req.region,
1879 "resources": resources,
1880 });
1881 let event_str = event_json.to_string();
1882
1883 for target in targets {
1884 let arn = &target.arn;
1885 let body_str = if let Some(ref transformer) = target.input_transformer {
1887 apply_input_transformer(transformer, &event_json)
1888 } else if let Some(ref input) = target.input {
1889 input.clone()
1890 } else if let Some(ref input_path) = target.input_path {
1891 resolve_json_path(&event_json, input_path)
1892 .map(|v| v.to_string())
1893 .unwrap_or_else(|| event_str.clone())
1894 } else {
1895 event_str.clone()
1896 };
1897
1898 if arn.contains(":sqs:") {
1899 let group_id = target
1901 .sqs_parameters
1902 .as_ref()
1903 .and_then(|p| p["MessageGroupId"].as_str())
1904 .map(|s| s.to_string());
1905 if group_id.is_some() {
1906 self.delivery.send_to_sqs_with_attrs(
1910 arn,
1911 &body_str,
1912 &HashMap::new(),
1913 group_id.as_deref(),
1914 None,
1915 );
1916 } else {
1917 self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1918 }
1919 } else if arn.contains(":sns:") {
1920 self.delivery
1921 .publish_to_sns(arn, &body_str, Some(&detail_type));
1922 } else if arn.contains(":lambda:") {
1923 tracing::info!(
1924 function_arn = %arn,
1925 payload = %body_str,
1926 "EventBridge delivering to Lambda function"
1927 );
1928 let now = Utc::now();
1929 let mut state = self.state.write();
1930 state
1931 .lambda_invocations
1932 .push(crate::state::LambdaInvocation {
1933 function_arn: arn.clone(),
1934 payload: body_str.clone(),
1935 timestamp: now,
1936 });
1937 drop(state);
1938 if let Some(ref ls) = self.lambda_state {
1940 ls.write().invocations.push(LambdaInvocation {
1941 function_arn: arn.clone(),
1942 payload: body_str.clone(),
1943 timestamp: now,
1944 source: "aws:events".to_string(),
1945 });
1946 }
1947 invoke_lambda_async(
1949 &self.container_runtime,
1950 &self.lambda_state,
1951 arn,
1952 &body_str,
1953 );
1954 } else if arn.contains(":logs:") {
1955 tracing::info!(
1956 log_group_arn = %arn,
1957 payload = %body_str,
1958 "EventBridge delivering to CloudWatch Logs"
1959 );
1960 let now = Utc::now();
1961 let mut state = self.state.write();
1962 state.log_deliveries.push(crate::state::LogDelivery {
1963 log_group_arn: arn.clone(),
1964 payload: body_str.clone(),
1965 timestamp: now,
1966 });
1967 drop(state);
1968 if let Some(ref log_state) = self.logs_state {
1970 deliver_to_logs(log_state, arn, &body_str, now);
1971 }
1972 } else if arn.contains(":kinesis:") {
1973 tracing::info!(
1974 stream_arn = %arn,
1975 "EventBridge delivering to Kinesis stream"
1976 );
1977 self.delivery.send_to_kinesis(arn, &body_str, &event_id);
1979 } else if arn.contains(":states:") {
1980 tracing::info!(
1981 state_machine_arn = %arn,
1982 "EventBridge delivering to Step Functions"
1983 );
1984 self.delivery.start_stepfunctions_execution(arn, &body_str);
1985 let mut state = self.state.write();
1986 state
1987 .step_function_executions
1988 .push(crate::state::StepFunctionExecution {
1989 state_machine_arn: arn.clone(),
1990 payload: body_str.clone(),
1991 timestamp: Utc::now(),
1992 });
1993 } else if arn.contains(":api-destination/") {
1994 let state = self.state.read();
1996 let dest = state.api_destinations.values().find(|d| d.arn == *arn);
1997 if let Some(dest) = dest {
1998 let url = dest.invocation_endpoint.clone();
1999 let method = dest.http_method.clone();
2000 let conn = state
2001 .connections
2002 .values()
2003 .find(|c| c.arn == dest.connection_arn)
2004 .cloned();
2005 drop(state);
2006
2007 let payload = body_str.clone();
2008 tokio::spawn(async move {
2009 let client = reqwest::Client::new();
2010 let mut req_builder = match method.as_str() {
2011 "GET" => client.get(&url),
2012 "PUT" => client.put(&url),
2013 "DELETE" => client.delete(&url),
2014 "PATCH" => client.patch(&url),
2015 "HEAD" => client.head(&url),
2016 _ => client.post(&url),
2017 };
2018 req_builder = req_builder.header("Content-Type", "application/json");
2019
2020 if let Some(conn) = conn {
2022 req_builder = apply_connection_auth(req_builder, &conn);
2023 }
2024
2025 let result = req_builder.body(payload).send().await;
2026 if let Err(e) = result {
2027 tracing::warn!(
2028 endpoint = %url,
2029 error = %e,
2030 "EventBridge ApiDestination delivery failed"
2031 );
2032 }
2033 });
2034 }
2035 } else if arn.starts_with("https://") || arn.starts_with("http://") {
2036 let url = arn.clone();
2038 let payload = body_str.clone();
2039 tokio::spawn(async move {
2040 let client = reqwest::Client::new();
2041 let result = client
2042 .post(&url)
2043 .header("Content-Type", "application/json")
2044 .body(payload)
2045 .send()
2046 .await;
2047 if let Err(e) = result {
2048 tracing::warn!(
2049 endpoint = %url,
2050 error = %e,
2051 "EventBridge HTTP target delivery failed"
2052 );
2053 }
2054 });
2055 }
2056 }
2057 }
2058
2059 let resp = json!({
2060 "FailedEntryCount": failed_count,
2061 "Entries": result_entries,
2062 });
2063
2064 Ok(AwsResponse::ok_json(resp))
2065 }
2066
2067 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2070 let body = req.json_body();
2071 validate_required("ResourceARN", &body["ResourceARN"])?;
2072 let arn = body["ResourceARN"]
2073 .as_str()
2074 .ok_or_else(|| missing("ResourceARN"))?;
2075 validate_string_length("resourceARN", arn, 1, 1600)?;
2076 validate_required("Tags", &body["Tags"])?;
2077
2078 let mut state = self.state.write();
2079 let tag_map = find_tags_mut(&mut state, arn)?;
2080
2081 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2082 AwsServiceError::aws_error(
2083 StatusCode::BAD_REQUEST,
2084 "ValidationException",
2085 format!("{f} must be a list"),
2086 )
2087 })?;
2088
2089 Ok(AwsResponse::ok_json(json!({})))
2090 }
2091
2092 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2093 let body = req.json_body();
2094 validate_required("ResourceARN", &body["ResourceARN"])?;
2095 let arn = body["ResourceARN"]
2096 .as_str()
2097 .ok_or_else(|| missing("ResourceARN"))?;
2098 validate_string_length("resourceARN", arn, 1, 1600)?;
2099 validate_required("TagKeys", &body["TagKeys"])?;
2100
2101 let mut state = self.state.write();
2102 let tag_map = find_tags_mut(&mut state, arn)?;
2103
2104 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2105 AwsServiceError::aws_error(
2106 StatusCode::BAD_REQUEST,
2107 "ValidationException",
2108 format!("{f} must be a list"),
2109 )
2110 })?;
2111
2112 Ok(AwsResponse::ok_json(json!({})))
2113 }
2114
2115 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116 let body = req.json_body();
2117 validate_required("ResourceARN", &body["ResourceARN"])?;
2118 let arn = body["ResourceARN"]
2119 .as_str()
2120 .ok_or_else(|| missing("ResourceARN"))?;
2121 validate_string_length("resourceARN", arn, 1, 1600)?;
2122
2123 let state = self.state.read();
2124 let tag_map = find_tags(&state, arn)?;
2125
2126 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2127
2128 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2129 }
2130
2131 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2134 let body = req.json_body();
2135 validate_required("ArchiveName", &body["ArchiveName"])?;
2136 let name = body["ArchiveName"]
2137 .as_str()
2138 .ok_or_else(|| missing("ArchiveName"))?
2139 .to_string();
2140 validate_string_length("archiveName", &name, 1, 48)?;
2141 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2142 let event_source_arn = body["EventSourceArn"]
2143 .as_str()
2144 .ok_or_else(|| missing("EventSourceArn"))?
2145 .to_string();
2146 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2147 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2148 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2149 if let Some(rd) = body["RetentionDays"].as_i64() {
2150 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2151 }
2152 let description = body["Description"].as_str().map(|s| s.to_string());
2153 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2154 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2155
2156 if let Some(ref pattern) = event_pattern {
2158 validate_event_pattern(pattern)?;
2159 }
2160
2161 let mut state = self.state.write();
2162
2163 let bus_name = state.resolve_bus_name(&event_source_arn);
2165 if !state.buses.contains_key(&bus_name) {
2166 return Err(AwsServiceError::aws_error(
2167 StatusCode::BAD_REQUEST,
2168 "ResourceNotFoundException",
2169 format!("Event bus {bus_name} does not exist."),
2170 ));
2171 }
2172
2173 if state.archives.contains_key(&name) {
2175 return Err(AwsServiceError::aws_error(
2176 StatusCode::BAD_REQUEST,
2177 "ResourceAlreadyExistsException",
2178 format!("Archive {name} already exists."),
2179 ));
2180 }
2181
2182 let now = Utc::now();
2183 let arn = format!(
2184 "arn:aws:events:{}:{}:archive/{}",
2185 req.region, state.account_id, name
2186 );
2187
2188 let archive = Archive {
2189 name: name.clone(),
2190 arn: arn.clone(),
2191 event_source_arn: event_source_arn.clone(),
2192 description,
2193 event_pattern: event_pattern.clone(),
2194 retention_days,
2195 state: "ENABLED".to_string(),
2196 creation_time: now,
2197 event_count: 0,
2198 size_bytes: 0,
2199 events: Vec::new(),
2200 };
2201 state.archives.insert(name.clone(), archive);
2202
2203 let rule_name = format!("Events-Archive-{name}");
2205 let rule_arn = format!(
2206 "arn:aws:events:{}:{}:rule/{}",
2207 req.region, state.account_id, rule_name
2208 );
2209 let rule_event_pattern = {
2211 let mut merged = if let Some(ref ep) = event_pattern {
2212 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2213 } else {
2214 json!({})
2215 };
2216 if let Some(obj) = merged.as_object_mut() {
2217 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2218 }
2219 serde_json::to_string(&merged).unwrap_or_default()
2220 };
2221
2222 let archive_target = EventTarget {
2224 id: name.clone(),
2225 arn: format!("arn:aws:events:{}:::", req.region),
2226 input: None,
2227 input_path: None,
2228 input_transformer: Some(json!({
2229 "InputPathsMap": {},
2230 "InputTemplate": format!(
2231 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2232 arn
2233 )
2234 })),
2235 sqs_parameters: None,
2236 };
2237
2238 let archive_rule = EventRule {
2239 name: rule_name.clone(),
2240 arn: rule_arn,
2241 event_bus_name: bus_name.clone(),
2242 event_pattern: Some(rule_event_pattern),
2243 schedule_expression: None,
2244 state: "ENABLED".to_string(),
2245 description: None,
2246 role_arn: None,
2247 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2248 created_by: Some(state.account_id.clone()),
2249 targets: vec![archive_target],
2250 tags: HashMap::new(),
2251 last_fired: None,
2252 };
2253 let key = (bus_name, rule_name);
2254 state.rules.insert(key, archive_rule);
2255
2256 Ok(AwsResponse::ok_json(json!({
2257 "ArchiveArn": arn,
2258 "CreationTime": now.timestamp() as f64,
2259 "State": "ENABLED",
2260 })))
2261 }
2262
2263 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2264 let body = req.json_body();
2265 validate_required("ArchiveName", &body["ArchiveName"])?;
2266 let name = body["ArchiveName"]
2267 .as_str()
2268 .ok_or_else(|| missing("ArchiveName"))?;
2269 validate_string_length("archiveName", name, 1, 48)?;
2270
2271 let state = self.state.read();
2272 let archive = state.archives.get(name).ok_or_else(|| {
2273 AwsServiceError::aws_error(
2274 StatusCode::BAD_REQUEST,
2275 "ResourceNotFoundException",
2276 format!("Archive {name} does not exist."),
2277 )
2278 })?;
2279
2280 let mut resp = json!({
2281 "ArchiveArn": archive.arn,
2282 "ArchiveName": archive.name,
2283 "CreationTime": archive.creation_time.timestamp() as f64,
2284 "EventCount": archive.event_count,
2285 "EventSourceArn": archive.event_source_arn,
2286 "RetentionDays": archive.retention_days,
2287 "SizeBytes": archive.size_bytes,
2288 "State": archive.state,
2289 });
2290 if let Some(ref desc) = archive.description {
2291 resp["Description"] = json!(desc);
2292 }
2293 if let Some(ref ep) = archive.event_pattern {
2294 resp["EventPattern"] = json!(ep);
2295 }
2296
2297 Ok(AwsResponse::ok_json(resp))
2298 }
2299
2300 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2301 let body = req.json_body();
2302 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2303 validate_optional_string_length(
2304 "eventSourceArn",
2305 body["EventSourceArn"].as_str(),
2306 1,
2307 1600,
2308 )?;
2309 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2310 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2311 let name_prefix = body["NamePrefix"].as_str();
2312 let source_arn = body["EventSourceArn"].as_str();
2313 let archive_state = body["State"].as_str();
2314
2315 let filter_count = [
2317 name_prefix.is_some(),
2318 source_arn.is_some(),
2319 archive_state.is_some(),
2320 ]
2321 .iter()
2322 .filter(|&&x| x)
2323 .count();
2324 if filter_count > 1 {
2325 return Err(AwsServiceError::aws_error(
2326 StatusCode::BAD_REQUEST,
2327 "ValidationException",
2328 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2329 ));
2330 }
2331
2332 if let Some(s) = archive_state {
2334 let valid = [
2335 "ENABLED",
2336 "DISABLED",
2337 "CREATING",
2338 "UPDATING",
2339 "CREATE_FAILED",
2340 "UPDATE_FAILED",
2341 ];
2342 if !valid.contains(&s) {
2343 return Err(AwsServiceError::aws_error(
2344 StatusCode::BAD_REQUEST,
2345 "ValidationException",
2346 format!(
2347 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2348 s
2349 ),
2350 ));
2351 }
2352 }
2353
2354 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2355
2356 let state = self.state.read();
2357 let all: Vec<Value> = state
2358 .archives
2359 .values()
2360 .filter(|a| {
2361 if let Some(prefix) = name_prefix {
2362 a.name.starts_with(prefix)
2363 } else if let Some(arn) = source_arn {
2364 a.event_source_arn == arn
2365 } else if let Some(s) = archive_state {
2366 a.state == s
2367 } else {
2368 true
2369 }
2370 })
2371 .map(|a| {
2372 json!({
2373 "ArchiveName": a.name,
2374 "CreationTime": a.creation_time.timestamp() as f64,
2375 "EventCount": a.event_count,
2376 "EventSourceArn": a.event_source_arn,
2377 "RetentionDays": a.retention_days,
2378 "SizeBytes": a.size_bytes,
2379 "State": a.state,
2380 })
2381 })
2382 .collect();
2383
2384 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2385 let mut resp = json!({ "Archives": archives });
2386 if let Some(token) = next_token {
2387 resp["NextToken"] = json!(token);
2388 }
2389
2390 Ok(AwsResponse::ok_json(resp))
2391 }
2392
2393 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2394 let body = req.json_body();
2395 validate_required("ArchiveName", &body["ArchiveName"])?;
2396 let name = body["ArchiveName"]
2397 .as_str()
2398 .ok_or_else(|| missing("ArchiveName"))?;
2399 validate_string_length("archiveName", name, 1, 48)?;
2400 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2401 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2402 if let Some(rd) = body["RetentionDays"].as_i64() {
2403 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2404 }
2405
2406 if let Some(pattern) = body["EventPattern"].as_str() {
2408 validate_event_pattern(pattern)?;
2409 }
2410
2411 let mut state = self.state.write();
2412 let archive = state.archives.get_mut(name).ok_or_else(|| {
2413 AwsServiceError::aws_error(
2414 StatusCode::BAD_REQUEST,
2415 "ResourceNotFoundException",
2416 format!("Archive {name} does not exist."),
2417 )
2418 })?;
2419
2420 if let Some(desc) = body["Description"].as_str() {
2421 archive.description = Some(desc.to_string());
2422 }
2423 if let Some(pattern) = body["EventPattern"].as_str() {
2424 archive.event_pattern = Some(pattern.to_string());
2425 }
2426 if let Some(days) = body["RetentionDays"].as_i64() {
2427 archive.retention_days = days;
2428 }
2429
2430 Ok(AwsResponse::ok_json(json!({
2431 "ArchiveArn": archive.arn,
2432 "CreationTime": archive.creation_time.timestamp() as f64,
2433 "State": archive.state,
2434 })))
2435 }
2436
2437 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2438 let body = req.json_body();
2439 validate_required("ArchiveName", &body["ArchiveName"])?;
2440 let name = body["ArchiveName"]
2441 .as_str()
2442 .ok_or_else(|| missing("ArchiveName"))?;
2443 validate_string_length("archiveName", name, 1, 48)?;
2444
2445 let mut state = self.state.write();
2446 if !state.archives.contains_key(name) {
2447 return Err(AwsServiceError::aws_error(
2448 StatusCode::BAD_REQUEST,
2449 "ResourceNotFoundException",
2450 format!("Archive {name} does not exist."),
2451 ));
2452 }
2453
2454 state.archives.remove(name);
2455
2456 let rule_name = format!("Events-Archive-{name}");
2458 state.rules.retain(|k, _| k.1 != rule_name);
2459
2460 Ok(AwsResponse::ok_json(json!({})))
2461 }
2462
2463 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466 let body = req.json_body();
2467 validate_required("Name", &body["Name"])?;
2468 let name = body["Name"]
2469 .as_str()
2470 .ok_or_else(|| missing("Name"))?
2471 .to_string();
2472 validate_string_length("name", &name, 1, 64)?;
2473 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2474 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2475 let description = body["Description"].as_str().map(|s| s.to_string());
2476 let auth_type = body["AuthorizationType"]
2477 .as_str()
2478 .ok_or_else(|| missing("AuthorizationType"))?
2479 .to_string();
2480 validate_enum(
2481 "authorizationType",
2482 &auth_type,
2483 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2484 )?;
2485 validate_optional_string_length(
2486 "kmsKeyIdentifier",
2487 body["KmsKeyIdentifier"].as_str(),
2488 0,
2489 2048,
2490 )?;
2491 validate_required("AuthParameters", &body["AuthParameters"])?;
2492 let auth_params = body["AuthParameters"].clone();
2493
2494 let mut state = self.state.write();
2495 let now = Utc::now();
2496 let conn_uuid = uuid::Uuid::new_v4();
2497 let arn = format!(
2498 "arn:aws:events:{}:{}:connection/{}/{}",
2499 req.region, state.account_id, name, conn_uuid
2500 );
2501 let secret_arn = format!(
2502 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2503 req.region, state.account_id, name, conn_uuid
2504 );
2505
2506 let conn = Connection {
2507 name: name.clone(),
2508 arn: arn.clone(),
2509 description,
2510 authorization_type: auth_type.clone(),
2511 auth_parameters: auth_params,
2512 connection_state: "AUTHORIZED".to_string(),
2513 secret_arn: secret_arn.clone(),
2514 creation_time: now,
2515 last_modified_time: now,
2516 last_authorized_time: now,
2517 };
2518 state.connections.insert(name, conn);
2519
2520 Ok(AwsResponse::ok_json(json!({
2521 "ConnectionArn": arn,
2522 "ConnectionState": "AUTHORIZED",
2523 "CreationTime": now.timestamp() as f64,
2524 "LastModifiedTime": now.timestamp() as f64,
2525 })))
2526 }
2527
2528 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2529 let body = req.json_body();
2530 validate_required("Name", &body["Name"])?;
2531 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2532 validate_string_length("name", name, 1, 64)?;
2533
2534 let state = self.state.read();
2535 let conn = state.connections.get(name).ok_or_else(|| {
2536 AwsServiceError::aws_error(
2537 StatusCode::BAD_REQUEST,
2538 "ResourceNotFoundException",
2539 format!("Connection '{name}' does not exist."),
2540 )
2541 })?;
2542
2543 let auth_params_response =
2545 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2546
2547 let mut resp = json!({
2548 "ConnectionArn": conn.arn,
2549 "Name": conn.name,
2550 "AuthorizationType": conn.authorization_type,
2551 "AuthParameters": auth_params_response,
2552 "ConnectionState": conn.connection_state,
2553 "SecretArn": conn.secret_arn,
2554 "CreationTime": conn.creation_time.timestamp() as f64,
2555 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2556 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2557 });
2558 if let Some(ref desc) = conn.description {
2559 resp["Description"] = json!(desc);
2560 }
2561
2562 Ok(AwsResponse::ok_json(resp))
2563 }
2564
2565 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2566 let body = req.json_body();
2567 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2568 validate_optional_enum(
2569 "connectionState",
2570 body["ConnectionState"].as_str(),
2571 &[
2572 "CREATING",
2573 "UPDATING",
2574 "DELETING",
2575 "AUTHORIZED",
2576 "DEAUTHORIZED",
2577 "AUTHORIZING",
2578 "DEAUTHORIZING",
2579 "ACTIVE",
2580 "FAILED_CONNECTIVITY",
2581 ],
2582 )?;
2583 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2584 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2585
2586 let name_prefix = body["NamePrefix"].as_str();
2587 let connection_state = body["ConnectionState"].as_str();
2588 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2589
2590 let state = self.state.read();
2591 let all: Vec<Value> = state
2592 .connections
2593 .values()
2594 .filter(|c| {
2595 if let Some(prefix) = name_prefix {
2596 if !c.name.starts_with(prefix) {
2597 return false;
2598 }
2599 }
2600 if let Some(cs) = connection_state {
2601 if c.connection_state != cs {
2602 return false;
2603 }
2604 }
2605 true
2606 })
2607 .map(|c| {
2608 json!({
2609 "ConnectionArn": c.arn,
2610 "Name": c.name,
2611 "AuthorizationType": c.authorization_type,
2612 "ConnectionState": c.connection_state,
2613 "CreationTime": c.creation_time.timestamp() as f64,
2614 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2615 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2616 })
2617 })
2618 .collect();
2619
2620 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2621 let mut resp = json!({ "Connections": conns });
2622 if let Some(token) = next_token {
2623 resp["NextToken"] = json!(token);
2624 }
2625
2626 Ok(AwsResponse::ok_json(resp))
2627 }
2628
2629 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2630 let body = req.json_body();
2631 validate_required("Name", &body["Name"])?;
2632 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2633 validate_string_length("name", name, 1, 64)?;
2634 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2635 validate_optional_enum(
2636 "authorizationType",
2637 body["AuthorizationType"].as_str(),
2638 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2639 )?;
2640
2641 let mut state = self.state.write();
2642 let conn = state.connections.get_mut(name).ok_or_else(|| {
2643 AwsServiceError::aws_error(
2644 StatusCode::BAD_REQUEST,
2645 "ResourceNotFoundException",
2646 format!("Connection '{name}' does not exist."),
2647 )
2648 })?;
2649
2650 if let Some(desc) = body["Description"].as_str() {
2651 conn.description = Some(desc.to_string());
2652 }
2653 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2654 conn.authorization_type = auth_type.to_string();
2655 }
2656 if body.get("AuthParameters").is_some() {
2657 conn.auth_parameters = body["AuthParameters"].clone();
2658 }
2659 conn.last_modified_time = Utc::now();
2660
2661 Ok(AwsResponse::ok_json(json!({
2662 "ConnectionArn": conn.arn,
2663 "ConnectionState": conn.connection_state,
2664 "CreationTime": conn.creation_time.timestamp() as f64,
2665 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2666 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2667 })))
2668 }
2669
2670 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2671 let body = req.json_body();
2672 validate_required("Name", &body["Name"])?;
2673 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2674 validate_string_length("name", name, 1, 64)?;
2675
2676 let mut state = self.state.write();
2677 let conn = state.connections.remove(name).ok_or_else(|| {
2678 AwsServiceError::aws_error(
2679 StatusCode::BAD_REQUEST,
2680 "ResourceNotFoundException",
2681 format!("Connection '{name}' does not exist."),
2682 )
2683 })?;
2684
2685 Ok(AwsResponse::ok_json(json!({
2686 "ConnectionArn": conn.arn,
2687 "ConnectionState": conn.connection_state,
2688 "CreationTime": conn.creation_time.timestamp() as f64,
2689 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2690 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2691 })))
2692 }
2693
2694 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2697 let body = req.json_body();
2698 validate_required("Name", &body["Name"])?;
2699 let name = body["Name"]
2700 .as_str()
2701 .ok_or_else(|| missing("Name"))?
2702 .to_string();
2703 validate_string_length("name", &name, 1, 64)?;
2704 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2705 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2706 let description = body["Description"].as_str().map(|s| s.to_string());
2707 let connection_arn = body["ConnectionArn"]
2708 .as_str()
2709 .ok_or_else(|| missing("ConnectionArn"))?
2710 .to_string();
2711 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2712 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2713 let endpoint = body["InvocationEndpoint"]
2714 .as_str()
2715 .ok_or_else(|| missing("InvocationEndpoint"))?
2716 .to_string();
2717 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2718 validate_required("HttpMethod", &body["HttpMethod"])?;
2719 let http_method = body["HttpMethod"]
2720 .as_str()
2721 .ok_or_else(|| missing("HttpMethod"))?
2722 .to_string();
2723 validate_enum(
2724 "httpMethod",
2725 &http_method,
2726 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2727 )?;
2728 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2729 if let Some(r) = rate_limit {
2730 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2731 }
2732
2733 let mut state = self.state.write();
2734 let now = Utc::now();
2735 let dest_uuid = uuid::Uuid::new_v4();
2736 let arn = format!(
2737 "arn:aws:events:{}:{}:api-destination/{}/{}",
2738 req.region, state.account_id, name, dest_uuid
2739 );
2740
2741 let dest = ApiDestination {
2742 name: name.clone(),
2743 arn: arn.clone(),
2744 description,
2745 connection_arn,
2746 invocation_endpoint: endpoint,
2747 http_method,
2748 invocation_rate_limit_per_second: rate_limit,
2749 state: "ACTIVE".to_string(),
2750 creation_time: now,
2751 last_modified_time: now,
2752 };
2753 state.api_destinations.insert(name, dest);
2754
2755 Ok(AwsResponse::ok_json(json!({
2756 "ApiDestinationArn": arn,
2757 "ApiDestinationState": "ACTIVE",
2758 "CreationTime": now.timestamp() as f64,
2759 "LastModifiedTime": now.timestamp() as f64,
2760 })))
2761 }
2762
2763 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764 let body = req.json_body();
2765 validate_required("Name", &body["Name"])?;
2766 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2767 validate_string_length("name", name, 1, 64)?;
2768
2769 let state = self.state.read();
2770 let dest = state.api_destinations.get(name).ok_or_else(|| {
2771 AwsServiceError::aws_error(
2772 StatusCode::BAD_REQUEST,
2773 "ResourceNotFoundException",
2774 format!("An api-destination '{name}' does not exist."),
2775 )
2776 })?;
2777
2778 let mut resp = json!({
2779 "ApiDestinationArn": dest.arn,
2780 "Name": dest.name,
2781 "ConnectionArn": dest.connection_arn,
2782 "InvocationEndpoint": dest.invocation_endpoint,
2783 "HttpMethod": dest.http_method,
2784 "ApiDestinationState": dest.state,
2785 "CreationTime": dest.creation_time.timestamp() as f64,
2786 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2787 });
2788 if let Some(ref desc) = dest.description {
2789 resp["Description"] = json!(desc);
2790 }
2791 if let Some(rate) = dest.invocation_rate_limit_per_second {
2792 resp["InvocationRateLimitPerSecond"] = json!(rate);
2793 }
2794
2795 Ok(AwsResponse::ok_json(resp))
2796 }
2797
2798 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2799 let body = req.json_body();
2800 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2801 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2802 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2803 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2804
2805 let name_prefix = body["NamePrefix"].as_str();
2806 let connection_arn = body["ConnectionArn"].as_str();
2807 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2808
2809 let state = self.state.read();
2810 let all: Vec<Value> = state
2811 .api_destinations
2812 .values()
2813 .filter(|d| {
2814 if let Some(prefix) = name_prefix {
2815 if !d.name.starts_with(prefix) {
2816 return false;
2817 }
2818 }
2819 if let Some(arn) = connection_arn {
2820 if d.connection_arn != arn {
2821 return false;
2822 }
2823 }
2824 true
2825 })
2826 .map(|d| {
2827 let mut obj = json!({
2828 "ApiDestinationArn": d.arn,
2829 "Name": d.name,
2830 "ConnectionArn": d.connection_arn,
2831 "InvocationEndpoint": d.invocation_endpoint,
2832 "HttpMethod": d.http_method,
2833 "ApiDestinationState": d.state,
2834 "CreationTime": d.creation_time.timestamp() as f64,
2835 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2836 });
2837 if let Some(rate) = d.invocation_rate_limit_per_second {
2838 obj["InvocationRateLimitPerSecond"] = json!(rate);
2839 }
2840 obj
2841 })
2842 .collect();
2843
2844 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2845 let mut resp = json!({ "ApiDestinations": dests });
2846 if let Some(token) = next_token {
2847 resp["NextToken"] = json!(token);
2848 }
2849
2850 Ok(AwsResponse::ok_json(resp))
2851 }
2852
2853 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2854 let body = req.json_body();
2855 validate_required("Name", &body["Name"])?;
2856 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2857 validate_string_length("name", name, 1, 64)?;
2858 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2859 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2860 validate_optional_string_length(
2861 "invocationEndpoint",
2862 body["InvocationEndpoint"].as_str(),
2863 1,
2864 2048,
2865 )?;
2866 validate_optional_enum(
2867 "httpMethod",
2868 body["HttpMethod"].as_str(),
2869 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2870 )?;
2871 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2872 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2873 }
2874
2875 let mut state = self.state.write();
2876 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2877 AwsServiceError::aws_error(
2878 StatusCode::BAD_REQUEST,
2879 "ResourceNotFoundException",
2880 format!("An api-destination '{name}' does not exist."),
2881 )
2882 })?;
2883
2884 if let Some(desc) = body["Description"].as_str() {
2885 dest.description = Some(desc.to_string());
2886 }
2887 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2888 dest.invocation_endpoint = endpoint.to_string();
2889 }
2890 if let Some(method) = body["HttpMethod"].as_str() {
2891 dest.http_method = method.to_string();
2892 }
2893 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2894 dest.invocation_rate_limit_per_second = Some(rate);
2895 }
2896 if let Some(conn) = body["ConnectionArn"].as_str() {
2897 dest.connection_arn = conn.to_string();
2898 }
2899 dest.last_modified_time = Utc::now();
2900
2901 Ok(AwsResponse::ok_json(json!({
2902 "ApiDestinationArn": dest.arn,
2903 "ApiDestinationState": dest.state,
2904 "CreationTime": dest.creation_time.timestamp() as f64,
2905 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2906 })))
2907 }
2908
2909 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910 let body = req.json_body();
2911 validate_required("Name", &body["Name"])?;
2912 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2913 validate_string_length("name", name, 1, 64)?;
2914
2915 let mut state = self.state.write();
2916 if !state.api_destinations.contains_key(name) {
2917 return Err(AwsServiceError::aws_error(
2918 StatusCode::BAD_REQUEST,
2919 "ResourceNotFoundException",
2920 format!("An api-destination '{name}' does not exist."),
2921 ));
2922 }
2923 state.api_destinations.remove(name);
2924
2925 Ok(AwsResponse::ok_json(json!({})))
2926 }
2927
2928 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2931 let input = StartReplayInput::from_body(&req.json_body())?;
2932
2933 let mut state = self.state.write();
2934
2935 let bus_name = state.resolve_bus_name(&input.destination_arn);
2937 if !state.buses.contains_key(&bus_name) {
2938 return Err(AwsServiceError::aws_error(
2939 StatusCode::BAD_REQUEST,
2940 "ResourceNotFoundException",
2941 format!("Event bus {bus_name} does not exist."),
2942 ));
2943 }
2944
2945 let archive_name = input
2946 .event_source_arn
2947 .rsplit_once("archive/")
2948 .map(|(_, n)| n.to_string())
2949 .unwrap_or_default();
2950 let archive = state.archives.get(&archive_name).ok_or_else(|| {
2951 AwsServiceError::aws_error(
2952 StatusCode::BAD_REQUEST,
2953 "ValidationException",
2954 format!(
2955 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2956 ),
2957 )
2958 })?;
2959 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2960 if archive_bus != bus_name {
2961 return Err(AwsServiceError::aws_error(
2962 StatusCode::BAD_REQUEST,
2963 "ValidationException",
2964 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2965 ));
2966 }
2967
2968 if input.event_end_time <= input.event_start_time {
2969 return Err(AwsServiceError::aws_error(
2970 StatusCode::BAD_REQUEST,
2971 "ValidationException",
2972 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2973 ));
2974 }
2975
2976 if state.replays.contains_key(&input.name) {
2977 return Err(AwsServiceError::aws_error(
2978 StatusCode::BAD_REQUEST,
2979 "ResourceAlreadyExistsException",
2980 format!("Replay {} already exists.", input.name),
2981 ));
2982 }
2983
2984 let now = Utc::now();
2985 let arn = format!(
2986 "arn:aws:events:{}:{}:replay/{}",
2987 req.region, state.account_id, input.name
2988 );
2989
2990 let events_to_deliver = collect_replay_events_with_targets(
2991 &state,
2992 &archive_name,
2993 &bus_name,
2994 input.event_start_time,
2995 input.event_end_time,
2996 &req.account_id,
2997 &req.region,
2998 );
2999
3000 let replay = Replay {
3001 name: input.name.clone(),
3002 arn: arn.clone(),
3003 description: input.description,
3004 event_source_arn: input.event_source_arn,
3005 destination: input.destination,
3006 event_start_time: input.event_start_time,
3007 event_end_time: input.event_end_time,
3008 state: "COMPLETED".to_string(),
3009 replay_start_time: now,
3010 replay_end_time: Some(now),
3011 };
3012 state.replays.insert(input.name, replay);
3013
3014 drop(state);
3015
3016 for (event, targets) in events_to_deliver {
3017 let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3018 let event_json = json!({
3019 "version": "0",
3020 "id": event.event_id,
3021 "source": event.source,
3022 "account": req.account_id,
3023 "detail-type": event.detail_type,
3024 "detail": detail_value,
3025 "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3026 "region": req.region,
3027 "resources": event.resources,
3028 "replay-name": arn,
3029 });
3030 let event_str = event_json.to_string();
3031
3032 for target in targets {
3033 self.deliver_replay_event_to_target(&target, &event, &event_json, &event_str);
3034 }
3035 }
3036
3037 Ok(AwsResponse::ok_json(json!({
3038 "ReplayArn": arn,
3039 "ReplayStartTime": now.timestamp() as f64,
3040 "State": "STARTING",
3041 })))
3042 }
3043
3044 fn deliver_replay_event_to_target(
3045 &self,
3046 target: &EventTarget,
3047 event: &PutEvent,
3048 event_json: &Value,
3049 event_str: &str,
3050 ) {
3051 let target_arn = &target.arn;
3052 let body_str = if let Some(ref transformer) = target.input_transformer {
3053 apply_input_transformer(transformer, event_json)
3054 } else if let Some(ref input) = target.input {
3055 input.clone()
3056 } else if let Some(ref input_path) = target.input_path {
3057 resolve_json_path(event_json, input_path)
3058 .map(|v| v.to_string())
3059 .unwrap_or_else(|| event_str.to_string())
3060 } else {
3061 event_str.to_string()
3062 };
3063
3064 if target_arn.contains(":sqs:") {
3065 let group_id = target
3066 .sqs_parameters
3067 .as_ref()
3068 .and_then(|p| p["MessageGroupId"].as_str())
3069 .map(|s| s.to_string());
3070 if group_id.is_some() {
3071 self.delivery.send_to_sqs_with_attrs(
3072 target_arn,
3073 &body_str,
3074 &HashMap::new(),
3075 group_id.as_deref(),
3076 None,
3077 );
3078 } else {
3079 self.delivery
3080 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3081 }
3082 } else if target_arn.contains(":sns:") {
3083 self.delivery
3084 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3085 } else if target_arn.contains(":lambda:") {
3086 let mut state = self.state.write();
3087 state
3088 .lambda_invocations
3089 .push(crate::state::LambdaInvocation {
3090 function_arn: target_arn.clone(),
3091 payload: body_str.clone(),
3092 timestamp: Utc::now(),
3093 });
3094 drop(state);
3095 if let Some(ref ls) = self.lambda_state {
3096 ls.write().invocations.push(LambdaInvocation {
3097 function_arn: target_arn.clone(),
3098 payload: body_str.clone(),
3099 timestamp: Utc::now(),
3100 source: "aws:events".to_string(),
3101 });
3102 }
3103 invoke_lambda_async(
3104 &self.container_runtime,
3105 &self.lambda_state,
3106 target_arn,
3107 &body_str,
3108 );
3109 } else if target_arn.contains(":logs:") {
3110 let mut state = self.state.write();
3111 state.log_deliveries.push(crate::state::LogDelivery {
3112 log_group_arn: target_arn.clone(),
3113 payload: body_str.clone(),
3114 timestamp: Utc::now(),
3115 });
3116 drop(state);
3117 if let Some(ref log_state) = self.logs_state {
3118 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3119 }
3120 } else if target_arn.contains(":states:") {
3121 self.delivery
3122 .start_stepfunctions_execution(target_arn, &body_str);
3123 let mut state = self.state.write();
3124 state
3125 .step_function_executions
3126 .push(crate::state::StepFunctionExecution {
3127 state_machine_arn: target_arn.clone(),
3128 payload: body_str.clone(),
3129 timestamp: Utc::now(),
3130 });
3131 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3132 let url = target_arn.clone();
3133 let payload = body_str.clone();
3134 tokio::spawn(async move {
3135 let client = reqwest::Client::new();
3136 let _ = client
3137 .post(&url)
3138 .header("Content-Type", "application/json")
3139 .body(payload)
3140 .send()
3141 .await;
3142 });
3143 }
3144 }
3145
3146 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3147 let body = req.json_body();
3148 validate_required("ReplayName", &body["ReplayName"])?;
3149 let name = body["ReplayName"]
3150 .as_str()
3151 .ok_or_else(|| missing("ReplayName"))?;
3152 validate_string_length("replayName", name, 1, 64)?;
3153
3154 let state = self.state.read();
3155 let replay = state.replays.get(name).ok_or_else(|| {
3156 AwsServiceError::aws_error(
3157 StatusCode::BAD_REQUEST,
3158 "ResourceNotFoundException",
3159 format!("Replay {name} does not exist."),
3160 )
3161 })?;
3162
3163 let mut resp = json!({
3164 "Destination": replay.destination,
3165 "EventSourceArn": replay.event_source_arn,
3166 "EventStartTime": replay.event_start_time.timestamp() as f64,
3167 "EventEndTime": replay.event_end_time.timestamp() as f64,
3168 "ReplayArn": replay.arn,
3169 "ReplayName": replay.name,
3170 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3171 "State": replay.state,
3172 });
3173 if let Some(ref desc) = replay.description {
3174 resp["Description"] = json!(desc);
3175 }
3176 if let Some(ref end) = replay.replay_end_time {
3177 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3178 }
3179
3180 Ok(AwsResponse::ok_json(resp))
3181 }
3182
3183 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3184 let body = req.json_body();
3185 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3186 validate_optional_string_length(
3187 "eventSourceArn",
3188 body["EventSourceArn"].as_str(),
3189 1,
3190 1600,
3191 )?;
3192 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3193 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3194 let name_prefix = body["NamePrefix"].as_str();
3195 let source_arn = body["EventSourceArn"].as_str();
3196 let replay_state = body["State"].as_str();
3197
3198 let filter_count = [
3200 name_prefix.is_some(),
3201 source_arn.is_some(),
3202 replay_state.is_some(),
3203 ]
3204 .iter()
3205 .filter(|&&x| x)
3206 .count();
3207 if filter_count > 1 {
3208 return Err(AwsServiceError::aws_error(
3209 StatusCode::BAD_REQUEST,
3210 "ValidationException",
3211 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3212 ));
3213 }
3214
3215 if let Some(s) = replay_state {
3217 let valid = [
3218 "CANCELLED",
3219 "CANCELLING",
3220 "COMPLETED",
3221 "FAILED",
3222 "RUNNING",
3223 "STARTING",
3224 ];
3225 if !valid.contains(&s) {
3226 return Err(AwsServiceError::aws_error(
3227 StatusCode::BAD_REQUEST,
3228 "ValidationException",
3229 format!(
3230 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3231 s
3232 ),
3233 ));
3234 }
3235 }
3236
3237 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3238
3239 let state = self.state.read();
3240 let all: Vec<Value> = state
3241 .replays
3242 .values()
3243 .filter(|r| {
3244 if let Some(prefix) = name_prefix {
3245 r.name.starts_with(prefix)
3246 } else if let Some(arn) = source_arn {
3247 r.event_source_arn == arn
3248 } else if let Some(s) = replay_state {
3249 r.state == s
3250 } else {
3251 true
3252 }
3253 })
3254 .map(|r| {
3255 let mut obj = json!({
3256 "EventSourceArn": r.event_source_arn,
3257 "EventStartTime": r.event_start_time.timestamp() as f64,
3258 "EventEndTime": r.event_end_time.timestamp() as f64,
3259 "ReplayName": r.name,
3260 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3261 "State": r.state,
3262 });
3263 if let Some(ref end) = r.replay_end_time {
3264 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3265 }
3266 obj
3267 })
3268 .collect();
3269
3270 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3271 let mut resp = json!({ "Replays": replays });
3272 if let Some(token) = next_token {
3273 resp["NextToken"] = json!(token);
3274 }
3275
3276 Ok(AwsResponse::ok_json(resp))
3277 }
3278
3279 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3280 let body = req.json_body();
3281 validate_required("ReplayName", &body["ReplayName"])?;
3282 let name = body["ReplayName"]
3283 .as_str()
3284 .ok_or_else(|| missing("ReplayName"))?;
3285 validate_string_length("replayName", name, 1, 64)?;
3286
3287 let mut state = self.state.write();
3288 let replay = state.replays.get_mut(name).ok_or_else(|| {
3289 AwsServiceError::aws_error(
3290 StatusCode::BAD_REQUEST,
3291 "ResourceNotFoundException",
3292 format!("Replay {name} does not exist."),
3293 )
3294 })?;
3295
3296 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3298 return Err(AwsServiceError::aws_error(
3299 StatusCode::BAD_REQUEST,
3300 "IllegalStatusException",
3301 format!("Replay {name} is not in a valid state for this operation."),
3302 ));
3303 }
3304
3305 let arn = replay.arn.clone();
3306 replay.state = "CANCELLED".to_string();
3307
3308 Ok(AwsResponse::ok_json(json!({
3309 "ReplayArn": arn,
3310 "State": "CANCELLING",
3311 })))
3312 }
3313}
3314
3315fn find_tags_mut<'a>(
3318 state: &'a mut crate::state::EventBridgeState,
3319 arn: &str,
3320) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3321 for bus in state.buses.values_mut() {
3323 if bus.arn == arn {
3324 return Ok(&mut bus.tags);
3325 }
3326 }
3327 for rule in state.rules.values_mut() {
3329 if rule.arn == arn {
3330 return Ok(&mut rule.tags);
3331 }
3332 }
3333
3334 let error_msg = if arn.contains(":rule/") {
3336 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3338 if let Some(rule_path) = parts.first() {
3339 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3340 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3341 } else {
3342 format!("Rule {} does not exist on EventBus default.", rule_path)
3343 }
3344 } else {
3345 format!("Resource {arn} not found.")
3346 }
3347 } else {
3348 format!("Resource {arn} not found.")
3349 };
3350
3351 Err(AwsServiceError::aws_error(
3352 StatusCode::BAD_REQUEST,
3353 "ResourceNotFoundException",
3354 error_msg,
3355 ))
3356}
3357
3358fn find_tags<'a>(
3359 state: &'a crate::state::EventBridgeState,
3360 arn: &str,
3361) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3362 for bus in state.buses.values() {
3363 if bus.arn == arn {
3364 return Ok(&bus.tags);
3365 }
3366 }
3367 for rule in state.rules.values() {
3368 if rule.arn == arn {
3369 return Ok(&rule.tags);
3370 }
3371 }
3372
3373 let error_msg = if arn.contains(":rule/") {
3374 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3375 if let Some(rule_path) = parts.first() {
3376 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3377 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3378 } else {
3379 format!("Rule {} does not exist on EventBus default.", rule_path)
3380 }
3381 } else {
3382 format!("Resource {arn} not found.")
3383 }
3384 } else {
3385 format!("Resource {arn} not found.")
3386 };
3387
3388 Err(AwsServiceError::aws_error(
3389 StatusCode::BAD_REQUEST,
3390 "ResourceNotFoundException",
3391 error_msg,
3392 ))
3393}
3394
3395fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3398 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3399 AwsServiceError::aws_error(
3400 StatusCode::BAD_REQUEST,
3401 "InvalidEventPatternException",
3402 "Event pattern is not valid. Reason: Invalid JSON",
3403 )
3404 })?;
3405
3406 validate_pattern_values(&parsed, "")?;
3407 Ok(())
3408}
3409
3410fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3411 match value {
3412 Value::Object(obj) => {
3413 for (key, val) in obj {
3414 let new_path = if path.is_empty() {
3415 key.clone()
3416 } else {
3417 format!("{path}.{key}")
3418 };
3419 match val {
3420 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3421 Value::Array(_) => {} _ => {
3423 return Err(AwsServiceError::aws_error(
3424 StatusCode::BAD_REQUEST,
3425 "InvalidEventPatternException",
3426 format!(
3427 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3428 key
3429 ),
3430 ));
3431 }
3432 }
3433 }
3434 Ok(())
3435 }
3436 _ => Ok(()),
3437 }
3438}
3439
3440fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3443 match auth_type {
3444 "API_KEY" => {
3445 let mut resp = json!({});
3446 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3447 resp["ApiKeyAuthParameters"] = json!({
3448 "ApiKeyName": api_key["ApiKeyName"],
3449 });
3450 }
3451 resp
3452 }
3453 "BASIC" => {
3454 let mut resp = json!({});
3455 if let Some(basic) = params.get("BasicAuthParameters") {
3456 resp["BasicAuthParameters"] = json!({
3457 "Username": basic["Username"],
3458 });
3459 }
3460 resp
3461 }
3462 "OAUTH_CLIENT_CREDENTIALS" => {
3463 let mut resp = json!({});
3464 if let Some(oauth) = params.get("OAuthParameters") {
3465 resp["OAuthParameters"] = json!({
3466 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3467 "HttpMethod": oauth["HttpMethod"],
3468 "ClientParameters": {
3469 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3470 },
3471 });
3472 }
3473 resp
3474 }
3475 _ => params.clone(),
3476 }
3477}
3478
3479pub fn matches_pattern(
3483 pattern_json: Option<&str>,
3484 source: &str,
3485 detail_type: &str,
3486 detail: &str,
3487 account: &str,
3488 region: &str,
3489 resources: &[String],
3490) -> bool {
3491 let pattern_json = match pattern_json {
3492 Some(p) => p,
3493 None => return true,
3494 };
3495
3496 let pattern: Value = match serde_json::from_str(pattern_json) {
3497 Ok(v) => v,
3498 Err(_) => return false,
3499 };
3500
3501 let pattern_obj = match pattern.as_object() {
3502 Some(o) => o,
3503 None => return false,
3504 };
3505
3506 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3507 let event = json!({
3508 "source": source,
3509 "detail-type": detail_type,
3510 "detail": detail_value,
3511 "account": account,
3512 "region": region,
3513 "resources": resources,
3514 });
3515
3516 for (key, pattern_value) in pattern_obj {
3517 let event_value = &event[key];
3518 if !matches_value(pattern_value, event_value) {
3519 return false;
3520 }
3521 }
3522
3523 true
3524}
3525
3526fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3527 match pattern {
3528 Value::Object(obj) => {
3529 for (key, sub_pattern) in obj {
3530 let sub_value = &event_value[key];
3531 if !matches_value(sub_pattern, sub_value) {
3532 return false;
3533 }
3534 }
3535 true
3536 }
3537 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3538 _ => false,
3539 }
3540}
3541
3542fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3543 match pattern_elem {
3544 Value::Object(obj) => {
3545 if let Some(prefix_val) = obj.get("prefix") {
3546 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3547 return actual.starts_with(prefix);
3548 }
3549 return false;
3550 }
3551 if let Some(exists_val) = obj.get("exists") {
3552 let should_exist = exists_val.as_bool().unwrap_or(true);
3553 let does_exist = !event_value.is_null();
3554 return should_exist == does_exist;
3555 }
3556 if let Some(anything_but_val) = obj.get("anything-but") {
3557 return match anything_but_val {
3558 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3559 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3560 Value::Number(_) => event_value != anything_but_val,
3561 _ => true,
3562 };
3563 }
3564 if let Some(numeric_val) = obj.get("numeric") {
3565 return matches_numeric(numeric_val, event_value);
3566 }
3567 false
3568 }
3569 _ => values_equal(pattern_elem, event_value),
3570 }
3571}
3572
3573#[allow(clippy::too_many_arguments)]
3577fn archive_matching_event(
3578 state: &mut crate::state::EventBridgeState,
3579 event: &PutEvent,
3580 event_bus_name: &str,
3581 source: &str,
3582 detail_type: &str,
3583 detail: &str,
3584 account_id: &str,
3585 region: &str,
3586 resources: &[String],
3587) {
3588 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3589 for akey in archive_keys {
3590 let (archive_bus, archive_pattern, archive_enabled) = {
3591 let a = &state.archives[&akey];
3592 (
3593 state.resolve_bus_name(&a.event_source_arn),
3594 a.event_pattern.clone(),
3595 a.state == "ENABLED",
3596 )
3597 };
3598 if archive_bus != event_bus_name || !archive_enabled {
3599 continue;
3600 }
3601 let pattern_matches = matches_pattern(
3602 archive_pattern.as_deref(),
3603 source,
3604 detail_type,
3605 detail,
3606 account_id,
3607 region,
3608 resources,
3609 );
3610 if !pattern_matches {
3611 continue;
3612 }
3613 if let Some(archive) = state.archives.get_mut(&akey) {
3614 archive.event_count += 1;
3615 archive.size_bytes += detail.len() as i64;
3616 archive.events.push(event.clone());
3617 }
3618 }
3619}
3620
3621struct StartReplayInput {
3623 name: String,
3624 description: Option<String>,
3625 event_source_arn: String,
3626 destination: Value,
3627 destination_arn: String,
3628 event_start_time: DateTime<Utc>,
3629 event_end_time: DateTime<Utc>,
3630}
3631
3632impl StartReplayInput {
3633 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3634 validate_required("ReplayName", &body["ReplayName"])?;
3635 let name = body["ReplayName"]
3636 .as_str()
3637 .ok_or_else(|| missing("ReplayName"))?
3638 .to_string();
3639 validate_string_length("replayName", &name, 1, 64)?;
3640 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3641 validate_required("EventSourceArn", &body["EventSourceArn"])?;
3642 let description = body["Description"].as_str().map(|s| s.to_string());
3643 let event_source_arn = body["EventSourceArn"]
3644 .as_str()
3645 .ok_or_else(|| missing("EventSourceArn"))?
3646 .to_string();
3647 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3648 validate_required("EventStartTime", &body["EventStartTime"])?;
3649 validate_required("EventEndTime", &body["EventEndTime"])?;
3650 validate_required("Destination", &body["Destination"])?;
3651 let destination = body["Destination"].clone();
3652
3653 let event_start_time = body["EventStartTime"]
3654 .as_f64()
3655 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3656 .unwrap_or_else(Utc::now);
3657 let event_end_time = body["EventEndTime"]
3658 .as_f64()
3659 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3660 .unwrap_or_else(Utc::now);
3661
3662 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3663 if !destination_arn.contains(":event-bus/") {
3664 return Err(AwsServiceError::aws_error(
3665 StatusCode::BAD_REQUEST,
3666 "ValidationException",
3667 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3668 ));
3669 }
3670
3671 Ok(Self {
3672 name,
3673 description,
3674 event_source_arn,
3675 destination,
3676 destination_arn,
3677 event_start_time,
3678 event_end_time,
3679 })
3680 }
3681}
3682
3683#[allow(clippy::too_many_arguments)]
3688fn collect_replay_events_with_targets(
3689 state: &crate::state::EventBridgeState,
3690 archive_name: &str,
3691 bus_name: &str,
3692 event_start_time: DateTime<Utc>,
3693 event_end_time: DateTime<Utc>,
3694 account_id: &str,
3695 region: &str,
3696) -> Vec<(PutEvent, Vec<EventTarget>)> {
3697 let Some(archive) = state.archives.get(archive_name) else {
3698 return Vec::new();
3699 };
3700
3701 let replay_events: Vec<PutEvent> = archive
3702 .events
3703 .iter()
3704 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3705 .cloned()
3706 .collect();
3707
3708 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3709 for event in replay_events {
3710 let matching_targets: Vec<EventTarget> = state
3711 .rules
3712 .values()
3713 .filter(|r| {
3714 r.event_bus_name == bus_name
3715 && r.state == "ENABLED"
3716 && matches_pattern(
3717 r.event_pattern.as_deref(),
3718 &event.source,
3719 &event.detail_type,
3720 &event.detail,
3721 account_id,
3722 region,
3723 &event.resources,
3724 )
3725 })
3726 .flat_map(|r| r.targets.clone())
3727 .collect();
3728
3729 if !matching_targets.is_empty() {
3730 events_to_deliver.push((event, matching_targets));
3731 }
3732 }
3733 events_to_deliver
3734}
3735
3736fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3737 let arr = match numeric_arr.as_array() {
3738 Some(a) => a,
3739 None => return false,
3740 };
3741 let actual = match event_value.as_f64() {
3742 Some(n) => n,
3743 None => return false,
3744 };
3745 let mut i = 0;
3746 while i + 1 < arr.len() {
3747 let op = match arr[i].as_str() {
3748 Some(s) => s,
3749 None => return false,
3750 };
3751 let threshold = match arr[i + 1].as_f64() {
3752 Some(n) => n,
3753 None => return false,
3754 };
3755 let ok = match op {
3756 ">" => actual > threshold,
3757 ">=" => actual >= threshold,
3758 "<" => actual < threshold,
3759 "<=" => actual <= threshold,
3760 "=" => (actual - threshold).abs() < f64::EPSILON,
3761 _ => return false,
3762 };
3763 if !ok {
3764 return false;
3765 }
3766 i += 2;
3767 }
3768 true
3769}
3770
3771fn values_equal(a: &Value, b: &Value) -> bool {
3772 a == b
3773}
3774
3775fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3777 let path = path.strip_prefix('$').unwrap_or(path);
3778 let mut current = event;
3779 for segment in path.split('.') {
3780 if segment.is_empty() {
3781 continue;
3782 }
3783 current = current.get(segment)?;
3784 }
3785 Some(current.clone())
3786}
3787
3788fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3790 let input_paths_map = transformer
3791 .get("InputPathsMap")
3792 .and_then(|v| v.as_object())
3793 .cloned()
3794 .unwrap_or_default();
3795 let template = transformer
3796 .get("InputTemplate")
3797 .and_then(|v| v.as_str())
3798 .unwrap_or("")
3799 .to_string();
3800
3801 let mut resolved: HashMap<String, Value> = HashMap::new();
3803 for (var_name, path_val) in &input_paths_map {
3804 if let Some(path_str) = path_val.as_str() {
3805 if let Some(val) = resolve_json_path(event, path_str) {
3806 resolved.insert(var_name.clone(), val);
3807 }
3808 }
3809 }
3810
3811 let mut result = template;
3813 for (var_name, val) in &resolved {
3814 let placeholder = format!("<{var_name}>");
3815 let replacement = match val {
3816 Value::String(s) => s.clone(),
3817 other => other.to_string(),
3818 };
3819 result = result.replace(&placeholder, &replacement);
3820 }
3821
3822 result
3823}
3824
3825fn missing(name: &str) -> AwsServiceError {
3826 AwsServiceError::aws_error(
3827 StatusCode::BAD_REQUEST,
3828 "ValidationException",
3829 format!("The request must contain the parameter {name}"),
3830 )
3831}
3832
3833fn function_name_from_arn(arn: &str) -> &str {
3838 let parts: Vec<&str> = arn.split(':').collect();
3839 if parts.len() >= 7 && parts[5] == "function" {
3840 parts[6]
3841 } else {
3842 arn
3843 }
3844}
3845
3846pub fn invoke_lambda_async(
3849 container_runtime: &Option<Arc<ContainerRuntime>>,
3850 lambda_state: &Option<SharedLambdaState>,
3851 function_arn: &str,
3852 payload: &str,
3853) {
3854 let runtime = match container_runtime {
3855 Some(rt) => rt.clone(),
3856 None => return,
3857 };
3858 let lambda_state = match lambda_state {
3859 Some(ls) => ls.clone(),
3860 None => return,
3861 };
3862 let func_name = function_name_from_arn(function_arn).to_string();
3863 let payload = payload.as_bytes().to_vec();
3864
3865 tokio::spawn(async move {
3866 let func = {
3867 let state = lambda_state.read();
3868 state.functions.get(&func_name).cloned()
3869 };
3870 let func = match func {
3871 Some(f) => f,
3872 None => {
3873 tracing::warn!(
3874 function = %func_name,
3875 "EventBridge Lambda target not found, skipping invocation"
3876 );
3877 return;
3878 }
3879 };
3880 match runtime.invoke(&func, &payload).await {
3881 Ok(_) => {
3882 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3883 }
3884 Err(e) => {
3885 tracing::warn!(
3886 function = %func_name,
3887 error = %e,
3888 "EventBridge Lambda invocation failed"
3889 );
3890 }
3891 }
3892 });
3893}
3894
3895pub fn deliver_to_logs(
3898 logs_state: &SharedLogsState,
3899 log_group_arn: &str,
3900 payload: &str,
3901 timestamp: chrono::DateTime<chrono::Utc>,
3902) {
3903 let group_name = if log_group_arn.contains(":log-group:") {
3906 log_group_arn
3907 .split(":log-group:")
3908 .nth(1)
3909 .unwrap_or(log_group_arn)
3910 .trim_end_matches(":*")
3911 } else {
3912 log_group_arn
3913 };
3914
3915 let stream_name = "events".to_string();
3916 let ts_millis = timestamp.timestamp_millis();
3917
3918 let mut state = logs_state.write();
3919 let region = state.region.clone();
3920 let account_id = state.account_id.clone();
3921
3922 let group = state
3924 .log_groups
3925 .entry(group_name.to_string())
3926 .or_insert_with(|| fakecloud_logs::state::LogGroup {
3927 name: group_name.to_string(),
3928 arn: Arn::new(
3929 "logs",
3930 ®ion,
3931 &account_id,
3932 &format!("log-group:{group_name}"),
3933 )
3934 .to_string(),
3935 creation_time: ts_millis,
3936 retention_in_days: None,
3937 kms_key_id: None,
3938 tags: HashMap::new(),
3939 log_streams: HashMap::new(),
3940 stored_bytes: 0,
3941 subscription_filters: Vec::new(),
3942 data_protection_policy: None,
3943 index_policies: Vec::new(),
3944 transformer: None,
3945 deletion_protection: false,
3946 log_group_class: Some("STANDARD".to_string()),
3947 });
3948
3949 let stream = group
3950 .log_streams
3951 .entry(stream_name.clone())
3952 .or_insert_with(|| fakecloud_logs::state::LogStream {
3953 name: stream_name,
3954 arn: format!("{}:log-stream:events", group.arn),
3955 creation_time: ts_millis,
3956 first_event_timestamp: None,
3957 last_event_timestamp: None,
3958 last_ingestion_time: None,
3959 upload_sequence_token: "1".to_string(),
3960 events: Vec::new(),
3961 });
3962
3963 stream.events.push(fakecloud_logs::state::LogEvent {
3964 timestamp: ts_millis,
3965 message: payload.to_string(),
3966 ingestion_time: ts_millis,
3967 });
3968 stream.last_event_timestamp = Some(ts_millis);
3969 stream.last_ingestion_time = Some(ts_millis);
3970 if stream.first_event_timestamp.is_none() {
3971 stream.first_event_timestamp = Some(ts_millis);
3972 }
3973}
3974
3975fn apply_connection_auth(
3977 mut builder: reqwest::RequestBuilder,
3978 conn: &Connection,
3979) -> reqwest::RequestBuilder {
3980 match conn.authorization_type.as_str() {
3981 "API_KEY" => {
3982 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3983 if let (Some(name), Some(value)) = (
3984 params["ApiKeyName"].as_str(),
3985 params["ApiKeyValue"].as_str(),
3986 ) {
3987 builder = builder.header(name, value);
3988 }
3989 }
3990 }
3991 "BASIC" => {
3992 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3993 if let (Some(user), Some(pass)) =
3994 (params["Username"].as_str(), params["Password"].as_str())
3995 {
3996 builder = builder.basic_auth(user, Some(pass));
3997 }
3998 }
3999 }
4000 "OAUTH_CLIENT_CREDENTIALS" => {
4001 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4004 if let (Some(client_id), Some(client_secret)) = (
4005 params["ClientParameters"]["ClientID"].as_str(),
4006 params["ClientParameters"]["ClientSecret"].as_str(),
4007 ) {
4008 builder = builder.basic_auth(client_id, Some(client_secret));
4009 }
4010 }
4011 }
4012 _ => {}
4013 }
4014 builder
4015}
4016
4017#[cfg(test)]
4018mod tests {
4019 use super::*;
4020
4021 fn test_matches(
4023 pattern_json: Option<&str>,
4024 source: &str,
4025 detail_type: &str,
4026 detail: &str,
4027 ) -> bool {
4028 matches_pattern(
4029 pattern_json,
4030 source,
4031 detail_type,
4032 detail,
4033 "123456789012",
4034 "us-east-1",
4035 &[],
4036 )
4037 }
4038
4039 #[test]
4040 fn pattern_matches_source() {
4041 assert!(test_matches(
4042 Some(r#"{"source": ["my.app"]}"#),
4043 "my.app",
4044 "OrderPlaced",
4045 "{}"
4046 ));
4047 assert!(!test_matches(
4048 Some(r#"{"source": ["other.app"]}"#),
4049 "my.app",
4050 "OrderPlaced",
4051 "{}"
4052 ));
4053 }
4054
4055 #[test]
4056 fn pattern_matches_detail_type() {
4057 assert!(test_matches(
4058 Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4059 "my.app",
4060 "OrderPlaced",
4061 "{}"
4062 ));
4063 assert!(!test_matches(
4064 Some(r#"{"detail-type": ["OrderShipped"]}"#),
4065 "my.app",
4066 "OrderPlaced",
4067 "{}"
4068 ));
4069 }
4070
4071 #[test]
4072 fn pattern_matches_detail_field() {
4073 assert!(test_matches(
4074 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4075 "my.app",
4076 "StatusChange",
4077 r#"{"status": "ACTIVE"}"#
4078 ));
4079 assert!(!test_matches(
4080 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4081 "my.app",
4082 "StatusChange",
4083 r#"{"status": "INACTIVE"}"#
4084 ));
4085 }
4086
4087 #[test]
4088 fn no_pattern_matches_everything() {
4089 assert!(test_matches(None, "any", "any", "{}"));
4090 }
4091
4092 #[test]
4093 fn combined_pattern() {
4094 let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4095 assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4096 assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4097 assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4098 }
4099
4100 #[test]
4101 fn nested_detail_pattern() {
4102 let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4103 assert!(test_matches(
4104 Some(pattern),
4105 "my.app",
4106 "OrderEvent",
4107 r#"{"order": {"status": "PLACED", "id": "123"}}"#
4108 ));
4109 assert!(!test_matches(
4110 Some(pattern),
4111 "my.app",
4112 "OrderEvent",
4113 r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4114 ));
4115 assert!(!test_matches(
4116 Some(pattern),
4117 "my.app",
4118 "OrderEvent",
4119 r#"{"order": {"id": "123"}}"#
4120 ));
4121 }
4122
4123 #[test]
4124 fn deeply_nested_detail_pattern() {
4125 let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4126 assert!(test_matches(
4127 Some(pattern),
4128 "src",
4129 "type",
4130 r#"{"a": {"b": {"c": "deep"}}}"#
4131 ));
4132 assert!(!test_matches(
4133 Some(pattern),
4134 "src",
4135 "type",
4136 r#"{"a": {"b": {"c": "shallow"}}}"#
4137 ));
4138 }
4139
4140 #[test]
4141 fn prefix_matcher() {
4142 let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4143 assert!(test_matches(
4144 Some(pattern),
4145 "com.myapp.orders",
4146 "OrderPlaced",
4147 "{}"
4148 ));
4149 assert!(test_matches(
4150 Some(pattern),
4151 "com.myapp",
4152 "OrderPlaced",
4153 "{}"
4154 ));
4155 assert!(!test_matches(
4156 Some(pattern),
4157 "com.other",
4158 "OrderPlaced",
4159 "{}"
4160 ));
4161 }
4162
4163 #[test]
4164 fn prefix_matcher_in_detail() {
4165 let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4166 assert!(test_matches(
4167 Some(pattern),
4168 "src",
4169 "type",
4170 r#"{"region": "us-east-1"}"#
4171 ));
4172 assert!(!test_matches(
4173 Some(pattern),
4174 "src",
4175 "type",
4176 r#"{"region": "eu-west-1"}"#
4177 ));
4178 }
4179
4180 #[test]
4181 fn exists_matcher() {
4182 let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4183 assert!(test_matches(
4184 Some(pattern),
4185 "src",
4186 "type",
4187 r#"{"error": "something broke"}"#
4188 ));
4189 assert!(!test_matches(
4190 Some(pattern),
4191 "src",
4192 "type",
4193 r#"{"status": "ok"}"#
4194 ));
4195
4196 let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4197 assert!(test_matches(
4198 Some(pattern),
4199 "src",
4200 "type",
4201 r#"{"status": "ok"}"#
4202 ));
4203 assert!(!test_matches(
4204 Some(pattern),
4205 "src",
4206 "type",
4207 r#"{"error": "something broke"}"#
4208 ));
4209 }
4210
4211 #[test]
4212 fn anything_but_matcher() {
4213 let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4214 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4215 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4216
4217 let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4218 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4219 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4220 assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4221 }
4222
4223 #[test]
4224 fn anything_but_in_detail() {
4225 let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4226 assert!(test_matches(
4227 Some(pattern),
4228 "src",
4229 "type",
4230 r#"{"env": "staging"}"#
4231 ));
4232 assert!(!test_matches(
4233 Some(pattern),
4234 "src",
4235 "type",
4236 r#"{"env": "prod"}"#
4237 ));
4238 }
4239
4240 #[test]
4241 fn numeric_greater_than() {
4242 let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4243 assert!(test_matches(
4244 Some(pattern),
4245 "src",
4246 "type",
4247 r#"{"count": 150}"#
4248 ));
4249 assert!(!test_matches(
4250 Some(pattern),
4251 "src",
4252 "type",
4253 r#"{"count": 100}"#
4254 ));
4255 assert!(!test_matches(
4256 Some(pattern),
4257 "src",
4258 "type",
4259 r#"{"count": 50}"#
4260 ));
4261 }
4262
4263 #[test]
4264 fn numeric_less_than() {
4265 let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4266 assert!(test_matches(
4267 Some(pattern),
4268 "src",
4269 "type",
4270 r#"{"count": 5}"#
4271 ));
4272 assert!(!test_matches(
4273 Some(pattern),
4274 "src",
4275 "type",
4276 r#"{"count": 10}"#
4277 ));
4278 assert!(!test_matches(
4279 Some(pattern),
4280 "src",
4281 "type",
4282 r#"{"count": 15}"#
4283 ));
4284 }
4285
4286 #[test]
4287 fn numeric_range() {
4288 let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4289 assert!(test_matches(
4290 Some(pattern),
4291 "src",
4292 "type",
4293 r#"{"count": 50}"#
4294 ));
4295 assert!(test_matches(
4296 Some(pattern),
4297 "src",
4298 "type",
4299 r#"{"count": 100}"#
4300 ));
4301 assert!(!test_matches(
4302 Some(pattern),
4303 "src",
4304 "type",
4305 r#"{"count": 200}"#
4306 ));
4307 assert!(!test_matches(
4308 Some(pattern),
4309 "src",
4310 "type",
4311 r#"{"count": 49}"#
4312 ));
4313 }
4314
4315 #[test]
4316 fn mixed_matchers_and_literals() {
4317 let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4318 assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4319 assert!(test_matches(
4320 Some(pattern),
4321 "com.myapp.orders",
4322 "Event",
4323 "{}"
4324 ));
4325 assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4326 }
4327
4328 use crate::state::EventBridgeState;
4331 use fakecloud_core::delivery::DeliveryBus;
4332 use parking_lot::RwLock;
4333
4334 fn make_service() -> EventBridgeService {
4335 let state = Arc::new(RwLock::new(EventBridgeState::new(
4336 "123456789012",
4337 "us-east-1",
4338 )));
4339 let delivery = Arc::new(DeliveryBus::new());
4340 EventBridgeService::new(state, delivery)
4341 }
4342
4343 fn make_request(action: &str, body: Value) -> AwsRequest {
4344 AwsRequest {
4345 service: "events".to_string(),
4346 action: action.to_string(),
4347 region: "us-east-1".to_string(),
4348 account_id: "123456789012".to_string(),
4349 request_id: "test-id".to_string(),
4350 headers: http::HeaderMap::new(),
4351 query_params: HashMap::new(),
4352 body: serde_json::to_vec(&body).unwrap().into(),
4353 path_segments: vec![],
4354 raw_path: "/".to_string(),
4355 raw_query: String::new(),
4356 method: http::Method::POST,
4357 is_query_protocol: false,
4358 access_key_id: None,
4359 principal: None,
4360 }
4361 }
4362
4363 fn create_connection(svc: &EventBridgeService, name: &str) {
4364 let req = make_request(
4365 "CreateConnection",
4366 json!({
4367 "Name": name,
4368 "AuthorizationType": "API_KEY",
4369 "AuthParameters": {
4370 "ApiKeyAuthParameters": {
4371 "ApiKeyName": "x-api-key",
4372 "ApiKeyValue": "secret"
4373 }
4374 }
4375 }),
4376 );
4377 svc.create_connection(&req).unwrap();
4378 }
4379
4380 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4381 let conn_arn_field = {
4382 let state = svc.state.read();
4383 state.connections.get(conn_name).unwrap().arn.clone()
4384 };
4385 let req = make_request(
4386 "CreateApiDestination",
4387 json!({
4388 "Name": name,
4389 "ConnectionArn": conn_arn_field,
4390 "InvocationEndpoint": "https://example.com",
4391 "HttpMethod": "POST"
4392 }),
4393 );
4394 svc.create_api_destination(&req).unwrap();
4395 }
4396
4397 #[test]
4400 fn list_connections_returns_all_by_default() {
4401 let svc = make_service();
4402 create_connection(&svc, "conn-alpha");
4403 create_connection(&svc, "conn-beta");
4404 create_connection(&svc, "conn-gamma");
4405
4406 let req = make_request("ListConnections", json!({}));
4407 let resp = svc.list_connections(&req).unwrap();
4408 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4409 assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4410 assert!(body["NextToken"].is_null());
4411 }
4412
4413 #[test]
4414 fn list_connections_name_prefix_filter() {
4415 let svc = make_service();
4416 create_connection(&svc, "prod-conn-1");
4417 create_connection(&svc, "prod-conn-2");
4418 create_connection(&svc, "dev-conn-1");
4419
4420 let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4421 let resp = svc.list_connections(&req).unwrap();
4422 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4423 let names: Vec<&str> = body["Connections"]
4424 .as_array()
4425 .unwrap()
4426 .iter()
4427 .map(|c| c["Name"].as_str().unwrap())
4428 .collect();
4429 assert_eq!(names.len(), 2);
4430 assert!(names.iter().all(|n| n.starts_with("prod-")));
4431 }
4432
4433 #[test]
4434 fn list_connections_state_filter() {
4435 let svc = make_service();
4436 create_connection(&svc, "conn-a");
4437 create_connection(&svc, "conn-b");
4438
4439 {
4441 let mut state = svc.state.write();
4442 state
4443 .connections
4444 .get_mut("conn-b")
4445 .unwrap()
4446 .connection_state = "DEAUTHORIZED".to_string();
4447 }
4448
4449 let req = make_request(
4450 "ListConnections",
4451 json!({ "ConnectionState": "AUTHORIZED" }),
4452 );
4453 let resp = svc.list_connections(&req).unwrap();
4454 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4455 let conns = body["Connections"].as_array().unwrap();
4456 assert_eq!(conns.len(), 1);
4457 assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4458 }
4459
4460 #[test]
4461 fn list_connections_pagination() {
4462 let svc = make_service();
4463 for i in 0..5 {
4464 create_connection(&svc, &format!("conn-{i:02}"));
4465 }
4466
4467 let req = make_request("ListConnections", json!({ "Limit": 2 }));
4469 let resp = svc.list_connections(&req).unwrap();
4470 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4471 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4472 let token = body["NextToken"].as_str().unwrap();
4473 assert_eq!(token, "2");
4474
4475 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4477 let resp = svc.list_connections(&req).unwrap();
4478 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4479 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4480 let token = body["NextToken"].as_str().unwrap();
4481 assert_eq!(token, "4");
4482
4483 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4485 let resp = svc.list_connections(&req).unwrap();
4486 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4487 assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4488 assert!(body["NextToken"].is_null());
4489 }
4490
4491 #[test]
4492 fn list_connections_pagination_with_filter() {
4493 let svc = make_service();
4494 for i in 0..4 {
4495 create_connection(&svc, &format!("prod-{i:02}"));
4496 }
4497 create_connection(&svc, "dev-00");
4498
4499 let req = make_request(
4500 "ListConnections",
4501 json!({ "NamePrefix": "prod-", "Limit": 2 }),
4502 );
4503 let resp = svc.list_connections(&req).unwrap();
4504 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4505 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4506 assert!(body["NextToken"].as_str().is_some());
4507 }
4508
4509 #[test]
4512 fn list_api_destinations_returns_all_by_default() {
4513 let svc = make_service();
4514 create_connection(&svc, "my-conn");
4515 create_api_destination(&svc, "dest-alpha", "my-conn");
4516 create_api_destination(&svc, "dest-beta", "my-conn");
4517
4518 let req = make_request("ListApiDestinations", json!({}));
4519 let resp = svc.list_api_destinations(&req).unwrap();
4520 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4521 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4522 assert!(body["NextToken"].is_null());
4523 }
4524
4525 #[test]
4526 fn list_api_destinations_name_prefix_filter() {
4527 let svc = make_service();
4528 create_connection(&svc, "my-conn");
4529 create_api_destination(&svc, "prod-dest-1", "my-conn");
4530 create_api_destination(&svc, "prod-dest-2", "my-conn");
4531 create_api_destination(&svc, "dev-dest-1", "my-conn");
4532
4533 let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4534 let resp = svc.list_api_destinations(&req).unwrap();
4535 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4536 let names: Vec<&str> = body["ApiDestinations"]
4537 .as_array()
4538 .unwrap()
4539 .iter()
4540 .map(|d| d["Name"].as_str().unwrap())
4541 .collect();
4542 assert_eq!(names.len(), 2);
4543 assert!(names.iter().all(|n| n.starts_with("prod-")));
4544 }
4545
4546 #[test]
4547 fn list_api_destinations_connection_arn_filter() {
4548 let svc = make_service();
4549 create_connection(&svc, "conn-a");
4550 create_connection(&svc, "conn-b");
4551 create_api_destination(&svc, "dest-1", "conn-a");
4552 create_api_destination(&svc, "dest-2", "conn-b");
4553 create_api_destination(&svc, "dest-3", "conn-a");
4554
4555 let conn_a_arn = {
4556 let state = svc.state.read();
4557 state.connections.get("conn-a").unwrap().arn.clone()
4558 };
4559
4560 let req = make_request(
4561 "ListApiDestinations",
4562 json!({ "ConnectionArn": conn_a_arn }),
4563 );
4564 let resp = svc.list_api_destinations(&req).unwrap();
4565 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4566 let names: Vec<&str> = body["ApiDestinations"]
4567 .as_array()
4568 .unwrap()
4569 .iter()
4570 .map(|d| d["Name"].as_str().unwrap())
4571 .collect();
4572 assert_eq!(names.len(), 2);
4573 assert!(names.contains(&"dest-1"));
4574 assert!(names.contains(&"dest-3"));
4575 }
4576
4577 #[test]
4578 fn list_api_destinations_pagination() {
4579 let svc = make_service();
4580 create_connection(&svc, "my-conn");
4581 for i in 0..5 {
4582 create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4583 }
4584
4585 let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4587 let resp = svc.list_api_destinations(&req).unwrap();
4588 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4589 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4590 let token = body["NextToken"].as_str().unwrap();
4591 assert_eq!(token, "2");
4592
4593 let req = make_request(
4595 "ListApiDestinations",
4596 json!({ "Limit": 2, "NextToken": token }),
4597 );
4598 let resp = svc.list_api_destinations(&req).unwrap();
4599 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4600 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4601 let token = body["NextToken"].as_str().unwrap();
4602 assert_eq!(token, "4");
4603
4604 let req = make_request(
4606 "ListApiDestinations",
4607 json!({ "Limit": 2, "NextToken": token }),
4608 );
4609 let resp = svc.list_api_destinations(&req).unwrap();
4610 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4611 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4612 assert!(body["NextToken"].is_null());
4613 }
4614
4615 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4618 let req = make_request("CreateEventBus", json!({ "Name": name }));
4619 svc.create_event_bus(&req).unwrap();
4620 }
4621
4622 #[test]
4623 fn list_event_buses_pagination() {
4624 let svc = make_service();
4625 for i in 0..4 {
4627 create_event_bus(&svc, &format!("bus-{i:02}"));
4628 }
4629
4630 let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4632 let resp = svc.list_event_buses(&req).unwrap();
4633 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4634 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4635 let token = body["NextToken"].as_str().unwrap();
4636 assert_eq!(token, "2");
4637
4638 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4640 let resp = svc.list_event_buses(&req).unwrap();
4641 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4642 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4643 let token = body["NextToken"].as_str().unwrap();
4644 assert_eq!(token, "4");
4645
4646 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4648 let resp = svc.list_event_buses(&req).unwrap();
4649 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4650 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4651 assert!(body["NextToken"].is_null());
4652 }
4653
4654 #[test]
4655 fn list_event_buses_no_pagination_returns_all() {
4656 let svc = make_service();
4657 create_event_bus(&svc, "bus-alpha");
4658 create_event_bus(&svc, "bus-beta");
4659
4660 let req = make_request("ListEventBuses", json!({}));
4661 let resp = svc.list_event_buses(&req).unwrap();
4662 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4663 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4665 assert!(body["NextToken"].is_null());
4666 }
4667
4668 #[test]
4671 fn put_events_never_includes_endpoint_id_in_response() {
4672 let svc = make_service();
4673 let req = make_request(
4675 "PutEvents",
4676 json!({
4677 "EndpointId": "my-endpoint.abc123",
4678 "Entries": [{
4679 "Source": "my.source",
4680 "DetailType": "MyType",
4681 "Detail": "{}",
4682 "EventBusName": "default"
4683 }]
4684 }),
4685 );
4686 let resp = svc.put_events(&req).unwrap();
4687 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4688 assert!(
4689 !body.as_object().unwrap().contains_key("EndpointId"),
4690 "EndpointId should never be in the PutEvents response"
4691 );
4692 assert_eq!(body["FailedEntryCount"], 0);
4693 }
4694
4695 fn create_archive(svc: &EventBridgeService, name: &str) {
4698 let req = make_request(
4699 "CreateArchive",
4700 json!({
4701 "ArchiveName": name,
4702 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4703 }),
4704 );
4705 svc.create_archive(&req).unwrap();
4706 }
4707
4708 #[test]
4709 fn list_archives_pagination() {
4710 let svc = make_service();
4711 for i in 0..5 {
4712 create_archive(&svc, &format!("archive-{i:02}"));
4713 }
4714
4715 let req = make_request("ListArchives", json!({ "Limit": 2 }));
4717 let resp = svc.list_archives(&req).unwrap();
4718 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4719 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4720 let token = body["NextToken"].as_str().unwrap();
4721 assert_eq!(token, "2");
4722
4723 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4725 let resp = svc.list_archives(&req).unwrap();
4726 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4727 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4728 let token = body["NextToken"].as_str().unwrap();
4729 assert_eq!(token, "4");
4730
4731 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4733 let resp = svc.list_archives(&req).unwrap();
4734 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4735 assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4736 assert!(body["NextToken"].is_null());
4737 }
4738
4739 fn create_replay(svc: &EventBridgeService, name: &str) {
4742 let archive_arn = {
4744 let state = svc.state.read();
4745 if state.archives.contains_key("replay-archive") {
4746 state.archives["replay-archive"].arn.clone()
4747 } else {
4748 drop(state);
4749 create_archive(svc, "replay-archive");
4750 svc.state.read().archives["replay-archive"].arn.clone()
4751 }
4752 };
4753 let req = make_request(
4754 "StartReplay",
4755 json!({
4756 "ReplayName": name,
4757 "EventSourceArn": archive_arn,
4758 "EventStartTime": 1000000.0,
4759 "EventEndTime": 2000000.0,
4760 "Destination": {
4761 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4762 }
4763 }),
4764 );
4765 svc.start_replay(&req).unwrap();
4766 }
4767
4768 #[test]
4769 fn list_replays_pagination() {
4770 let svc = make_service();
4771 for i in 0..5 {
4772 create_replay(&svc, &format!("replay-{i:02}"));
4773 }
4774
4775 let req = make_request("ListReplays", json!({ "Limit": 2 }));
4777 let resp = svc.list_replays(&req).unwrap();
4778 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4779 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4780 let token = body["NextToken"].as_str().unwrap();
4781 assert_eq!(token, "2");
4782
4783 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4785 let resp = svc.list_replays(&req).unwrap();
4786 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4787 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4788 let token = body["NextToken"].as_str().unwrap();
4789 assert_eq!(token, "4");
4790
4791 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4793 let resp = svc.list_replays(&req).unwrap();
4794 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4795 assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4796 assert!(body["NextToken"].is_null());
4797 }
4798
4799 #[test]
4800 fn list_event_buses_invalid_next_token_returns_error() {
4801 let svc = make_service();
4802
4803 let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4804 let result = svc.list_event_buses(&req);
4805 assert!(
4806 result.is_err(),
4807 "non-numeric NextToken should return an error"
4808 );
4809 }
4810
4811 #[test]
4814 fn test_event_pattern_match() {
4815 let svc = make_service();
4816 let req = make_request(
4817 "TestEventPattern",
4818 json!({
4819 "EventPattern": r#"{"source": ["my.app"]}"#,
4820 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4821 }),
4822 );
4823 let resp = svc.test_event_pattern(&req).unwrap();
4824 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4825 assert_eq!(body["Result"], true);
4826 }
4827
4828 #[test]
4829 fn test_event_pattern_no_match() {
4830 let svc = make_service();
4831 let req = make_request(
4832 "TestEventPattern",
4833 json!({
4834 "EventPattern": r#"{"source": ["other.app"]}"#,
4835 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4836 }),
4837 );
4838 let resp = svc.test_event_pattern(&req).unwrap();
4839 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4840 assert_eq!(body["Result"], false);
4841 }
4842
4843 #[test]
4844 fn test_event_pattern_detail_match() {
4845 let svc = make_service();
4846 let req = make_request(
4847 "TestEventPattern",
4848 json!({
4849 "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4850 "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4851 }),
4852 );
4853 let resp = svc.test_event_pattern(&req).unwrap();
4854 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4855 assert_eq!(body["Result"], true);
4856 }
4857
4858 #[test]
4861 fn update_event_bus_description() {
4862 let svc = make_service();
4863 create_event_bus(&svc, "my-bus");
4864
4865 let req = make_request(
4866 "UpdateEventBus",
4867 json!({ "Name": "my-bus", "Description": "Updated desc" }),
4868 );
4869 let resp = svc.update_event_bus(&req).unwrap();
4870 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4871 assert_eq!(body["Name"], "my-bus");
4872
4873 let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4875 let resp = svc.describe_event_bus(&req).unwrap();
4876 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4877 assert_eq!(body["Description"], "Updated desc");
4878 }
4879
4880 #[test]
4881 fn update_event_bus_not_found() {
4882 let svc = make_service();
4883 let req = make_request(
4884 "UpdateEventBus",
4885 json!({ "Name": "ghost-bus", "Description": "nope" }),
4886 );
4887 assert!(svc.update_event_bus(&req).is_err());
4888 }
4889
4890 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4893 let req = make_request(
4894 "CreateEndpoint",
4895 json!({
4896 "Name": name,
4897 "RoutingConfig": {
4898 "FailoverConfig": {
4899 "Primary": { "HealthCheck": "" },
4900 "Secondary": { "Route": "us-west-2" }
4901 }
4902 },
4903 "EventBuses": [
4904 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4905 ]
4906 }),
4907 );
4908 svc.create_endpoint(&req).unwrap();
4909 }
4910
4911 #[test]
4912 fn endpoint_create_describe_delete() {
4913 let svc = make_service();
4914 create_endpoint_helper(&svc, "my-endpoint");
4915
4916 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4918 let resp = svc.describe_endpoint(&req).unwrap();
4919 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4920 assert_eq!(body["Name"], "my-endpoint");
4921 assert_eq!(body["State"], "ACTIVE");
4922 assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4923
4924 let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4926 svc.delete_endpoint(&req).unwrap();
4927
4928 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4930 assert!(svc.describe_endpoint(&req).is_err());
4931 }
4932
4933 #[test]
4934 fn endpoint_list_and_update() {
4935 let svc = make_service();
4936 create_endpoint_helper(&svc, "ep-alpha");
4937 create_endpoint_helper(&svc, "ep-beta");
4938
4939 let req = make_request("ListEndpoints", json!({}));
4941 let resp = svc.list_endpoints(&req).unwrap();
4942 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4943 assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4944
4945 let req = make_request(
4947 "UpdateEndpoint",
4948 json!({ "Name": "ep-alpha", "Description": "updated" }),
4949 );
4950 let resp = svc.update_endpoint(&req).unwrap();
4951 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4952 assert_eq!(body["Name"], "ep-alpha");
4953
4954 let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4956 let resp = svc.describe_endpoint(&req).unwrap();
4957 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4958 assert_eq!(body["Description"], "updated");
4959 }
4960
4961 #[test]
4962 fn endpoint_duplicate_fails() {
4963 let svc = make_service();
4964 create_endpoint_helper(&svc, "dup-ep");
4965 let req = make_request(
4966 "CreateEndpoint",
4967 json!({
4968 "Name": "dup-ep",
4969 "RoutingConfig": {},
4970 "EventBuses": []
4971 }),
4972 );
4973 assert!(svc.create_endpoint(&req).is_err());
4974 }
4975
4976 #[test]
4979 fn deauthorize_connection_sets_state() {
4980 let svc = make_service();
4981 create_connection(&svc, "deauth-conn");
4982
4983 let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4984 let resp = svc.deauthorize_connection(&req).unwrap();
4985 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4986 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4987 assert!(body["ConnectionArn"]
4988 .as_str()
4989 .unwrap()
4990 .contains("deauth-conn"));
4991
4992 let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4994 let resp = svc.describe_connection(&req).unwrap();
4995 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4996 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4997 }
4998
4999 #[test]
5000 fn deauthorize_connection_not_found() {
5001 let svc = make_service();
5002 let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5003 assert!(svc.deauthorize_connection(&req).is_err());
5004 }
5005
5006 #[test]
5009 fn partner_event_source_crud() {
5010 let svc = make_service();
5011
5012 let req = make_request(
5014 "CreatePartnerEventSource",
5015 json!({ "Name": "partner/test", "Account": "123456789012" }),
5016 );
5017 svc.create_partner_event_source(&req).unwrap();
5018
5019 let req = make_request(
5021 "DescribePartnerEventSource",
5022 json!({ "Name": "partner/test" }),
5023 );
5024 let resp = svc.describe_partner_event_source(&req).unwrap();
5025 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5026 assert_eq!(body["Name"], "partner/test");
5027
5028 let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5030 let resp = svc.list_partner_event_sources(&req).unwrap();
5031 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5032 assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5033
5034 let req = make_request(
5036 "ListPartnerEventSourceAccounts",
5037 json!({ "EventSourceName": "partner/test" }),
5038 );
5039 let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5040 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5041 assert_eq!(
5042 body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5043 1
5044 );
5045
5046 let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5048 let resp = svc.describe_event_source(&req).unwrap();
5049 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5050 assert_eq!(body["Name"], "partner/test");
5051 assert_eq!(body["State"], "ACTIVE");
5052
5053 let req = make_request("ListEventSources", json!({}));
5055 let resp = svc.list_event_sources(&req).unwrap();
5056 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5057 assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5058
5059 let req = make_request(
5061 "DeletePartnerEventSource",
5062 json!({ "Name": "partner/test", "Account": "123456789012" }),
5063 );
5064 svc.delete_partner_event_source(&req).unwrap();
5065
5066 let req = make_request(
5068 "DescribePartnerEventSource",
5069 json!({ "Name": "partner/test" }),
5070 );
5071 assert!(svc.describe_partner_event_source(&req).is_err());
5072 }
5073
5074 #[test]
5075 fn activate_deactivate_event_source() {
5076 let svc = make_service();
5077
5078 let req = make_request(
5080 "CreatePartnerEventSource",
5081 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5082 );
5083 svc.create_partner_event_source(&req).unwrap();
5084
5085 let req = make_request(
5087 "DeactivateEventSource",
5088 json!({ "Name": "aws.partner/test" }),
5089 );
5090 svc.deactivate_event_source(&req).unwrap();
5091 {
5092 let state = svc.state.read();
5093 assert_eq!(
5094 state.partner_event_sources["aws.partner/test"].state,
5095 "INACTIVE"
5096 );
5097 }
5098
5099 let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5101 svc.activate_event_source(&req).unwrap();
5102 {
5103 let state = svc.state.read();
5104 assert_eq!(
5105 state.partner_event_sources["aws.partner/test"].state,
5106 "ACTIVE"
5107 );
5108 }
5109
5110 let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5112 assert!(svc.activate_event_source(&req).is_err());
5113
5114 let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5115 assert!(svc.deactivate_event_source(&req).is_err());
5116 }
5117
5118 #[test]
5119 fn delete_partner_event_source_verifies_account() {
5120 let svc = make_service();
5121
5122 let req = make_request(
5124 "CreatePartnerEventSource",
5125 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5126 );
5127 svc.create_partner_event_source(&req).unwrap();
5128
5129 let req = make_request(
5131 "DeletePartnerEventSource",
5132 json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5133 );
5134 assert!(svc.delete_partner_event_source(&req).is_err());
5135 assert!(svc
5137 .state
5138 .read()
5139 .partner_event_sources
5140 .contains_key("aws.partner/test"));
5141
5142 let req = make_request(
5144 "DeletePartnerEventSource",
5145 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5146 );
5147 svc.delete_partner_event_source(&req).unwrap();
5148 assert!(!svc
5149 .state
5150 .read()
5151 .partner_event_sources
5152 .contains_key("aws.partner/test"));
5153
5154 let req = make_request(
5156 "DeletePartnerEventSource",
5157 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5158 );
5159 assert!(svc.delete_partner_event_source(&req).is_err());
5160 }
5161
5162 #[test]
5163 fn put_partner_events() {
5164 let svc = make_service();
5165 let req = make_request(
5166 "PutPartnerEvents",
5167 json!({
5168 "Entries": [
5169 { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5170 ]
5171 }),
5172 );
5173 let resp = svc.put_partner_events(&req).unwrap();
5174 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5175 assert_eq!(body["FailedEntryCount"], 0);
5176 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5177 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5178 }
5179
5180 #[allow(clippy::type_complexity)]
5184 fn make_service_with_sqs_recorder() -> (
5185 EventBridgeService,
5186 Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5187 ) {
5188 use fakecloud_core::delivery::SqsDelivery;
5189
5190 struct RecordingSqsDelivery {
5191 messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5192 }
5193
5194 impl SqsDelivery for RecordingSqsDelivery {
5195 fn deliver_to_queue(
5196 &self,
5197 queue_arn: &str,
5198 message_body: &str,
5199 _attributes: &HashMap<String, String>,
5200 ) {
5201 self.messages
5202 .lock()
5203 .push((queue_arn.to_string(), message_body.to_string()));
5204 }
5205 }
5206
5207 let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5208 Arc::new(parking_lot::Mutex::new(Vec::new()));
5209 let state = Arc::new(RwLock::new(EventBridgeState::new(
5210 "123456789012",
5211 "us-east-1",
5212 )));
5213 let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5214 messages: messages.clone(),
5215 })));
5216 let svc = EventBridgeService::new(state, delivery);
5217 (svc, messages)
5218 }
5219
5220 #[test]
5221 fn start_replay_delivers_archived_events_to_sqs_target() {
5222 let (svc, messages) = make_service_with_sqs_recorder();
5223 let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5224
5225 let req = make_request(
5227 "PutRule",
5228 json!({
5229 "Name": "replay-test-rule",
5230 "EventPattern": r#"{"source": ["my.app"]}"#,
5231 "State": "ENABLED"
5232 }),
5233 );
5234 svc.put_rule(&req).unwrap();
5235
5236 let req = make_request(
5237 "PutTargets",
5238 json!({
5239 "Rule": "replay-test-rule",
5240 "Targets": [{
5241 "Id": "sqs-target",
5242 "Arn": queue_arn
5243 }]
5244 }),
5245 );
5246 svc.put_targets(&req).unwrap();
5247
5248 let req = make_request(
5250 "CreateArchive",
5251 json!({
5252 "ArchiveName": "test-archive",
5253 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5254 }),
5255 );
5256 svc.create_archive(&req).unwrap();
5257
5258 let req = make_request(
5260 "PutEvents",
5261 json!({
5262 "Entries": [
5263 {
5264 "Source": "my.app",
5265 "DetailType": "OrderCreated",
5266 "Detail": "{\"orderId\": \"1\"}",
5267 "EventBusName": "default"
5268 },
5269 {
5270 "Source": "my.app",
5271 "DetailType": "OrderShipped",
5272 "Detail": "{\"orderId\": \"2\"}",
5273 "EventBusName": "default"
5274 }
5275 ]
5276 }),
5277 );
5278 let resp = svc.put_events(&req).unwrap();
5279 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5280 assert_eq!(body["FailedEntryCount"], 0);
5281
5282 {
5284 let state = svc.state.read();
5285 let archive = state.archives.get("test-archive").unwrap();
5286 assert_eq!(archive.events.len(), 2);
5287 assert_eq!(archive.event_count, 2);
5288 }
5289
5290 messages.lock().clear();
5292
5293 let archive_arn = {
5295 let state = svc.state.read();
5296 state.archives.get("test-archive").unwrap().arn.clone()
5297 };
5298
5299 let start_ts = 0.0_f64;
5301 let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5302
5303 let req = make_request(
5304 "StartReplay",
5305 json!({
5306 "ReplayName": "my-replay",
5307 "EventSourceArn": archive_arn,
5308 "Destination": {
5309 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5310 },
5311 "EventStartTime": start_ts,
5312 "EventEndTime": end_ts
5313 }),
5314 );
5315 let resp = svc.start_replay(&req).unwrap();
5316 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5317 assert_eq!(body["State"], "STARTING");
5318
5319 let delivered = messages.lock();
5321 assert_eq!(
5322 delivered.len(),
5323 2,
5324 "expected 2 replayed events delivered to SQS"
5325 );
5326 for (arn, msg) in delivered.iter() {
5327 assert_eq!(arn, queue_arn);
5328 let event: Value = serde_json::from_str(msg).unwrap();
5329 assert_eq!(event["source"], "my.app");
5330 assert!(event["replay-name"].as_str().is_some());
5332 }
5333
5334 let state = svc.state.read();
5336 let replay = state.replays.get("my-replay").unwrap();
5337 assert_eq!(replay.state, "COMPLETED");
5338 }
5339
5340 #[test]
5341 fn apply_connection_auth_api_key() {
5342 let conn = Connection {
5343 name: "test-conn".to_string(),
5344 arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5345 description: None,
5346 authorization_type: "API_KEY".to_string(),
5347 auth_parameters: json!({
5348 "ApiKeyAuthParameters": {
5349 "ApiKeyName": "x-api-key",
5350 "ApiKeyValue": "my-secret"
5351 }
5352 }),
5353 connection_state: "AUTHORIZED".to_string(),
5354 secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5355 creation_time: Utc::now(),
5356 last_modified_time: Utc::now(),
5357 last_authorized_time: Utc::now(),
5358 };
5359
5360 let client = reqwest::Client::new();
5361 let builder = client
5362 .post("http://localhost:12345/test")
5363 .header("Content-Type", "application/json");
5364 let builder = apply_connection_auth(builder, &conn);
5365
5366 let request = builder.body("{}").build().unwrap();
5368 assert_eq!(
5369 request
5370 .headers()
5371 .get("x-api-key")
5372 .unwrap()
5373 .to_str()
5374 .unwrap(),
5375 "my-secret"
5376 );
5377 }
5378
5379 #[test]
5380 fn apply_connection_auth_basic() {
5381 let conn = Connection {
5382 name: "basic-conn".to_string(),
5383 arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5384 description: None,
5385 authorization_type: "BASIC".to_string(),
5386 auth_parameters: json!({
5387 "BasicAuthParameters": {
5388 "Username": "user",
5389 "Password": "pass"
5390 }
5391 }),
5392 connection_state: "AUTHORIZED".to_string(),
5393 secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5394 creation_time: Utc::now(),
5395 last_modified_time: Utc::now(),
5396 last_authorized_time: Utc::now(),
5397 };
5398
5399 let client = reqwest::Client::new();
5400 let builder = client.post("http://localhost:12345/test");
5401 let builder = apply_connection_auth(builder, &conn);
5402
5403 let request = builder.body("{}").build().unwrap();
5404 let auth_header = request
5405 .headers()
5406 .get("authorization")
5407 .unwrap()
5408 .to_str()
5409 .unwrap();
5410 assert!(
5411 auth_header.starts_with("Basic "),
5412 "Expected Basic auth header, got: {auth_header}"
5413 );
5414 }
5415
5416 #[tokio::test]
5417 async fn put_events_with_api_destination_target_resolves_destination() {
5418 let state = Arc::new(RwLock::new(EventBridgeState::new(
5422 "123456789012",
5423 "us-east-1",
5424 )));
5425 let delivery = Arc::new(DeliveryBus::new());
5426 let svc = EventBridgeService::new(state, delivery);
5427
5428 create_connection(&svc, "my-conn");
5430 let conn_arn = {
5431 let state = svc.state.read();
5432 state.connections.get("my-conn").unwrap().arn.clone()
5433 };
5434 let req = make_request(
5435 "CreateApiDestination",
5436 json!({
5437 "Name": "my-dest",
5438 "ConnectionArn": conn_arn,
5439 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5440 "HttpMethod": "POST"
5441 }),
5442 );
5443 svc.create_api_destination(&req).unwrap();
5444
5445 let dest_arn = {
5446 let state = svc.state.read();
5447 state.api_destinations.get("my-dest").unwrap().arn.clone()
5448 };
5449
5450 let req = make_request(
5452 "PutRule",
5453 json!({
5454 "Name": "api-dest-rule",
5455 "EventPattern": r#"{"source":["test.app"]}"#,
5456 "State": "ENABLED"
5457 }),
5458 );
5459 svc.put_rule(&req).unwrap();
5460
5461 let req = make_request(
5462 "PutTargets",
5463 json!({
5464 "Rule": "api-dest-rule",
5465 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5466 }),
5467 );
5468 svc.put_targets(&req).unwrap();
5469
5470 let req = make_request(
5472 "PutEvents",
5473 json!({
5474 "Entries": [{
5475 "Source": "test.app",
5476 "DetailType": "TestEvent",
5477 "Detail": r#"{"key":"value"}"#
5478 }]
5479 }),
5480 );
5481 let resp = svc.put_events(&req).unwrap();
5482 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5483 assert_eq!(body["FailedEntryCount"], 0);
5484 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5485 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5486 }
5487
5488 #[test]
5489 fn test_function_name_from_arn() {
5490 assert_eq!(
5492 super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5493 "my-func"
5494 );
5495 assert_eq!(
5497 super::function_name_from_arn(
5498 "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5499 ),
5500 "my-func"
5501 );
5502 assert_eq!(
5504 super::function_name_from_arn(
5505 "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5506 ),
5507 "my-func"
5508 );
5509 assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5511 }
5512}