1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20 ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21 PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
29 if source.is_empty() {
30 return Err(json!({
31 "ErrorCode": "InvalidArgument",
32 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
33 }));
34 }
35 if detail_type.is_empty() {
36 return Err(json!({
37 "ErrorCode": "InvalidArgument",
38 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
39 }));
40 }
41 if detail.is_empty() {
42 return Err(json!({
43 "ErrorCode": "InvalidArgument",
44 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
45 }));
46 }
47 if serde_json::from_str::<Value>(detail).is_err() {
48 return Err(json!({
49 "ErrorCode": "MalformedDetail",
50 "ErrorMessage": "Detail is malformed.",
51 }));
52 }
53 Ok(())
54}
55
56fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
61 if let Some(s) = raw.as_str() {
62 return DateTime::parse_from_rfc3339(s)
63 .map(|dt| dt.with_timezone(&Utc))
64 .unwrap_or_else(|_| Utc::now());
65 }
66 if let Some(ts) = raw.as_f64() {
67 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
68 .unwrap_or_else(Utc::now);
69 }
70 if let Some(ts) = raw.as_i64() {
71 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
72 }
73 Utc::now()
74}
75
76pub struct EventBridgeService {
77 state: SharedEventBridgeState,
78 delivery: Arc<DeliveryBus>,
79 lambda_state: Option<SharedLambdaState>,
80 logs_state: Option<SharedLogsState>,
81 container_runtime: Option<Arc<ContainerRuntime>>,
82}
83
84impl EventBridgeService {
85 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
86 Self {
87 state,
88 delivery,
89 lambda_state: None,
90 logs_state: None,
91 container_runtime: None,
92 }
93 }
94
95 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
96 self.lambda_state = Some(lambda_state);
97 self
98 }
99
100 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
101 self.logs_state = Some(logs_state);
102 self
103 }
104
105 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
106 self.container_runtime = Some(runtime);
107 self
108 }
109}
110
111#[async_trait]
112impl AwsService for EventBridgeService {
113 fn service_name(&self) -> &str {
114 "events"
115 }
116
117 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
118 match req.action.as_str() {
119 "CreateEventBus" => self.create_event_bus(&req),
120 "DeleteEventBus" => self.delete_event_bus(&req),
121 "ListEventBuses" => self.list_event_buses(&req),
122 "DescribeEventBus" => self.describe_event_bus(&req),
123 "PutRule" => self.put_rule(&req),
124 "DeleteRule" => self.delete_rule(&req),
125 "ListRules" => self.list_rules(&req),
126 "DescribeRule" => self.describe_rule(&req),
127 "EnableRule" => self.enable_rule(&req),
128 "DisableRule" => self.disable_rule(&req),
129 "PutTargets" => self.put_targets(&req),
130 "RemoveTargets" => self.remove_targets(&req),
131 "ListTargetsByRule" => self.list_targets_by_rule(&req),
132 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
133 "PutEvents" => self.put_events(&req),
134 "PutPermission" => self.put_permission(&req),
135 "RemovePermission" => self.remove_permission(&req),
136 "TagResource" => self.tag_resource(&req),
137 "UntagResource" => self.untag_resource(&req),
138 "ListTagsForResource" => self.list_tags_for_resource(&req),
139 "CreateArchive" => self.create_archive(&req),
140 "DescribeArchive" => self.describe_archive(&req),
141 "ListArchives" => self.list_archives(&req),
142 "UpdateArchive" => self.update_archive(&req),
143 "DeleteArchive" => self.delete_archive(&req),
144 "CreateConnection" => self.create_connection(&req),
145 "DescribeConnection" => self.describe_connection(&req),
146 "ListConnections" => self.list_connections(&req),
147 "UpdateConnection" => self.update_connection(&req),
148 "DeleteConnection" => self.delete_connection(&req),
149 "CreateApiDestination" => self.create_api_destination(&req),
150 "DescribeApiDestination" => self.describe_api_destination(&req),
151 "ListApiDestinations" => self.list_api_destinations(&req),
152 "UpdateApiDestination" => self.update_api_destination(&req),
153 "DeleteApiDestination" => self.delete_api_destination(&req),
154 "StartReplay" => self.start_replay(&req),
155 "DescribeReplay" => self.describe_replay(&req),
156 "ListReplays" => self.list_replays(&req),
157 "CancelReplay" => self.cancel_replay(&req),
158 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
159 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
160 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
161 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
162 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
163 "ActivateEventSource" => self.activate_event_source(&req),
164 "DeactivateEventSource" => self.deactivate_event_source(&req),
165 "DescribeEventSource" => self.describe_event_source(&req),
166 "ListEventSources" => self.list_event_sources(&req),
167 "PutPartnerEvents" => self.put_partner_events(&req),
168 "TestEventPattern" => self.test_event_pattern(&req),
169 "UpdateEventBus" => self.update_event_bus(&req),
170 "CreateEndpoint" => self.create_endpoint(&req),
171 "DeleteEndpoint" => self.delete_endpoint(&req),
172 "DescribeEndpoint" => self.describe_endpoint(&req),
173 "ListEndpoints" => self.list_endpoints(&req),
174 "UpdateEndpoint" => self.update_endpoint(&req),
175 "DeauthorizeConnection" => self.deauthorize_connection(&req),
176 _ => Err(AwsServiceError::action_not_implemented(
177 "events",
178 &req.action,
179 )),
180 }
181 }
182
183 fn supported_actions(&self) -> &[&str] {
184 &[
185 "CreateEventBus",
186 "DeleteEventBus",
187 "ListEventBuses",
188 "DescribeEventBus",
189 "PutRule",
190 "DeleteRule",
191 "ListRules",
192 "DescribeRule",
193 "EnableRule",
194 "DisableRule",
195 "PutTargets",
196 "RemoveTargets",
197 "ListTargetsByRule",
198 "ListRuleNamesByTarget",
199 "PutEvents",
200 "PutPermission",
201 "RemovePermission",
202 "TagResource",
203 "UntagResource",
204 "ListTagsForResource",
205 "CreateArchive",
206 "DescribeArchive",
207 "ListArchives",
208 "UpdateArchive",
209 "DeleteArchive",
210 "CreateConnection",
211 "DescribeConnection",
212 "ListConnections",
213 "UpdateConnection",
214 "DeleteConnection",
215 "CreateApiDestination",
216 "DescribeApiDestination",
217 "ListApiDestinations",
218 "UpdateApiDestination",
219 "DeleteApiDestination",
220 "StartReplay",
221 "DescribeReplay",
222 "ListReplays",
223 "CancelReplay",
224 "CreatePartnerEventSource",
225 "DeletePartnerEventSource",
226 "DescribePartnerEventSource",
227 "ListPartnerEventSources",
228 "ListPartnerEventSourceAccounts",
229 "ActivateEventSource",
230 "DeactivateEventSource",
231 "DescribeEventSource",
232 "ListEventSources",
233 "PutPartnerEvents",
234 "TestEventPattern",
235 "UpdateEventBus",
236 "CreateEndpoint",
237 "DeleteEndpoint",
238 "DescribeEndpoint",
239 "ListEndpoints",
240 "UpdateEndpoint",
241 "DeauthorizeConnection",
242 ]
243 }
244}
245
246fn parse_tags(body: &Value) -> HashMap<String, String> {
247 let mut tags = HashMap::new();
248 if let Some(arr) = body["Tags"].as_array() {
249 for tag in arr {
250 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
251 tags.insert(key.to_string(), val.to_string());
252 }
253 }
254 }
255 tags
256}
257
258fn parse_target(target: &Value) -> EventTarget {
259 EventTarget {
260 id: target["Id"].as_str().unwrap_or("").to_string(),
261 arn: target["Arn"].as_str().unwrap_or("").to_string(),
262 input: target["Input"].as_str().map(|s| s.to_string()),
263 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
264 input_transformer: target.get("InputTransformer").cloned(),
265 sqs_parameters: target.get("SqsParameters").cloned(),
266 }
267}
268
269fn target_to_json(t: &EventTarget) -> Value {
270 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
271 if let Some(ref input) = t.input {
272 obj["Input"] = json!(input);
273 }
274 if let Some(ref input_path) = t.input_path {
275 obj["InputPath"] = json!(input_path);
276 }
277 if let Some(ref it) = t.input_transformer {
278 obj["InputTransformer"] = it.clone();
279 }
280 if let Some(ref sp) = t.sqs_parameters {
281 obj["SqsParameters"] = sp.clone();
282 }
283 obj
284}
285
286impl EventBridgeService {
288 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
289 let body = req.json_body();
290 validate_required("Name", &body["Name"])?;
291 let name = body["Name"]
292 .as_str()
293 .ok_or_else(|| missing("Name"))?
294 .to_string();
295 validate_string_length("name", &name, 1, 256)?;
296 validate_optional_string_length(
297 "eventSourceName",
298 body["EventSourceName"].as_str(),
299 1,
300 256,
301 )?;
302 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
303 validate_optional_string_length(
304 "kmsKeyIdentifier",
305 body["KmsKeyIdentifier"].as_str(),
306 0,
307 2048,
308 )?;
309
310 if name.contains('/') && !name.starts_with("aws.partner/") {
312 return Err(AwsServiceError::aws_error(
313 StatusCode::BAD_REQUEST,
314 "ValidationException",
315 "Event bus name must not contain '/'.",
316 ));
317 }
318
319 if name.starts_with("aws.partner/") {
321 let event_source = body["EventSourceName"].as_str().unwrap_or("");
322 let state_r = self.state.read();
323 let has_source = state_r.partner_event_sources.contains_key(event_source);
324 drop(state_r);
325 if !has_source {
326 return Err(AwsServiceError::aws_error(
327 StatusCode::BAD_REQUEST,
328 "ResourceNotFoundException",
329 format!("Event source {event_source} does not exist."),
330 ));
331 }
332 }
333
334 let mut state = self.state.write();
335
336 if state.buses.contains_key(&name) {
337 return Err(AwsServiceError::aws_error(
338 StatusCode::BAD_REQUEST,
339 "ResourceAlreadyExistsException",
340 format!("Event bus {name} already exists."),
341 ));
342 }
343
344 let arn = format!(
345 "arn:aws:events:{}:{}:event-bus/{}",
346 req.region, state.account_id, name
347 );
348 let now = Utc::now();
349 let description = body["Description"].as_str().map(|s| s.to_string());
350 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
351 let dead_letter_config = body.get("DeadLetterConfig").cloned();
352
353 let tags = parse_tags(&body);
354
355 let bus = EventBus {
356 name: name.clone(),
357 arn: arn.clone(),
358 tags,
359 policy: None,
360 description,
361 kms_key_identifier,
362 dead_letter_config,
363 creation_time: now,
364 last_modified_time: now,
365 };
366 state.buses.insert(name, bus);
367
368 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
369 }
370
371 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372 let body = req.json_body();
373 validate_required("Name", &body["Name"])?;
374 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
375 validate_string_length("name", name, 1, 256)?;
376
377 if name == "default" {
378 return Err(AwsServiceError::aws_error(
379 StatusCode::BAD_REQUEST,
380 "ValidationException",
381 format!("Cannot delete event bus {name}."),
382 ));
383 }
384
385 let mut state = self.state.write();
386 state.buses.remove(name);
387 state.rules.retain(|k, _| k.0 != name);
388
389 Ok(AwsResponse::ok_json(json!({})))
390 }
391
392 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
393 let body = req.json_body();
394 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
395 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
396 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
397 let name_prefix = body["NamePrefix"].as_str();
398 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
399 if let Some(t) = body["NextToken"].as_str() {
400 t.parse::<usize>().map_err(|_| {
401 AwsServiceError::aws_error(
402 StatusCode::BAD_REQUEST,
403 "InvalidNextTokenException",
404 format!("Invalid NextToken value: '{t}'"),
405 )
406 })?;
407 }
408
409 let state = self.state.read();
410 let filtered: Vec<&_> = state
411 .buses
412 .values()
413 .filter(|b| match name_prefix {
414 Some(prefix) => b.name.starts_with(prefix),
415 None => true,
416 })
417 .collect();
418
419 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
420 let buses: Vec<Value> = page
421 .iter()
422 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
423 .collect();
424 let mut resp = json!({ "EventBuses": buses });
425 if let Some(token) = next_token {
426 resp["NextToken"] = json!(token);
427 }
428
429 Ok(AwsResponse::ok_json(resp))
430 }
431
432 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
433 let body = req.json_body();
434 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
435 let name = body["Name"].as_str().unwrap_or("default");
436
437 let state = self.state.read();
438 let bus = state.buses.get(name).ok_or_else(|| {
439 AwsServiceError::aws_error(
440 StatusCode::BAD_REQUEST,
441 "ResourceNotFoundException",
442 format!("Event bus {name} does not exist."),
443 )
444 })?;
445
446 let mut resp = json!({
447 "Name": bus.name,
448 "Arn": bus.arn,
449 "CreationTime": bus.creation_time.timestamp() as f64,
450 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
451 });
452
453 if let Some(ref policy) = bus.policy {
454 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
455 }
456 if let Some(ref desc) = bus.description {
457 resp["Description"] = json!(desc);
458 }
459 if let Some(ref kms) = bus.kms_key_identifier {
460 resp["KmsKeyIdentifier"] = json!(kms);
461 }
462 if let Some(ref dlc) = bus.dead_letter_config {
463 resp["DeadLetterConfig"] = dlc.clone();
464 }
465
466 Ok(AwsResponse::ok_json(resp))
467 }
468
469 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
472 let body = req.json_body();
473 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
474 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
475 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
476 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
477 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479 let mut state = self.state.write();
480
481 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
482 AwsServiceError::aws_error(
483 StatusCode::BAD_REQUEST,
484 "ResourceNotFoundException",
485 format!("Event bus {event_bus_name} does not exist."),
486 )
487 })?;
488
489 if let Some(policy_str) = body["Policy"].as_str() {
491 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
492 bus.policy = Some(policy);
493 return Ok(AwsResponse::ok_json(json!({})));
494 }
495 }
496
497 let action = body["Action"].as_str().unwrap_or("");
499 let principal = body["Principal"].as_str().unwrap_or("");
500 let statement_id = body["StatementId"].as_str().unwrap_or("");
501
502 if action != "events:PutEvents" {
504 return Err(AwsServiceError::aws_error(
505 StatusCode::BAD_REQUEST,
506 "ValidationException",
507 "Provided value in parameter 'action' is not supported.",
508 ));
509 }
510
511 let statement = json!({
512 "Sid": statement_id,
513 "Effect": "Allow",
514 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
515 "Action": action,
516 "Resource": bus.arn,
517 });
518
519 let policy = bus.policy.get_or_insert_with(|| {
520 json!({
521 "Version": "2012-10-17",
522 "Statement": [],
523 })
524 });
525
526 if let Some(stmts) = policy["Statement"].as_array_mut() {
527 stmts.push(statement);
528 }
529
530 Ok(AwsResponse::ok_json(json!({})))
531 }
532
533 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
534 let body = req.json_body();
535 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
536 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
537 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
538 let statement_id = body["StatementId"].as_str().unwrap_or("");
539 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
540
541 let mut state = self.state.write();
542
543 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
544 AwsServiceError::aws_error(
545 StatusCode::BAD_REQUEST,
546 "ResourceNotFoundException",
547 format!("Event bus {event_bus_name} does not exist."),
548 )
549 })?;
550
551 if remove_all {
552 bus.policy = None;
553 return Ok(AwsResponse::ok_json(json!({})));
554 }
555
556 let policy = bus.policy.as_mut().ok_or_else(|| {
557 AwsServiceError::aws_error(
558 StatusCode::BAD_REQUEST,
559 "ResourceNotFoundException",
560 "EventBus does not have a policy.",
561 )
562 })?;
563
564 if let Some(stmts) = policy["Statement"].as_array_mut() {
565 let before = stmts.len();
566 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
567 if stmts.len() == before {
568 return Err(AwsServiceError::aws_error(
569 StatusCode::BAD_REQUEST,
570 "ResourceNotFoundException",
571 "Statement with the provided id does not exist.",
572 ));
573 }
574 if stmts.is_empty() {
575 bus.policy = None;
576 }
577 }
578
579 Ok(AwsResponse::ok_json(json!({})))
580 }
581
582 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
585 let body = req.json_body();
586 validate_required("Name", &body["Name"])?;
587 let name = body["Name"]
588 .as_str()
589 .ok_or_else(|| missing("Name"))?
590 .to_string();
591 validate_string_length("name", &name, 1, 64)?;
592 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
593 validate_optional_string_length(
594 "scheduleExpression",
595 body["ScheduleExpression"].as_str(),
596 0,
597 256,
598 )?;
599 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
600 validate_optional_enum(
601 "state",
602 body["State"].as_str(),
603 &[
604 "ENABLED",
605 "DISABLED",
606 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
607 ],
608 )?;
609 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
610 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
611
612 let raw_bus = body["EventBusName"]
613 .as_str()
614 .unwrap_or("default")
615 .to_string();
616
617 let mut state = self.state.write();
618 let event_bus_name = state.resolve_bus_name(&raw_bus);
619
620 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
621 if s.is_empty() {
622 None
623 } else {
624 Some(s.to_string())
625 }
626 });
627 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
628 if s.is_empty() {
629 None
630 } else {
631 Some(s.to_string())
632 }
633 });
634 let description = body["Description"].as_str().map(|s| s.to_string());
635 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
636 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
637
638 if schedule_expression.is_some() && event_bus_name != "default" {
640 return Err(AwsServiceError::aws_error(
641 StatusCode::BAD_REQUEST,
642 "ValidationException",
643 "ScheduleExpression is supported only on the default event bus.",
644 ));
645 }
646
647 if !state.buses.contains_key(&event_bus_name) {
648 return Err(AwsServiceError::aws_error(
649 StatusCode::BAD_REQUEST,
650 "ResourceNotFoundException",
651 format!("Event bus {event_bus_name} does not exist."),
652 ));
653 }
654
655 let arn = if event_bus_name == "default" {
656 format!(
657 "arn:aws:events:{}:{}:rule/{}",
658 req.region, state.account_id, name
659 )
660 } else {
661 format!(
662 "arn:aws:events:{}:{}:rule/{}/{}",
663 req.region, state.account_id, event_bus_name, name
664 )
665 };
666
667 let key = (event_bus_name.clone(), name.clone());
668 let targets = state
669 .rules
670 .get(&key)
671 .map(|r| r.targets.clone())
672 .unwrap_or_default();
673
674 let tags = parse_tags(&body);
675
676 let rule = EventRule {
677 name: name.clone(),
678 arn: arn.clone(),
679 event_bus_name,
680 event_pattern,
681 schedule_expression,
682 state: rule_state,
683 description,
684 role_arn,
685 managed_by: None,
686 created_by: None,
687 targets,
688 tags,
689 last_fired: None,
690 };
691
692 state.rules.insert(key, rule);
693 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
694 }
695
696 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
697 let body = req.json_body();
698 validate_required("Name", &body["Name"])?;
699 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
700 validate_string_length("name", name, 1, 64)?;
701 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
703
704 let mut state = self.state.write();
705 let bus_name = state.resolve_bus_name(event_bus_name);
706 let key = (bus_name, name.to_string());
707
708 if let Some(rule) = state.rules.get(&key) {
710 if !rule.targets.is_empty() {
711 return Err(AwsServiceError::aws_error(
712 StatusCode::BAD_REQUEST,
713 "ValidationException",
714 "Rule can't be deleted since it has targets.",
715 ));
716 }
717 }
718
719 state.rules.remove(&key);
720 Ok(AwsResponse::ok_json(json!({})))
721 }
722
723 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
724 let body = req.json_body();
725 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
726 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
727 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
728 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
729 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
730 let name_prefix = body["NamePrefix"].as_str();
731 let limit = body["Limit"].as_u64().map(|n| n as usize);
732 let next_token = body["NextToken"].as_str();
733
734 let state = self.state.read();
735 let bus_name = state.resolve_bus_name(event_bus_name);
736
737 let mut rules: Vec<&EventRule> = state
738 .rules
739 .values()
740 .filter(|r| r.event_bus_name == bus_name)
741 .filter(|r| match name_prefix {
742 Some(prefix) => r.name.starts_with(prefix),
743 None => true,
744 })
745 .collect();
746 rules.sort_by(|a, b| a.name.cmp(&b.name));
747
748 let start = next_token
750 .and_then(|t| t.parse::<usize>().ok())
751 .unwrap_or(0)
752 .min(rules.len());
753 let rules_slice = &rules[start..];
754
755 let (page, new_next_token) = if let Some(lim) = limit {
756 if rules_slice.len() > lim {
757 (&rules_slice[..lim], Some((start + lim).to_string()))
758 } else {
759 (rules_slice, None)
760 }
761 } else {
762 (rules_slice, None)
763 };
764
765 let rules_json: Vec<Value> = page
766 .iter()
767 .map(|r| {
768 let mut obj = json!({
769 "Name": r.name,
770 "Arn": r.arn,
771 "EventBusName": r.event_bus_name,
772 "State": r.state,
773 });
774 if let Some(ref desc) = r.description {
775 obj["Description"] = json!(desc);
776 }
777 if let Some(ref ep) = r.event_pattern {
778 obj["EventPattern"] = json!(ep);
779 }
780 if let Some(ref se) = r.schedule_expression {
781 obj["ScheduleExpression"] = json!(se);
782 }
783 if let Some(ref mb) = r.managed_by {
784 obj["ManagedBy"] = json!(mb);
785 }
786 obj
787 })
788 .collect();
789
790 let mut resp = json!({ "Rules": rules_json });
791 if let Some(token) = new_next_token {
792 resp["NextToken"] = json!(token);
793 }
794
795 Ok(AwsResponse::ok_json(resp))
796 }
797
798 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799 let body = req.json_body();
800 validate_required("Name", &body["Name"])?;
801 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802 validate_string_length("name", name, 1, 64)?;
803 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806 let state = self.state.read();
807 let bus_name = state.resolve_bus_name(event_bus_name);
808 let key = (bus_name.clone(), name.to_string());
809
810 let rule = state.rules.get(&key).ok_or_else(|| {
811 AwsServiceError::aws_error(
812 StatusCode::BAD_REQUEST,
813 "ResourceNotFoundException",
814 format!("Rule {name} does not exist."),
815 )
816 })?;
817
818 let mut resp = json!({
819 "Name": rule.name,
820 "Arn": rule.arn,
821 "EventBusName": rule.event_bus_name,
822 "State": rule.state,
823 });
824
825 if let Some(ref desc) = rule.description {
826 resp["Description"] = json!(desc);
827 }
828 if let Some(ref ep) = rule.event_pattern {
829 resp["EventPattern"] = json!(ep);
830 }
831 if let Some(ref se) = rule.schedule_expression {
832 resp["ScheduleExpression"] = json!(se);
833 }
834 if let Some(ref role) = rule.role_arn {
835 resp["RoleArn"] = json!(role);
836 }
837 if let Some(ref mb) = rule.managed_by {
838 resp["ManagedBy"] = json!(mb);
839 }
840 if let Some(ref cb) = rule.created_by {
841 resp["CreatedBy"] = json!(cb);
842 }
843 if rule.event_bus_name != "default" && rule.created_by.is_none() {
845 resp["CreatedBy"] = json!(state.account_id);
846 }
847
848 Ok(AwsResponse::ok_json(resp))
849 }
850
851 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
852 let body = req.json_body();
853 validate_required("Name", &body["Name"])?;
854 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
855 validate_string_length("name", name, 1, 64)?;
856 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
857 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
858
859 let mut state = self.state.write();
860 let bus_name = state.resolve_bus_name(event_bus_name);
861 let key = (bus_name, name.to_string());
862
863 let rule = state.rules.get_mut(&key).ok_or_else(|| {
864 AwsServiceError::aws_error(
865 StatusCode::BAD_REQUEST,
866 "ResourceNotFoundException",
867 format!("Rule {name} does not exist."),
868 )
869 })?;
870
871 rule.state = "ENABLED".to_string();
872 Ok(AwsResponse::ok_json(json!({})))
873 }
874
875 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
876 let body = req.json_body();
877 validate_required("Name", &body["Name"])?;
878 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
879 validate_string_length("name", name, 1, 64)?;
880 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
881 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882
883 let mut state = self.state.write();
884 let bus_name = state.resolve_bus_name(event_bus_name);
885 let key = (bus_name, name.to_string());
886
887 let rule = state.rules.get_mut(&key).ok_or_else(|| {
888 AwsServiceError::aws_error(
889 StatusCode::BAD_REQUEST,
890 "ResourceNotFoundException",
891 format!("Rule {name} does not exist."),
892 )
893 })?;
894
895 rule.state = "DISABLED".to_string();
896 Ok(AwsResponse::ok_json(json!({})))
897 }
898
899 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
902 let body = req.json_body();
903 validate_required("Rule", &body["Rule"])?;
904 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
905 validate_string_length("rule", rule_name, 1, 64)?;
906 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
907 validate_required("Targets", &body["Targets"])?;
908 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
909 let targets = body["Targets"]
910 .as_array()
911 .ok_or_else(|| missing("Targets"))?;
912
913 for target in targets {
915 let target_id = target["Id"].as_str().unwrap_or("");
916 let target_arn = target["Arn"].as_str().unwrap_or("");
917
918 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
919 return Err(AwsServiceError::aws_error(
920 StatusCode::BAD_REQUEST,
921 "ValidationException",
922 format!(
923 "Parameter(s) SqsParameters must be specified for target: {target_id}."
924 ),
925 ));
926 }
927
928 if !target_arn.starts_with("arn:") {
930 return Err(AwsServiceError::aws_error(
931 StatusCode::BAD_REQUEST,
932 "ValidationException",
933 format!(
934 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
935 ),
936 ));
937 }
938 }
939
940 let mut state = self.state.write();
941 let bus_name = state.resolve_bus_name(event_bus_name);
942 let key = (bus_name.clone(), rule_name.to_string());
943
944 let rule = state.rules.get_mut(&key).ok_or_else(|| {
945 AwsServiceError::aws_error(
946 StatusCode::BAD_REQUEST,
947 "ResourceNotFoundException",
948 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
949 )
950 })?;
951
952 for target in targets {
953 let et = parse_target(target);
954 rule.targets.retain(|t| t.id != et.id);
956 rule.targets.push(et);
957 }
958
959 Ok(AwsResponse::ok_json(json!({
960 "FailedEntryCount": 0,
961 "FailedEntries": [],
962 })))
963 }
964
965 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
966 let body = req.json_body();
967 validate_required("Rule", &body["Rule"])?;
968 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
969 validate_string_length("rule", rule_name, 1, 64)?;
970 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
971 validate_required("Ids", &body["Ids"])?;
972 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
973 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
974
975 let target_ids: Vec<String> = ids
976 .iter()
977 .filter_map(|v| v.as_str().map(|s| s.to_string()))
978 .collect();
979
980 let mut state = self.state.write();
981 let bus_name = state.resolve_bus_name(event_bus_name);
982 let key = (bus_name.clone(), rule_name.to_string());
983
984 let rule = state.rules.get_mut(&key).ok_or_else(|| {
985 AwsServiceError::aws_error(
986 StatusCode::BAD_REQUEST,
987 "ResourceNotFoundException",
988 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
989 )
990 })?;
991
992 rule.targets.retain(|t| !target_ids.contains(&t.id));
993
994 Ok(AwsResponse::ok_json(json!({
995 "FailedEntryCount": 0,
996 "FailedEntries": [],
997 })))
998 }
999
1000 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001 let body = req.json_body();
1002 validate_required("Rule", &body["Rule"])?;
1003 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1004 validate_string_length("rule", rule_name, 1, 64)?;
1005 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009 let limit = body["Limit"].as_u64().map(|n| n as usize);
1010 let next_token = body["NextToken"].as_str();
1011
1012 let state = self.state.read();
1013 let bus_name = state.resolve_bus_name(event_bus_name);
1014 let key = (bus_name, rule_name.to_string());
1015
1016 let rule = state.rules.get(&key).ok_or_else(|| {
1017 AwsServiceError::aws_error(
1018 StatusCode::BAD_REQUEST,
1019 "ResourceNotFoundException",
1020 format!("Rule {rule_name} does not exist."),
1021 )
1022 })?;
1023
1024 let all_targets = &rule.targets;
1025 let start = next_token
1026 .and_then(|t| t.parse::<usize>().ok())
1027 .unwrap_or(0)
1028 .min(all_targets.len());
1029 let slice = &all_targets[start..];
1030
1031 let (page, new_next_token) = if let Some(lim) = limit {
1032 if slice.len() > lim {
1033 (&slice[..lim], Some((start + lim).to_string()))
1034 } else {
1035 (slice, None)
1036 }
1037 } else {
1038 (slice, None)
1039 };
1040
1041 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1042
1043 let mut resp = json!({ "Targets": targets });
1044 if let Some(token) = new_next_token {
1045 resp["NextToken"] = json!(token);
1046 }
1047
1048 Ok(AwsResponse::ok_json(resp))
1049 }
1050
1051 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052 let body = req.json_body();
1053 validate_required("TargetArn", &body["TargetArn"])?;
1054 let target_arn = body["TargetArn"]
1055 .as_str()
1056 .ok_or_else(|| missing("TargetArn"))?;
1057 validate_string_length("targetArn", target_arn, 1, 1600)?;
1058 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1059 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1060 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1061 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1062 let limit = body["Limit"].as_u64().map(|n| n as usize);
1063 let next_token = body["NextToken"].as_str();
1064
1065 let state = self.state.read();
1066 let bus_name = state.resolve_bus_name(event_bus_name);
1067
1068 let mut rule_names: Vec<String> = Vec::new();
1070 for rule in state.rules.values() {
1071 if rule.event_bus_name == bus_name
1072 && rule.targets.iter().any(|t| t.arn == target_arn)
1073 && !rule_names.contains(&rule.name)
1074 {
1075 rule_names.push(rule.name.clone());
1076 }
1077 }
1078 rule_names.sort();
1079
1080 let start = next_token
1081 .and_then(|t| t.parse::<usize>().ok())
1082 .unwrap_or(0)
1083 .min(rule_names.len());
1084 let slice = &rule_names[start..];
1085
1086 let (page, new_next_token) = if let Some(lim) = limit {
1087 if slice.len() > lim {
1088 (&slice[..lim], Some((start + lim).to_string()))
1089 } else {
1090 (slice, None)
1091 }
1092 } else {
1093 (slice, None)
1094 };
1095
1096 let mut resp = json!({ "RuleNames": page });
1097 if let Some(token) = new_next_token {
1098 resp["NextToken"] = json!(token);
1099 }
1100
1101 Ok(AwsResponse::ok_json(resp))
1102 }
1103
1104 fn create_partner_event_source(
1107 &self,
1108 req: &AwsRequest,
1109 ) -> Result<AwsResponse, AwsServiceError> {
1110 let body = req.json_body();
1111 validate_required("Name", &body["Name"])?;
1112 let name = body["Name"]
1113 .as_str()
1114 .ok_or_else(|| missing("Name"))?
1115 .to_string();
1116 validate_string_length("name", &name, 1, 256)?;
1117 validate_required("Account", &body["Account"])?;
1118 let account = body["Account"]
1119 .as_str()
1120 .ok_or_else(|| missing("Account"))?
1121 .to_string();
1122 validate_string_length("account", &account, 12, 12)?;
1123
1124 let mut state = self.state.write();
1125 if state.partner_event_sources.contains_key(&name) {
1126 return Err(AwsServiceError::aws_error(
1127 StatusCode::CONFLICT,
1128 "ResourceAlreadyExistsException",
1129 format!("Partner event source {name} already exists."),
1130 ));
1131 }
1132 let arn = format!(
1133 "arn:aws:events:{}::event-source/aws.partner/{}",
1134 state.region, name
1135 );
1136 let now = Utc::now();
1137 let ps = PartnerEventSource {
1138 name: name.clone(),
1139 arn: arn.clone(),
1140 account,
1141 creation_time: now,
1142 expiration_time: None,
1143 state: "ACTIVE".to_string(),
1144 };
1145 state.partner_event_sources.insert(name.clone(), ps);
1146
1147 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1148 }
1149
1150 fn delete_partner_event_source(
1151 &self,
1152 req: &AwsRequest,
1153 ) -> Result<AwsResponse, AwsServiceError> {
1154 let body = req.json_body();
1155 validate_required("Name", &body["Name"])?;
1156 let name = body["Name"]
1157 .as_str()
1158 .ok_or_else(|| missing("Name"))?
1159 .to_string();
1160 validate_required("Account", &body["Account"])?;
1161 let account = body["Account"]
1162 .as_str()
1163 .ok_or_else(|| missing("Account"))?
1164 .to_string();
1165
1166 let mut state = self.state.write();
1167 match state.partner_event_sources.get(&name) {
1168 Some(ps) if ps.account == account => {
1169 state.partner_event_sources.remove(&name);
1170 }
1171 Some(_) => {
1172 return Err(AwsServiceError::aws_error(
1173 StatusCode::NOT_FOUND,
1174 "ResourceNotFoundException",
1175 format!("Partner event source {name} does not exist for account {account}."),
1176 ));
1177 }
1178 None => {
1179 return Err(AwsServiceError::aws_error(
1180 StatusCode::NOT_FOUND,
1181 "ResourceNotFoundException",
1182 format!("Partner event source {name} does not exist."),
1183 ));
1184 }
1185 }
1186
1187 Ok(AwsResponse::ok_json(json!({})))
1188 }
1189
1190 fn describe_partner_event_source(
1191 &self,
1192 req: &AwsRequest,
1193 ) -> Result<AwsResponse, AwsServiceError> {
1194 let body = req.json_body();
1195 validate_required("Name", &body["Name"])?;
1196 let name = body["Name"]
1197 .as_str()
1198 .ok_or_else(|| missing("Name"))?
1199 .to_string();
1200 validate_string_length("name", &name, 1, 256)?;
1201
1202 let state = self.state.read();
1203 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1204 AwsServiceError::aws_error(
1205 StatusCode::NOT_FOUND,
1206 "ResourceNotFoundException",
1207 format!("Partner event source {name} does not exist."),
1208 )
1209 })?;
1210
1211 Ok(AwsResponse::ok_json(json!({
1212 "Arn": ps.arn,
1213 "Name": ps.name,
1214 })))
1215 }
1216
1217 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1218 let body = req.json_body();
1219 validate_required("namePrefix", &body["NamePrefix"])?;
1220 let name_prefix = body["NamePrefix"]
1221 .as_str()
1222 .ok_or_else(|| missing("NamePrefix"))?;
1223 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1224 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1225 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1226 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1227
1228 let state = self.state.read();
1229 let all: Vec<Value> = state
1230 .partner_event_sources
1231 .values()
1232 .filter(|ps| ps.name.starts_with(name_prefix))
1233 .map(|ps| {
1234 json!({
1235 "Arn": ps.arn,
1236 "Name": ps.name,
1237 })
1238 })
1239 .collect();
1240
1241 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1242 let mut resp = json!({ "PartnerEventSources": sources });
1243 if let Some(token) = next_token {
1244 resp["NextToken"] = json!(token);
1245 }
1246
1247 Ok(AwsResponse::ok_json(resp))
1248 }
1249
1250 fn list_partner_event_source_accounts(
1251 &self,
1252 req: &AwsRequest,
1253 ) -> Result<AwsResponse, AwsServiceError> {
1254 let body = req.json_body();
1255 validate_required("EventSourceName", &body["EventSourceName"])?;
1256 let event_source_name = body["EventSourceName"]
1257 .as_str()
1258 .ok_or_else(|| missing("EventSourceName"))?;
1259 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1260 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1261 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1262
1263 let state = self.state.read();
1264 let accounts: Vec<Value> = state
1265 .partner_event_sources
1266 .values()
1267 .filter(|ps| ps.name == event_source_name)
1268 .map(|ps| json!({ "Account": ps.account }))
1269 .collect();
1270
1271 Ok(AwsResponse::ok_json(json!({
1272 "PartnerEventSourceAccounts": accounts
1273 })))
1274 }
1275
1276 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277 let body = req.json_body();
1278 validate_required("Name", &body["Name"])?;
1279 let name = body["Name"]
1280 .as_str()
1281 .ok_or_else(|| missing("Name"))?
1282 .to_string();
1283
1284 let mut state = self.state.write();
1285 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1286 AwsServiceError::aws_error(
1287 StatusCode::NOT_FOUND,
1288 "ResourceNotFoundException",
1289 format!("Event source {name} does not exist."),
1290 )
1291 })?;
1292 ps.state = "ACTIVE".to_string();
1293
1294 Ok(AwsResponse::ok_json(json!({})))
1295 }
1296
1297 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1298 let body = req.json_body();
1299 validate_required("Name", &body["Name"])?;
1300 let name = body["Name"]
1301 .as_str()
1302 .ok_or_else(|| missing("Name"))?
1303 .to_string();
1304
1305 let mut state = self.state.write();
1306 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1307 AwsServiceError::aws_error(
1308 StatusCode::NOT_FOUND,
1309 "ResourceNotFoundException",
1310 format!("Event source {name} does not exist."),
1311 )
1312 })?;
1313 ps.state = "INACTIVE".to_string();
1314
1315 Ok(AwsResponse::ok_json(json!({})))
1316 }
1317
1318 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1319 let body = req.json_body();
1320 validate_required("Name", &body["Name"])?;
1321 let name = body["Name"]
1322 .as_str()
1323 .ok_or_else(|| missing("Name"))?
1324 .to_string();
1325
1326 let state = self.state.read();
1327 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1328 AwsServiceError::aws_error(
1329 StatusCode::NOT_FOUND,
1330 "ResourceNotFoundException",
1331 format!("Event source {name} does not exist."),
1332 )
1333 })?;
1334
1335 Ok(AwsResponse::ok_json(json!({
1336 "Arn": ps.arn,
1337 "Name": ps.name,
1338 "CreatedBy": ps.account,
1339 "CreationTime": ps.creation_time.timestamp() as f64,
1340 "State": ps.state,
1341 })))
1342 }
1343
1344 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1345 let body = req.json_body();
1346 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1347 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1348 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1349 let name_prefix = body["NamePrefix"].as_str();
1350 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1351
1352 let state = self.state.read();
1353 let all: Vec<Value> = state
1354 .partner_event_sources
1355 .values()
1356 .filter(|ps| match name_prefix {
1357 Some(prefix) => ps.name.starts_with(prefix),
1358 None => true,
1359 })
1360 .map(|ps| {
1361 json!({
1362 "Arn": ps.arn,
1363 "Name": ps.name,
1364 "CreatedBy": ps.account,
1365 "CreationTime": ps.creation_time.timestamp() as f64,
1366 "State": ps.state,
1367 })
1368 })
1369 .collect();
1370
1371 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1372 let mut resp = json!({ "EventSources": sources });
1373 if let Some(token) = next_token {
1374 resp["NextToken"] = json!(token);
1375 }
1376
1377 Ok(AwsResponse::ok_json(resp))
1378 }
1379
1380 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381 let body = req.json_body();
1382 validate_required("Entries", &body["Entries"])?;
1383 let entries = body["Entries"]
1384 .as_array()
1385 .ok_or_else(|| missing("Entries"))?;
1386
1387 let mut result_entries = Vec::new();
1388 for _entry in entries {
1389 let event_id = uuid::Uuid::new_v4().to_string();
1390 result_entries.push(json!({ "EventId": event_id }));
1391 }
1392
1393 Ok(AwsResponse::ok_json(json!({
1394 "FailedEntryCount": 0,
1395 "Entries": result_entries,
1396 })))
1397 }
1398
1399 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1402 let body = req.json_body();
1403 validate_required("EventPattern", &body["EventPattern"])?;
1404 validate_required("Event", &body["Event"])?;
1405 let event_pattern = body["EventPattern"]
1406 .as_str()
1407 .ok_or_else(|| missing("EventPattern"))?;
1408 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1409
1410 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1412 AwsServiceError::aws_error(
1413 StatusCode::BAD_REQUEST,
1414 "InvalidEventPatternException",
1415 "Event is not valid JSON.",
1416 )
1417 })?;
1418
1419 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1421 AwsServiceError::aws_error(
1422 StatusCode::BAD_REQUEST,
1423 "InvalidEventPatternException",
1424 "Event pattern is not valid JSON.",
1425 )
1426 })?;
1427
1428 let source = event["source"].as_str().unwrap_or("");
1429 let detail_type = event["detail-type"].as_str().unwrap_or("");
1430 let detail = event
1431 .get("detail")
1432 .map(|v| serde_json::to_string(v).unwrap_or_default())
1433 .unwrap_or_else(|| "{}".to_string());
1434 let account = event["account"].as_str().unwrap_or("");
1435 let region = event["region"].as_str().unwrap_or("");
1436 let resources: Vec<String> = event["resources"]
1437 .as_array()
1438 .map(|arr| {
1439 arr.iter()
1440 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1441 .collect()
1442 })
1443 .unwrap_or_default();
1444
1445 let result = matches_pattern(
1446 Some(event_pattern),
1447 source,
1448 detail_type,
1449 &detail,
1450 account,
1451 region,
1452 &resources,
1453 );
1454
1455 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1456 }
1457
1458 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461 let body = req.json_body();
1462 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1463 validate_optional_string_length(
1464 "kmsKeyIdentifier",
1465 body["KmsKeyIdentifier"].as_str(),
1466 0,
1467 2048,
1468 )?;
1469 let name = body["Name"].as_str().unwrap_or("default");
1470
1471 let mut state = self.state.write();
1472 let bus = state.buses.get_mut(name).ok_or_else(|| {
1473 AwsServiceError::aws_error(
1474 StatusCode::BAD_REQUEST,
1475 "ResourceNotFoundException",
1476 format!("Event bus {name} does not exist."),
1477 )
1478 })?;
1479
1480 if let Some(desc) = body["Description"].as_str() {
1481 bus.description = Some(desc.to_string());
1482 }
1483 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1484 bus.kms_key_identifier = Some(kms.to_string());
1485 }
1486 if let Some(dlc) = body.get("DeadLetterConfig") {
1487 bus.dead_letter_config = Some(dlc.clone());
1488 }
1489 bus.last_modified_time = Utc::now();
1490
1491 let arn = bus.arn.clone();
1492 let bus_name = bus.name.clone();
1493
1494 Ok(AwsResponse::ok_json(json!({
1495 "Arn": arn,
1496 "Name": bus_name,
1497 })))
1498 }
1499
1500 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503 let body = req.json_body();
1504 validate_required("Name", &body["Name"])?;
1505 let name = body["Name"]
1506 .as_str()
1507 .ok_or_else(|| missing("Name"))?
1508 .to_string();
1509 validate_string_length("name", &name, 1, 64)?;
1510 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1511 validate_required("EventBuses", &body["EventBuses"])?;
1512
1513 let description = body["Description"].as_str().map(|s| s.to_string());
1514 let routing_config = body["RoutingConfig"].clone();
1515 let replication_config = body.get("ReplicationConfig").cloned();
1516 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1517 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1518
1519 let mut state = self.state.write();
1520 if state.endpoints.contains_key(&name) {
1521 return Err(AwsServiceError::aws_error(
1522 StatusCode::CONFLICT,
1523 "ResourceAlreadyExistsException",
1524 format!("Endpoint {name} already exists."),
1525 ));
1526 }
1527
1528 let endpoint_id = format!("{}.abc123", name);
1529 let arn = format!(
1530 "arn:aws:events:{}:{}:endpoint/{}",
1531 req.region, state.account_id, name
1532 );
1533 let endpoint_url = format!(
1534 "https://{}.endpoint.events.{}.amazonaws.com",
1535 endpoint_id, req.region
1536 );
1537 let now = Utc::now();
1538
1539 let endpoint = Endpoint {
1540 name: name.clone(),
1541 arn: arn.clone(),
1542 endpoint_id: endpoint_id.clone(),
1543 endpoint_url: Some(endpoint_url),
1544 description,
1545 routing_config: routing_config.clone(),
1546 replication_config: replication_config.clone(),
1547 event_buses: event_buses.clone(),
1548 role_arn: role_arn.clone(),
1549 state: "ACTIVE".to_string(),
1550 creation_time: now,
1551 last_modified_time: now,
1552 };
1553 state.endpoints.insert(name.clone(), endpoint);
1554
1555 let mut resp = json!({
1556 "Name": name,
1557 "Arn": arn,
1558 "State": "ACTIVE",
1559 "RoutingConfig": routing_config,
1560 "EventBuses": event_buses,
1561 });
1562 if let Some(ref rc) = replication_config {
1563 resp["ReplicationConfig"] = rc.clone();
1564 }
1565 if let Some(ref ra) = role_arn {
1566 resp["RoleArn"] = json!(ra);
1567 }
1568
1569 Ok(AwsResponse::ok_json(resp))
1570 }
1571
1572 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1573 let body = req.json_body();
1574 validate_required("Name", &body["Name"])?;
1575 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1576
1577 let mut state = self.state.write();
1578 state.endpoints.remove(name).ok_or_else(|| {
1579 AwsServiceError::aws_error(
1580 StatusCode::BAD_REQUEST,
1581 "ResourceNotFoundException",
1582 format!("Endpoint '{name}' does not exist."),
1583 )
1584 })?;
1585
1586 Ok(AwsResponse::ok_json(json!({})))
1587 }
1588
1589 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1590 let body = req.json_body();
1591 validate_required("Name", &body["Name"])?;
1592 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1593
1594 let state = self.state.read();
1595 let ep = state.endpoints.get(name).ok_or_else(|| {
1596 AwsServiceError::aws_error(
1597 StatusCode::BAD_REQUEST,
1598 "ResourceNotFoundException",
1599 format!("Endpoint '{name}' does not exist."),
1600 )
1601 })?;
1602
1603 let mut resp = json!({
1604 "Name": ep.name,
1605 "Arn": ep.arn,
1606 "EndpointId": ep.endpoint_id,
1607 "State": ep.state,
1608 "RoutingConfig": ep.routing_config,
1609 "EventBuses": ep.event_buses,
1610 "CreationTime": ep.creation_time.timestamp() as f64,
1611 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1612 });
1613 if let Some(ref url) = ep.endpoint_url {
1614 resp["EndpointUrl"] = json!(url);
1615 }
1616 if let Some(ref desc) = ep.description {
1617 resp["Description"] = json!(desc);
1618 }
1619 if let Some(ref rc) = ep.replication_config {
1620 resp["ReplicationConfig"] = rc.clone();
1621 }
1622 if let Some(ref ra) = ep.role_arn {
1623 resp["RoleArn"] = json!(ra);
1624 }
1625
1626 Ok(AwsResponse::ok_json(resp))
1627 }
1628
1629 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1630 let body = req.json_body();
1631 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1632 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1633 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1634 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1635 let name_prefix = body["NamePrefix"].as_str();
1636 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1637
1638 let state = self.state.read();
1639 let all: Vec<Value> = state
1640 .endpoints
1641 .values()
1642 .filter(|ep| match name_prefix {
1643 Some(prefix) => ep.name.starts_with(prefix),
1644 None => true,
1645 })
1646 .map(|ep| {
1647 let mut obj = json!({
1648 "Name": ep.name,
1649 "Arn": ep.arn,
1650 "EndpointId": ep.endpoint_id,
1651 "State": ep.state,
1652 "RoutingConfig": ep.routing_config,
1653 "EventBuses": ep.event_buses,
1654 "CreationTime": ep.creation_time.timestamp() as f64,
1655 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1656 });
1657 if let Some(ref url) = ep.endpoint_url {
1658 obj["EndpointUrl"] = json!(url);
1659 }
1660 obj
1661 })
1662 .collect();
1663
1664 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1665 let mut resp = json!({ "Endpoints": endpoints });
1666 if let Some(token) = next_token {
1667 resp["NextToken"] = json!(token);
1668 }
1669
1670 Ok(AwsResponse::ok_json(resp))
1671 }
1672
1673 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1674 let body = req.json_body();
1675 validate_required("Name", &body["Name"])?;
1676 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1677
1678 let mut state = self.state.write();
1679 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1680 AwsServiceError::aws_error(
1681 StatusCode::BAD_REQUEST,
1682 "ResourceNotFoundException",
1683 format!("Endpoint '{name}' does not exist."),
1684 )
1685 })?;
1686
1687 if let Some(desc) = body["Description"].as_str() {
1688 ep.description = Some(desc.to_string());
1689 }
1690 if !body["RoutingConfig"].is_null() {
1691 ep.routing_config = body["RoutingConfig"].clone();
1692 }
1693 if let Some(rc) = body.get("ReplicationConfig") {
1694 ep.replication_config = Some(rc.clone());
1695 }
1696 if let Some(buses) = body["EventBuses"].as_array() {
1697 ep.event_buses = buses.clone();
1698 }
1699 if let Some(ra) = body["RoleArn"].as_str() {
1700 ep.role_arn = Some(ra.to_string());
1701 }
1702 ep.last_modified_time = Utc::now();
1703
1704 let resp = json!({
1705 "Name": ep.name,
1706 "Arn": ep.arn,
1707 "EndpointId": ep.endpoint_id,
1708 "State": ep.state,
1709 "RoutingConfig": ep.routing_config,
1710 "EventBuses": ep.event_buses,
1711 });
1712
1713 Ok(AwsResponse::ok_json(resp))
1714 }
1715
1716 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1719 let body = req.json_body();
1720 validate_required("Name", &body["Name"])?;
1721 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1722 validate_string_length("name", name, 1, 64)?;
1723
1724 let mut state = self.state.write();
1725 let conn = state.connections.get_mut(name).ok_or_else(|| {
1726 AwsServiceError::aws_error(
1727 StatusCode::BAD_REQUEST,
1728 "ResourceNotFoundException",
1729 format!("Connection '{name}' does not exist."),
1730 )
1731 })?;
1732
1733 conn.connection_state = "DEAUTHORIZING".to_string();
1734 conn.last_modified_time = Utc::now();
1735
1736 let resp = json!({
1737 "ConnectionArn": conn.arn,
1738 "ConnectionState": conn.connection_state,
1739 "CreationTime": conn.creation_time.timestamp() as f64,
1740 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1741 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1742 });
1743
1744 Ok(AwsResponse::ok_json(resp))
1745 }
1746
    /// Handles `PutEvents`: validates and records a batch of events, then
    /// delivers each accepted event to the targets of every matching rule.
    ///
    /// The work happens in two phases:
    ///
    /// 1. Under the state write lock, each entry is validated with
    ///    `validate_put_events_entry`. Rejected entries contribute their error
    ///    object to the response and are counted in `FailedEntryCount`.
    ///    Accepted entries get a fresh event id, are handed to
    ///    `archive_matching_event`, appended to `state.events`, and matched
    ///    against the `ENABLED` rules on the same bus to collect targets.
    /// 2. After the lock is released, every collected target receives a
    ///    payload. The delivery mechanism is chosen from substrings of the
    ///    target ARN (SQS, SNS, Lambda, CloudWatch Logs, Kinesis, Step
    ///    Functions, API destinations, or a plain HTTP(S) URL). Targets that
    ///    match none of these are silently skipped.
    ///
    /// The response lists one element per input entry, in input order.
    ///
    /// # Errors
    ///
    /// Propagates validation failures for `Entries` and `EndpointId`, and
    /// returns `ValidationException` when `Entries` is empty or holds more
    /// than 10 elements. Per-entry problems are reported in the response body
    /// rather than as an error.
    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("Entries", &body["Entries"])?;
        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
        let entries = body["Entries"]
            .as_array()
            .ok_or_else(|| missing("Entries"))?;

        // The batch must contain between 1 and 10 entries.
        if entries.is_empty() {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
            ));
        }
        if entries.len() > 10 {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
            ));
        }

        // Phase 1: record events and collect deliveries while holding the lock.
        let mut state = self.state.write();
        let mut result_entries = Vec::new();
        let mut events_to_deliver = Vec::new();
        let mut failed_count = 0;

        for entry in entries {
            // Missing or non-string fields become empty strings, which the
            // validator below rejects as missing required arguments.
            let source = entry["Source"].as_str().unwrap_or("").to_string();
            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
            let detail = entry["Detail"].as_str().unwrap_or("").to_string();

            // A rejected entry is reported in place and does not stop the batch.
            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
                failed_count += 1;
                result_entries.push(error);
                continue;
            }

            let event_id = uuid::Uuid::new_v4().to_string();
            let raw_bus = entry["EventBusName"]
                .as_str()
                .unwrap_or("default")
                .to_string();
            let event_bus_name = state.resolve_bus_name(&raw_bus);
            let time = parse_put_events_time(&entry["Time"]);
            // Non-string resource entries are dropped.
            let resources: Vec<String> = entry["Resources"]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
                        .collect()
                })
                .unwrap_or_default();

            let event = PutEvent {
                event_id: event_id.clone(),
                source: source.clone(),
                detail_type: detail_type.clone(),
                detail: detail.clone(),
                event_bus_name: event_bus_name.clone(),
                time,
                resources: resources.clone(),
            };

            archive_matching_event(
                &mut state,
                &event,
                &event_bus_name,
                &source,
                &detail_type,
                &detail,
                &req.account_id,
                &req.region,
                &resources,
            );

            state.events.push(event);

            // Targets of every enabled rule on this bus whose pattern matches.
            let matching_targets: Vec<EventTarget> = state
                .rules
                .values()
                .filter(|r| {
                    r.event_bus_name == event_bus_name
                        && r.state == "ENABLED"
                        && matches_pattern(
                            r.event_pattern.as_deref(),
                            &source,
                            &detail_type,
                            &detail,
                            &req.account_id,
                            &req.region,
                            &resources,
                        )
                })
                .flat_map(|r| r.targets.clone())
                .collect();

            if !matching_targets.is_empty() {
                events_to_deliver.push((
                    event_id.clone(),
                    source,
                    detail_type,
                    detail,
                    time,
                    resources,
                    matching_targets,
                ));
            }

            result_entries.push(json!({ "EventId": event_id }));
        }

        // Release the write lock before delivering: several delivery branches
        // below take the state lock again.
        drop(state);

        // Phase 2: deliver each recorded event to its collected targets.
        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
            // `detail` already passed JSON validation; the fallback is defensive.
            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
            let event_json = json!({
                "version": "0",
                "id": event_id,
                "source": source,
                "account": req.account_id,
                "detail-type": detail_type,
                "detail": detail_value,
                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
                "region": req.region,
                "resources": resources,
            });
            let event_str = event_json.to_string();

            for target in targets {
                let arn = &target.arn;
                // Payload precedence: input transformer, then constant input,
                // then input path (falling back to the whole event when the
                // path resolves to nothing), then the whole event.
                let body_str = if let Some(ref transformer) = target.input_transformer {
                    apply_input_transformer(transformer, &event_json)
                } else if let Some(ref input) = target.input {
                    input.clone()
                } else if let Some(ref input_path) = target.input_path {
                    resolve_json_path(&event_json, input_path)
                        .map(|v| v.to_string())
                        .unwrap_or_else(|| event_str.clone())
                } else {
                    event_str.clone()
                };

                if arn.contains(":sqs:") {
                    // Use the attribute-aware send only when the target
                    // configures a message group id.
                    let group_id = target
                        .sqs_parameters
                        .as_ref()
                        .and_then(|p| p["MessageGroupId"].as_str())
                        .map(|s| s.to_string());
                    if group_id.is_some() {
                        // NOTE(review): the trailing `None` is presumably a
                        // deduplication id; confirm against `DeliveryBus`.
                        self.delivery.send_to_sqs_with_attrs(
                            arn,
                            &body_str,
                            &HashMap::new(),
                            group_id.as_deref(),
                            None,
                        );
                    } else {
                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
                    }
                } else if arn.contains(":sns:") {
                    self.delivery
                        .publish_to_sns(arn, &body_str, Some(&detail_type));
                } else if arn.contains(":lambda:") {
                    tracing::info!(
                        function_arn = %arn,
                        payload = %body_str,
                        "EventBridge delivering to Lambda function"
                    );
                    // Record the invocation in this service's own state; the
                    // lock is dropped again before touching the Lambda state.
                    let now = Utc::now();
                    let mut state = self.state.write();
                    state
                        .lambda_invocations
                        .push(crate::state::LambdaInvocation {
                            function_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: now,
                        });
                    drop(state);
                    // Mirror the invocation into the shared Lambda state when
                    // one is configured.
                    if let Some(ref ls) = self.lambda_state {
                        ls.write().invocations.push(LambdaInvocation {
                            function_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: now,
                            source: "aws:events".to_string(),
                        });
                    }
                    // Hand the payload to the asynchronous Lambda invoker.
                    invoke_lambda_async(
                        &self.container_runtime,
                        &self.lambda_state,
                        arn,
                        &body_str,
                    );
                } else if arn.contains(":logs:") {
                    tracing::info!(
                        log_group_arn = %arn,
                        payload = %body_str,
                        "EventBridge delivering to CloudWatch Logs"
                    );
                    // Record the delivery locally, release the lock, then
                    // forward to the logs state when one is configured.
                    let now = Utc::now();
                    let mut state = self.state.write();
                    state.log_deliveries.push(crate::state::LogDelivery {
                        log_group_arn: arn.clone(),
                        payload: body_str.clone(),
                        timestamp: now,
                    });
                    drop(state);
                    if let Some(ref log_state) = self.logs_state {
                        deliver_to_logs(log_state, arn, &body_str, now);
                    }
                } else if arn.contains(":kinesis:") {
                    tracing::info!(
                        stream_arn = %arn,
                        "EventBridge delivering to Kinesis stream"
                    );
                    // NOTE(review): the event id is presumably used as the
                    // partition key; confirm against `DeliveryBus`.
                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
                } else if arn.contains(":states:") {
                    tracing::info!(
                        state_machine_arn = %arn,
                        "EventBridge delivering to Step Functions"
                    );
                    self.delivery.start_stepfunctions_execution(arn, &body_str);
                    // The guard is released when this branch's scope ends.
                    let mut state = self.state.write();
                    state
                        .step_function_executions
                        .push(crate::state::StepFunctionExecution {
                            state_machine_arn: arn.clone(),
                            payload: body_str.clone(),
                            timestamp: Utc::now(),
                        });
                } else if arn.contains(":api-destination/") {
                    let state = self.state.read();
                    // Look up the destination by ARN; an unknown ARN results
                    // in no delivery at all.
                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
                    if let Some(dest) = dest {
                        // Copy everything the spawned task needs so the read
                        // lock can be released before the request is made.
                        let url = dest.invocation_endpoint.clone();
                        let method = dest.http_method.clone();
                        let conn = state
                            .connections
                            .values()
                            .find(|c| c.arn == dest.connection_arn)
                            .cloned();
                        drop(state);

                        let payload = body_str.clone();
                        // The task is not awaited; failures are only logged.
                        tokio::spawn(async move {
                            let client = reqwest::Client::new();
                            // Any method other than those listed falls back to POST.
                            let mut req_builder = match method.as_str() {
                                "GET" => client.get(&url),
                                "PUT" => client.put(&url),
                                "DELETE" => client.delete(&url),
                                "PATCH" => client.patch(&url),
                                "HEAD" => client.head(&url),
                                _ => client.post(&url),
                            };
                            req_builder = req_builder.header("Content-Type", "application/json");

                            // Let the linked connection, when found, adjust the request.
                            if let Some(conn) = conn {
                                req_builder = apply_connection_auth(req_builder, &conn);
                            }

                            let result = req_builder.body(payload).send().await;
                            if let Err(e) = result {
                                tracing::warn!(
                                    endpoint = %url,
                                    error = %e,
                                    "EventBridge ApiDestination delivery failed"
                                );
                            }
                        });
                    }
                } else if arn.starts_with("https://") || arn.starts_with("http://") {
                    // A target "ARN" that is really a URL: POST the payload to
                    // it from a spawned task; failures are only logged.
                    let url = arn.clone();
                    let payload = body_str.clone();
                    tokio::spawn(async move {
                        let client = reqwest::Client::new();
                        let result = client
                            .post(&url)
                            .header("Content-Type", "application/json")
                            .body(payload)
                            .send()
                            .await;
                        if let Err(e) = result {
                            tracing::warn!(
                                endpoint = %url,
                                error = %e,
                                "EventBridge HTTP target delivery failed"
                            );
                        }
                    });
                }
            }
        }

        let resp = json!({
            "FailedEntryCount": failed_count,
            "Entries": result_entries,
        });

        Ok(AwsResponse::ok_json(resp))
    }
2066
2067 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2070 let body = req.json_body();
2071 validate_required("ResourceARN", &body["ResourceARN"])?;
2072 let arn = body["ResourceARN"]
2073 .as_str()
2074 .ok_or_else(|| missing("ResourceARN"))?;
2075 validate_string_length("resourceARN", arn, 1, 1600)?;
2076 validate_required("Tags", &body["Tags"])?;
2077
2078 let mut state = self.state.write();
2079 let tag_map = find_tags_mut(&mut state, arn)?;
2080
2081 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2082 AwsServiceError::aws_error(
2083 StatusCode::BAD_REQUEST,
2084 "ValidationException",
2085 format!("{f} must be a list"),
2086 )
2087 })?;
2088
2089 Ok(AwsResponse::ok_json(json!({})))
2090 }
2091
2092 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2093 let body = req.json_body();
2094 validate_required("ResourceARN", &body["ResourceARN"])?;
2095 let arn = body["ResourceARN"]
2096 .as_str()
2097 .ok_or_else(|| missing("ResourceARN"))?;
2098 validate_string_length("resourceARN", arn, 1, 1600)?;
2099 validate_required("TagKeys", &body["TagKeys"])?;
2100
2101 let mut state = self.state.write();
2102 let tag_map = find_tags_mut(&mut state, arn)?;
2103
2104 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2105 AwsServiceError::aws_error(
2106 StatusCode::BAD_REQUEST,
2107 "ValidationException",
2108 format!("{f} must be a list"),
2109 )
2110 })?;
2111
2112 Ok(AwsResponse::ok_json(json!({})))
2113 }
2114
2115 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116 let body = req.json_body();
2117 validate_required("ResourceARN", &body["ResourceARN"])?;
2118 let arn = body["ResourceARN"]
2119 .as_str()
2120 .ok_or_else(|| missing("ResourceARN"))?;
2121 validate_string_length("resourceARN", arn, 1, 1600)?;
2122
2123 let state = self.state.read();
2124 let tag_map = find_tags(&state, arn)?;
2125
2126 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2127
2128 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2129 }
2130
2131 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2134 let body = req.json_body();
2135 validate_required("ArchiveName", &body["ArchiveName"])?;
2136 let name = body["ArchiveName"]
2137 .as_str()
2138 .ok_or_else(|| missing("ArchiveName"))?
2139 .to_string();
2140 validate_string_length("archiveName", &name, 1, 48)?;
2141 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2142 let event_source_arn = body["EventSourceArn"]
2143 .as_str()
2144 .ok_or_else(|| missing("EventSourceArn"))?
2145 .to_string();
2146 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2147 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2148 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2149 if let Some(rd) = body["RetentionDays"].as_i64() {
2150 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2151 }
2152 let description = body["Description"].as_str().map(|s| s.to_string());
2153 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2154 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2155
2156 if let Some(ref pattern) = event_pattern {
2158 validate_event_pattern(pattern)?;
2159 }
2160
2161 let mut state = self.state.write();
2162
2163 let bus_name = state.resolve_bus_name(&event_source_arn);
2165 if !state.buses.contains_key(&bus_name) {
2166 return Err(AwsServiceError::aws_error(
2167 StatusCode::BAD_REQUEST,
2168 "ResourceNotFoundException",
2169 format!("Event bus {bus_name} does not exist."),
2170 ));
2171 }
2172
2173 if state.archives.contains_key(&name) {
2175 return Err(AwsServiceError::aws_error(
2176 StatusCode::BAD_REQUEST,
2177 "ResourceAlreadyExistsException",
2178 format!("Archive {name} already exists."),
2179 ));
2180 }
2181
2182 let now = Utc::now();
2183 let arn = format!(
2184 "arn:aws:events:{}:{}:archive/{}",
2185 req.region, state.account_id, name
2186 );
2187
2188 let archive = Archive {
2189 name: name.clone(),
2190 arn: arn.clone(),
2191 event_source_arn: event_source_arn.clone(),
2192 description,
2193 event_pattern: event_pattern.clone(),
2194 retention_days,
2195 state: "ENABLED".to_string(),
2196 creation_time: now,
2197 event_count: 0,
2198 size_bytes: 0,
2199 events: Vec::new(),
2200 };
2201 state.archives.insert(name.clone(), archive);
2202
2203 let rule_name = format!("Events-Archive-{name}");
2205 let rule_arn = format!(
2206 "arn:aws:events:{}:{}:rule/{}",
2207 req.region, state.account_id, rule_name
2208 );
2209 let rule_event_pattern = {
2211 let mut merged = if let Some(ref ep) = event_pattern {
2212 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2213 } else {
2214 json!({})
2215 };
2216 if let Some(obj) = merged.as_object_mut() {
2217 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2218 }
2219 serde_json::to_string(&merged).unwrap_or_default()
2220 };
2221
2222 let archive_target = EventTarget {
2224 id: name.clone(),
2225 arn: format!("arn:aws:events:{}:::", req.region),
2226 input: None,
2227 input_path: None,
2228 input_transformer: Some(json!({
2229 "InputPathsMap": {},
2230 "InputTemplate": format!(
2231 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2232 arn
2233 )
2234 })),
2235 sqs_parameters: None,
2236 };
2237
2238 let archive_rule = EventRule {
2239 name: rule_name.clone(),
2240 arn: rule_arn,
2241 event_bus_name: bus_name.clone(),
2242 event_pattern: Some(rule_event_pattern),
2243 schedule_expression: None,
2244 state: "ENABLED".to_string(),
2245 description: None,
2246 role_arn: None,
2247 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2248 created_by: Some(state.account_id.clone()),
2249 targets: vec![archive_target],
2250 tags: HashMap::new(),
2251 last_fired: None,
2252 };
2253 let key = (bus_name, rule_name);
2254 state.rules.insert(key, archive_rule);
2255
2256 Ok(AwsResponse::ok_json(json!({
2257 "ArchiveArn": arn,
2258 "CreationTime": now.timestamp() as f64,
2259 "State": "ENABLED",
2260 })))
2261 }
2262
2263 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2264 let body = req.json_body();
2265 validate_required("ArchiveName", &body["ArchiveName"])?;
2266 let name = body["ArchiveName"]
2267 .as_str()
2268 .ok_or_else(|| missing("ArchiveName"))?;
2269 validate_string_length("archiveName", name, 1, 48)?;
2270
2271 let state = self.state.read();
2272 let archive = state.archives.get(name).ok_or_else(|| {
2273 AwsServiceError::aws_error(
2274 StatusCode::BAD_REQUEST,
2275 "ResourceNotFoundException",
2276 format!("Archive {name} does not exist."),
2277 )
2278 })?;
2279
2280 let mut resp = json!({
2281 "ArchiveArn": archive.arn,
2282 "ArchiveName": archive.name,
2283 "CreationTime": archive.creation_time.timestamp() as f64,
2284 "EventCount": archive.event_count,
2285 "EventSourceArn": archive.event_source_arn,
2286 "RetentionDays": archive.retention_days,
2287 "SizeBytes": archive.size_bytes,
2288 "State": archive.state,
2289 });
2290 if let Some(ref desc) = archive.description {
2291 resp["Description"] = json!(desc);
2292 }
2293 if let Some(ref ep) = archive.event_pattern {
2294 resp["EventPattern"] = json!(ep);
2295 }
2296
2297 Ok(AwsResponse::ok_json(resp))
2298 }
2299
2300 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2301 let body = req.json_body();
2302 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2303 validate_optional_string_length(
2304 "eventSourceArn",
2305 body["EventSourceArn"].as_str(),
2306 1,
2307 1600,
2308 )?;
2309 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2310 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2311 let name_prefix = body["NamePrefix"].as_str();
2312 let source_arn = body["EventSourceArn"].as_str();
2313 let archive_state = body["State"].as_str();
2314
2315 let filter_count = [
2317 name_prefix.is_some(),
2318 source_arn.is_some(),
2319 archive_state.is_some(),
2320 ]
2321 .iter()
2322 .filter(|&&x| x)
2323 .count();
2324 if filter_count > 1 {
2325 return Err(AwsServiceError::aws_error(
2326 StatusCode::BAD_REQUEST,
2327 "ValidationException",
2328 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2329 ));
2330 }
2331
2332 if let Some(s) = archive_state {
2334 let valid = [
2335 "ENABLED",
2336 "DISABLED",
2337 "CREATING",
2338 "UPDATING",
2339 "CREATE_FAILED",
2340 "UPDATE_FAILED",
2341 ];
2342 if !valid.contains(&s) {
2343 return Err(AwsServiceError::aws_error(
2344 StatusCode::BAD_REQUEST,
2345 "ValidationException",
2346 format!(
2347 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2348 s
2349 ),
2350 ));
2351 }
2352 }
2353
2354 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2355
2356 let state = self.state.read();
2357 let all: Vec<Value> = state
2358 .archives
2359 .values()
2360 .filter(|a| {
2361 if let Some(prefix) = name_prefix {
2362 a.name.starts_with(prefix)
2363 } else if let Some(arn) = source_arn {
2364 a.event_source_arn == arn
2365 } else if let Some(s) = archive_state {
2366 a.state == s
2367 } else {
2368 true
2369 }
2370 })
2371 .map(|a| {
2372 json!({
2373 "ArchiveName": a.name,
2374 "CreationTime": a.creation_time.timestamp() as f64,
2375 "EventCount": a.event_count,
2376 "EventSourceArn": a.event_source_arn,
2377 "RetentionDays": a.retention_days,
2378 "SizeBytes": a.size_bytes,
2379 "State": a.state,
2380 })
2381 })
2382 .collect();
2383
2384 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2385 let mut resp = json!({ "Archives": archives });
2386 if let Some(token) = next_token {
2387 resp["NextToken"] = json!(token);
2388 }
2389
2390 Ok(AwsResponse::ok_json(resp))
2391 }
2392
2393 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2394 let body = req.json_body();
2395 validate_required("ArchiveName", &body["ArchiveName"])?;
2396 let name = body["ArchiveName"]
2397 .as_str()
2398 .ok_or_else(|| missing("ArchiveName"))?;
2399 validate_string_length("archiveName", name, 1, 48)?;
2400 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2401 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2402 if let Some(rd) = body["RetentionDays"].as_i64() {
2403 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2404 }
2405
2406 if let Some(pattern) = body["EventPattern"].as_str() {
2408 validate_event_pattern(pattern)?;
2409 }
2410
2411 let mut state = self.state.write();
2412 let archive = state.archives.get_mut(name).ok_or_else(|| {
2413 AwsServiceError::aws_error(
2414 StatusCode::BAD_REQUEST,
2415 "ResourceNotFoundException",
2416 format!("Archive {name} does not exist."),
2417 )
2418 })?;
2419
2420 if let Some(desc) = body["Description"].as_str() {
2421 archive.description = Some(desc.to_string());
2422 }
2423 if let Some(pattern) = body["EventPattern"].as_str() {
2424 archive.event_pattern = Some(pattern.to_string());
2425 }
2426 if let Some(days) = body["RetentionDays"].as_i64() {
2427 archive.retention_days = days;
2428 }
2429
2430 Ok(AwsResponse::ok_json(json!({
2431 "ArchiveArn": archive.arn,
2432 "CreationTime": archive.creation_time.timestamp() as f64,
2433 "State": archive.state,
2434 })))
2435 }
2436
2437 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2438 let body = req.json_body();
2439 validate_required("ArchiveName", &body["ArchiveName"])?;
2440 let name = body["ArchiveName"]
2441 .as_str()
2442 .ok_or_else(|| missing("ArchiveName"))?;
2443 validate_string_length("archiveName", name, 1, 48)?;
2444
2445 let mut state = self.state.write();
2446 if !state.archives.contains_key(name) {
2447 return Err(AwsServiceError::aws_error(
2448 StatusCode::BAD_REQUEST,
2449 "ResourceNotFoundException",
2450 format!("Archive {name} does not exist."),
2451 ));
2452 }
2453
2454 state.archives.remove(name);
2455
2456 let rule_name = format!("Events-Archive-{name}");
2458 state.rules.retain(|k, _| k.1 != rule_name);
2459
2460 Ok(AwsResponse::ok_json(json!({})))
2461 }
2462
2463 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466 let body = req.json_body();
2467 validate_required("Name", &body["Name"])?;
2468 let name = body["Name"]
2469 .as_str()
2470 .ok_or_else(|| missing("Name"))?
2471 .to_string();
2472 validate_string_length("name", &name, 1, 64)?;
2473 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2474 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2475 let description = body["Description"].as_str().map(|s| s.to_string());
2476 let auth_type = body["AuthorizationType"]
2477 .as_str()
2478 .ok_or_else(|| missing("AuthorizationType"))?
2479 .to_string();
2480 validate_enum(
2481 "authorizationType",
2482 &auth_type,
2483 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2484 )?;
2485 validate_optional_string_length(
2486 "kmsKeyIdentifier",
2487 body["KmsKeyIdentifier"].as_str(),
2488 0,
2489 2048,
2490 )?;
2491 validate_required("AuthParameters", &body["AuthParameters"])?;
2492 let auth_params = body["AuthParameters"].clone();
2493
2494 let mut state = self.state.write();
2495 let now = Utc::now();
2496 let conn_uuid = uuid::Uuid::new_v4();
2497 let arn = format!(
2498 "arn:aws:events:{}:{}:connection/{}/{}",
2499 req.region, state.account_id, name, conn_uuid
2500 );
2501 let secret_arn = format!(
2502 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2503 req.region, state.account_id, name, conn_uuid
2504 );
2505
2506 let conn = Connection {
2507 name: name.clone(),
2508 arn: arn.clone(),
2509 description,
2510 authorization_type: auth_type.clone(),
2511 auth_parameters: auth_params,
2512 connection_state: "AUTHORIZED".to_string(),
2513 secret_arn: secret_arn.clone(),
2514 creation_time: now,
2515 last_modified_time: now,
2516 last_authorized_time: now,
2517 };
2518 state.connections.insert(name, conn);
2519
2520 Ok(AwsResponse::ok_json(json!({
2521 "ConnectionArn": arn,
2522 "ConnectionState": "AUTHORIZED",
2523 "CreationTime": now.timestamp() as f64,
2524 "LastModifiedTime": now.timestamp() as f64,
2525 })))
2526 }
2527
2528 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2529 let body = req.json_body();
2530 validate_required("Name", &body["Name"])?;
2531 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2532 validate_string_length("name", name, 1, 64)?;
2533
2534 let state = self.state.read();
2535 let conn = state.connections.get(name).ok_or_else(|| {
2536 AwsServiceError::aws_error(
2537 StatusCode::BAD_REQUEST,
2538 "ResourceNotFoundException",
2539 format!("Connection '{name}' does not exist."),
2540 )
2541 })?;
2542
2543 let auth_params_response =
2545 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2546
2547 let mut resp = json!({
2548 "ConnectionArn": conn.arn,
2549 "Name": conn.name,
2550 "AuthorizationType": conn.authorization_type,
2551 "AuthParameters": auth_params_response,
2552 "ConnectionState": conn.connection_state,
2553 "SecretArn": conn.secret_arn,
2554 "CreationTime": conn.creation_time.timestamp() as f64,
2555 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2556 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2557 });
2558 if let Some(ref desc) = conn.description {
2559 resp["Description"] = json!(desc);
2560 }
2561
2562 Ok(AwsResponse::ok_json(resp))
2563 }
2564
2565 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2566 let body = req.json_body();
2567 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2568 validate_optional_enum(
2569 "connectionState",
2570 body["ConnectionState"].as_str(),
2571 &[
2572 "CREATING",
2573 "UPDATING",
2574 "DELETING",
2575 "AUTHORIZED",
2576 "DEAUTHORIZED",
2577 "AUTHORIZING",
2578 "DEAUTHORIZING",
2579 "ACTIVE",
2580 "FAILED_CONNECTIVITY",
2581 ],
2582 )?;
2583 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2584 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2585
2586 let name_prefix = body["NamePrefix"].as_str();
2587 let connection_state = body["ConnectionState"].as_str();
2588 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2589
2590 let state = self.state.read();
2591 let all: Vec<Value> = state
2592 .connections
2593 .values()
2594 .filter(|c| {
2595 if let Some(prefix) = name_prefix {
2596 if !c.name.starts_with(prefix) {
2597 return false;
2598 }
2599 }
2600 if let Some(cs) = connection_state {
2601 if c.connection_state != cs {
2602 return false;
2603 }
2604 }
2605 true
2606 })
2607 .map(|c| {
2608 json!({
2609 "ConnectionArn": c.arn,
2610 "Name": c.name,
2611 "AuthorizationType": c.authorization_type,
2612 "ConnectionState": c.connection_state,
2613 "CreationTime": c.creation_time.timestamp() as f64,
2614 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2615 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2616 })
2617 })
2618 .collect();
2619
2620 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2621 let mut resp = json!({ "Connections": conns });
2622 if let Some(token) = next_token {
2623 resp["NextToken"] = json!(token);
2624 }
2625
2626 Ok(AwsResponse::ok_json(resp))
2627 }
2628
2629 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2630 let body = req.json_body();
2631 validate_required("Name", &body["Name"])?;
2632 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2633 validate_string_length("name", name, 1, 64)?;
2634 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2635 validate_optional_enum(
2636 "authorizationType",
2637 body["AuthorizationType"].as_str(),
2638 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2639 )?;
2640
2641 let mut state = self.state.write();
2642 let conn = state.connections.get_mut(name).ok_or_else(|| {
2643 AwsServiceError::aws_error(
2644 StatusCode::BAD_REQUEST,
2645 "ResourceNotFoundException",
2646 format!("Connection '{name}' does not exist."),
2647 )
2648 })?;
2649
2650 if let Some(desc) = body["Description"].as_str() {
2651 conn.description = Some(desc.to_string());
2652 }
2653 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2654 conn.authorization_type = auth_type.to_string();
2655 }
2656 if body.get("AuthParameters").is_some() {
2657 conn.auth_parameters = body["AuthParameters"].clone();
2658 }
2659 conn.last_modified_time = Utc::now();
2660
2661 Ok(AwsResponse::ok_json(json!({
2662 "ConnectionArn": conn.arn,
2663 "ConnectionState": conn.connection_state,
2664 "CreationTime": conn.creation_time.timestamp() as f64,
2665 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2666 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2667 })))
2668 }
2669
2670 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2671 let body = req.json_body();
2672 validate_required("Name", &body["Name"])?;
2673 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2674 validate_string_length("name", name, 1, 64)?;
2675
2676 let mut state = self.state.write();
2677 let conn = state.connections.remove(name).ok_or_else(|| {
2678 AwsServiceError::aws_error(
2679 StatusCode::BAD_REQUEST,
2680 "ResourceNotFoundException",
2681 format!("Connection '{name}' does not exist."),
2682 )
2683 })?;
2684
2685 Ok(AwsResponse::ok_json(json!({
2686 "ConnectionArn": conn.arn,
2687 "ConnectionState": conn.connection_state,
2688 "CreationTime": conn.creation_time.timestamp() as f64,
2689 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2690 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2691 })))
2692 }
2693
2694 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2697 let body = req.json_body();
2698 validate_required("Name", &body["Name"])?;
2699 let name = body["Name"]
2700 .as_str()
2701 .ok_or_else(|| missing("Name"))?
2702 .to_string();
2703 validate_string_length("name", &name, 1, 64)?;
2704 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2705 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2706 let description = body["Description"].as_str().map(|s| s.to_string());
2707 let connection_arn = body["ConnectionArn"]
2708 .as_str()
2709 .ok_or_else(|| missing("ConnectionArn"))?
2710 .to_string();
2711 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2712 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2713 let endpoint = body["InvocationEndpoint"]
2714 .as_str()
2715 .ok_or_else(|| missing("InvocationEndpoint"))?
2716 .to_string();
2717 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2718 validate_required("HttpMethod", &body["HttpMethod"])?;
2719 let http_method = body["HttpMethod"]
2720 .as_str()
2721 .ok_or_else(|| missing("HttpMethod"))?
2722 .to_string();
2723 validate_enum(
2724 "httpMethod",
2725 &http_method,
2726 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2727 )?;
2728 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2729 if let Some(r) = rate_limit {
2730 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2731 }
2732
2733 let mut state = self.state.write();
2734 let now = Utc::now();
2735 let dest_uuid = uuid::Uuid::new_v4();
2736 let arn = format!(
2737 "arn:aws:events:{}:{}:api-destination/{}/{}",
2738 req.region, state.account_id, name, dest_uuid
2739 );
2740
2741 let dest = ApiDestination {
2742 name: name.clone(),
2743 arn: arn.clone(),
2744 description,
2745 connection_arn,
2746 invocation_endpoint: endpoint,
2747 http_method,
2748 invocation_rate_limit_per_second: rate_limit,
2749 state: "ACTIVE".to_string(),
2750 creation_time: now,
2751 last_modified_time: now,
2752 };
2753 state.api_destinations.insert(name, dest);
2754
2755 Ok(AwsResponse::ok_json(json!({
2756 "ApiDestinationArn": arn,
2757 "ApiDestinationState": "ACTIVE",
2758 "CreationTime": now.timestamp() as f64,
2759 "LastModifiedTime": now.timestamp() as f64,
2760 })))
2761 }
2762
2763 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764 let body = req.json_body();
2765 validate_required("Name", &body["Name"])?;
2766 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2767 validate_string_length("name", name, 1, 64)?;
2768
2769 let state = self.state.read();
2770 let dest = state.api_destinations.get(name).ok_or_else(|| {
2771 AwsServiceError::aws_error(
2772 StatusCode::BAD_REQUEST,
2773 "ResourceNotFoundException",
2774 format!("An api-destination '{name}' does not exist."),
2775 )
2776 })?;
2777
2778 let mut resp = json!({
2779 "ApiDestinationArn": dest.arn,
2780 "Name": dest.name,
2781 "ConnectionArn": dest.connection_arn,
2782 "InvocationEndpoint": dest.invocation_endpoint,
2783 "HttpMethod": dest.http_method,
2784 "ApiDestinationState": dest.state,
2785 "CreationTime": dest.creation_time.timestamp() as f64,
2786 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2787 });
2788 if let Some(ref desc) = dest.description {
2789 resp["Description"] = json!(desc);
2790 }
2791 if let Some(rate) = dest.invocation_rate_limit_per_second {
2792 resp["InvocationRateLimitPerSecond"] = json!(rate);
2793 }
2794
2795 Ok(AwsResponse::ok_json(resp))
2796 }
2797
2798 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2799 let body = req.json_body();
2800 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2801 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2802 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2803 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2804
2805 let name_prefix = body["NamePrefix"].as_str();
2806 let connection_arn = body["ConnectionArn"].as_str();
2807 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2808
2809 let state = self.state.read();
2810 let all: Vec<Value> = state
2811 .api_destinations
2812 .values()
2813 .filter(|d| {
2814 if let Some(prefix) = name_prefix {
2815 if !d.name.starts_with(prefix) {
2816 return false;
2817 }
2818 }
2819 if let Some(arn) = connection_arn {
2820 if d.connection_arn != arn {
2821 return false;
2822 }
2823 }
2824 true
2825 })
2826 .map(|d| {
2827 let mut obj = json!({
2828 "ApiDestinationArn": d.arn,
2829 "Name": d.name,
2830 "ConnectionArn": d.connection_arn,
2831 "InvocationEndpoint": d.invocation_endpoint,
2832 "HttpMethod": d.http_method,
2833 "ApiDestinationState": d.state,
2834 "CreationTime": d.creation_time.timestamp() as f64,
2835 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2836 });
2837 if let Some(rate) = d.invocation_rate_limit_per_second {
2838 obj["InvocationRateLimitPerSecond"] = json!(rate);
2839 }
2840 obj
2841 })
2842 .collect();
2843
2844 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2845 let mut resp = json!({ "ApiDestinations": dests });
2846 if let Some(token) = next_token {
2847 resp["NextToken"] = json!(token);
2848 }
2849
2850 Ok(AwsResponse::ok_json(resp))
2851 }
2852
2853 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2854 let body = req.json_body();
2855 validate_required("Name", &body["Name"])?;
2856 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2857 validate_string_length("name", name, 1, 64)?;
2858 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2859 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2860 validate_optional_string_length(
2861 "invocationEndpoint",
2862 body["InvocationEndpoint"].as_str(),
2863 1,
2864 2048,
2865 )?;
2866 validate_optional_enum(
2867 "httpMethod",
2868 body["HttpMethod"].as_str(),
2869 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2870 )?;
2871 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2872 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2873 }
2874
2875 let mut state = self.state.write();
2876 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2877 AwsServiceError::aws_error(
2878 StatusCode::BAD_REQUEST,
2879 "ResourceNotFoundException",
2880 format!("An api-destination '{name}' does not exist."),
2881 )
2882 })?;
2883
2884 if let Some(desc) = body["Description"].as_str() {
2885 dest.description = Some(desc.to_string());
2886 }
2887 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2888 dest.invocation_endpoint = endpoint.to_string();
2889 }
2890 if let Some(method) = body["HttpMethod"].as_str() {
2891 dest.http_method = method.to_string();
2892 }
2893 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2894 dest.invocation_rate_limit_per_second = Some(rate);
2895 }
2896 if let Some(conn) = body["ConnectionArn"].as_str() {
2897 dest.connection_arn = conn.to_string();
2898 }
2899 dest.last_modified_time = Utc::now();
2900
2901 Ok(AwsResponse::ok_json(json!({
2902 "ApiDestinationArn": dest.arn,
2903 "ApiDestinationState": dest.state,
2904 "CreationTime": dest.creation_time.timestamp() as f64,
2905 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2906 })))
2907 }
2908
2909 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910 let body = req.json_body();
2911 validate_required("Name", &body["Name"])?;
2912 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2913 validate_string_length("name", name, 1, 64)?;
2914
2915 let mut state = self.state.write();
2916 if !state.api_destinations.contains_key(name) {
2917 return Err(AwsServiceError::aws_error(
2918 StatusCode::BAD_REQUEST,
2919 "ResourceNotFoundException",
2920 format!("An api-destination '{name}' does not exist."),
2921 ));
2922 }
2923 state.api_destinations.remove(name);
2924
2925 Ok(AwsResponse::ok_json(json!({})))
2926 }
2927
2928 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2931 let input = StartReplayInput::from_body(&req.json_body())?;
2932
2933 let mut state = self.state.write();
2934
2935 let bus_name = state.resolve_bus_name(&input.destination_arn);
2937 if !state.buses.contains_key(&bus_name) {
2938 return Err(AwsServiceError::aws_error(
2939 StatusCode::BAD_REQUEST,
2940 "ResourceNotFoundException",
2941 format!("Event bus {bus_name} does not exist."),
2942 ));
2943 }
2944
2945 let archive_name = input
2946 .event_source_arn
2947 .rsplit_once("archive/")
2948 .map(|(_, n)| n.to_string())
2949 .unwrap_or_default();
2950 let archive = state.archives.get(&archive_name).ok_or_else(|| {
2951 AwsServiceError::aws_error(
2952 StatusCode::BAD_REQUEST,
2953 "ValidationException",
2954 format!(
2955 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2956 ),
2957 )
2958 })?;
2959 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2960 if archive_bus != bus_name {
2961 return Err(AwsServiceError::aws_error(
2962 StatusCode::BAD_REQUEST,
2963 "ValidationException",
2964 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2965 ));
2966 }
2967
2968 if input.event_end_time <= input.event_start_time {
2969 return Err(AwsServiceError::aws_error(
2970 StatusCode::BAD_REQUEST,
2971 "ValidationException",
2972 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2973 ));
2974 }
2975
2976 if state.replays.contains_key(&input.name) {
2977 return Err(AwsServiceError::aws_error(
2978 StatusCode::BAD_REQUEST,
2979 "ResourceAlreadyExistsException",
2980 format!("Replay {} already exists.", input.name),
2981 ));
2982 }
2983
2984 let now = Utc::now();
2985 let arn = format!(
2986 "arn:aws:events:{}:{}:replay/{}",
2987 req.region, state.account_id, input.name
2988 );
2989
2990 let events_to_deliver = collect_replay_events_with_targets(
2991 &state,
2992 &archive_name,
2993 &bus_name,
2994 input.event_start_time,
2995 input.event_end_time,
2996 &req.account_id,
2997 &req.region,
2998 );
2999
3000 let replay = Replay {
3001 name: input.name.clone(),
3002 arn: arn.clone(),
3003 description: input.description,
3004 event_source_arn: input.event_source_arn,
3005 destination: input.destination,
3006 event_start_time: input.event_start_time,
3007 event_end_time: input.event_end_time,
3008 state: "COMPLETED".to_string(),
3009 replay_start_time: now,
3010 replay_end_time: Some(now),
3011 };
3012 state.replays.insert(input.name, replay);
3013
3014 drop(state);
3015
3016 for (event, targets) in events_to_deliver {
3017 let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3018 let event_json = json!({
3019 "version": "0",
3020 "id": event.event_id,
3021 "source": event.source,
3022 "account": req.account_id,
3023 "detail-type": event.detail_type,
3024 "detail": detail_value,
3025 "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3026 "region": req.region,
3027 "resources": event.resources,
3028 "replay-name": arn,
3029 });
3030 let event_str = event_json.to_string();
3031
3032 for target in targets {
3033 self.deliver_replay_event_to_target(&target, &event, &event_json, &event_str);
3034 }
3035 }
3036
3037 Ok(AwsResponse::ok_json(json!({
3038 "ReplayArn": arn,
3039 "ReplayStartTime": now.timestamp() as f64,
3040 "State": "STARTING",
3041 })))
3042 }
3043
3044 fn deliver_replay_event_to_target(
3045 &self,
3046 target: &EventTarget,
3047 event: &PutEvent,
3048 event_json: &Value,
3049 event_str: &str,
3050 ) {
3051 let target_arn = &target.arn;
3052 let body_str = if let Some(ref transformer) = target.input_transformer {
3053 apply_input_transformer(transformer, event_json)
3054 } else if let Some(ref input) = target.input {
3055 input.clone()
3056 } else if let Some(ref input_path) = target.input_path {
3057 resolve_json_path(event_json, input_path)
3058 .map(|v| v.to_string())
3059 .unwrap_or_else(|| event_str.to_string())
3060 } else {
3061 event_str.to_string()
3062 };
3063
3064 if target_arn.contains(":sqs:") {
3065 let group_id = target
3066 .sqs_parameters
3067 .as_ref()
3068 .and_then(|p| p["MessageGroupId"].as_str())
3069 .map(|s| s.to_string());
3070 if group_id.is_some() {
3071 self.delivery.send_to_sqs_with_attrs(
3072 target_arn,
3073 &body_str,
3074 &HashMap::new(),
3075 group_id.as_deref(),
3076 None,
3077 );
3078 } else {
3079 self.delivery
3080 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3081 }
3082 } else if target_arn.contains(":sns:") {
3083 self.delivery
3084 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3085 } else if target_arn.contains(":lambda:") {
3086 let mut state = self.state.write();
3087 state
3088 .lambda_invocations
3089 .push(crate::state::LambdaInvocation {
3090 function_arn: target_arn.clone(),
3091 payload: body_str.clone(),
3092 timestamp: Utc::now(),
3093 });
3094 drop(state);
3095 if let Some(ref ls) = self.lambda_state {
3096 ls.write().invocations.push(LambdaInvocation {
3097 function_arn: target_arn.clone(),
3098 payload: body_str.clone(),
3099 timestamp: Utc::now(),
3100 source: "aws:events".to_string(),
3101 });
3102 }
3103 invoke_lambda_async(
3104 &self.container_runtime,
3105 &self.lambda_state,
3106 target_arn,
3107 &body_str,
3108 );
3109 } else if target_arn.contains(":logs:") {
3110 let mut state = self.state.write();
3111 state.log_deliveries.push(crate::state::LogDelivery {
3112 log_group_arn: target_arn.clone(),
3113 payload: body_str.clone(),
3114 timestamp: Utc::now(),
3115 });
3116 drop(state);
3117 if let Some(ref log_state) = self.logs_state {
3118 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3119 }
3120 } else if target_arn.contains(":states:") {
3121 self.delivery
3122 .start_stepfunctions_execution(target_arn, &body_str);
3123 let mut state = self.state.write();
3124 state
3125 .step_function_executions
3126 .push(crate::state::StepFunctionExecution {
3127 state_machine_arn: target_arn.clone(),
3128 payload: body_str.clone(),
3129 timestamp: Utc::now(),
3130 });
3131 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3132 let url = target_arn.clone();
3133 let payload = body_str.clone();
3134 tokio::spawn(async move {
3135 let client = reqwest::Client::new();
3136 let _ = client
3137 .post(&url)
3138 .header("Content-Type", "application/json")
3139 .body(payload)
3140 .send()
3141 .await;
3142 });
3143 }
3144 }
3145
3146 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3147 let body = req.json_body();
3148 validate_required("ReplayName", &body["ReplayName"])?;
3149 let name = body["ReplayName"]
3150 .as_str()
3151 .ok_or_else(|| missing("ReplayName"))?;
3152 validate_string_length("replayName", name, 1, 64)?;
3153
3154 let state = self.state.read();
3155 let replay = state.replays.get(name).ok_or_else(|| {
3156 AwsServiceError::aws_error(
3157 StatusCode::BAD_REQUEST,
3158 "ResourceNotFoundException",
3159 format!("Replay {name} does not exist."),
3160 )
3161 })?;
3162
3163 let mut resp = json!({
3164 "Destination": replay.destination,
3165 "EventSourceArn": replay.event_source_arn,
3166 "EventStartTime": replay.event_start_time.timestamp() as f64,
3167 "EventEndTime": replay.event_end_time.timestamp() as f64,
3168 "ReplayArn": replay.arn,
3169 "ReplayName": replay.name,
3170 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3171 "State": replay.state,
3172 });
3173 if let Some(ref desc) = replay.description {
3174 resp["Description"] = json!(desc);
3175 }
3176 if let Some(ref end) = replay.replay_end_time {
3177 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3178 }
3179
3180 Ok(AwsResponse::ok_json(resp))
3181 }
3182
3183 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3184 let body = req.json_body();
3185 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3186 validate_optional_string_length(
3187 "eventSourceArn",
3188 body["EventSourceArn"].as_str(),
3189 1,
3190 1600,
3191 )?;
3192 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3193 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3194 let name_prefix = body["NamePrefix"].as_str();
3195 let source_arn = body["EventSourceArn"].as_str();
3196 let replay_state = body["State"].as_str();
3197
3198 let filter_count = [
3200 name_prefix.is_some(),
3201 source_arn.is_some(),
3202 replay_state.is_some(),
3203 ]
3204 .iter()
3205 .filter(|&&x| x)
3206 .count();
3207 if filter_count > 1 {
3208 return Err(AwsServiceError::aws_error(
3209 StatusCode::BAD_REQUEST,
3210 "ValidationException",
3211 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3212 ));
3213 }
3214
3215 if let Some(s) = replay_state {
3217 let valid = [
3218 "CANCELLED",
3219 "CANCELLING",
3220 "COMPLETED",
3221 "FAILED",
3222 "RUNNING",
3223 "STARTING",
3224 ];
3225 if !valid.contains(&s) {
3226 return Err(AwsServiceError::aws_error(
3227 StatusCode::BAD_REQUEST,
3228 "ValidationException",
3229 format!(
3230 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3231 s
3232 ),
3233 ));
3234 }
3235 }
3236
3237 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3238
3239 let state = self.state.read();
3240 let all: Vec<Value> = state
3241 .replays
3242 .values()
3243 .filter(|r| {
3244 if let Some(prefix) = name_prefix {
3245 r.name.starts_with(prefix)
3246 } else if let Some(arn) = source_arn {
3247 r.event_source_arn == arn
3248 } else if let Some(s) = replay_state {
3249 r.state == s
3250 } else {
3251 true
3252 }
3253 })
3254 .map(|r| {
3255 let mut obj = json!({
3256 "EventSourceArn": r.event_source_arn,
3257 "EventStartTime": r.event_start_time.timestamp() as f64,
3258 "EventEndTime": r.event_end_time.timestamp() as f64,
3259 "ReplayName": r.name,
3260 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3261 "State": r.state,
3262 });
3263 if let Some(ref end) = r.replay_end_time {
3264 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3265 }
3266 obj
3267 })
3268 .collect();
3269
3270 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3271 let mut resp = json!({ "Replays": replays });
3272 if let Some(token) = next_token {
3273 resp["NextToken"] = json!(token);
3274 }
3275
3276 Ok(AwsResponse::ok_json(resp))
3277 }
3278
3279 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3280 let body = req.json_body();
3281 validate_required("ReplayName", &body["ReplayName"])?;
3282 let name = body["ReplayName"]
3283 .as_str()
3284 .ok_or_else(|| missing("ReplayName"))?;
3285 validate_string_length("replayName", name, 1, 64)?;
3286
3287 let mut state = self.state.write();
3288 let replay = state.replays.get_mut(name).ok_or_else(|| {
3289 AwsServiceError::aws_error(
3290 StatusCode::BAD_REQUEST,
3291 "ResourceNotFoundException",
3292 format!("Replay {name} does not exist."),
3293 )
3294 })?;
3295
3296 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3298 return Err(AwsServiceError::aws_error(
3299 StatusCode::BAD_REQUEST,
3300 "IllegalStatusException",
3301 format!("Replay {name} is not in a valid state for this operation."),
3302 ));
3303 }
3304
3305 let arn = replay.arn.clone();
3306 replay.state = "CANCELLED".to_string();
3307
3308 Ok(AwsResponse::ok_json(json!({
3309 "ReplayArn": arn,
3310 "State": "CANCELLING",
3311 })))
3312 }
3313}
3314
3315fn find_tags_mut<'a>(
3318 state: &'a mut crate::state::EventBridgeState,
3319 arn: &str,
3320) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3321 for bus in state.buses.values_mut() {
3323 if bus.arn == arn {
3324 return Ok(&mut bus.tags);
3325 }
3326 }
3327 for rule in state.rules.values_mut() {
3329 if rule.arn == arn {
3330 return Ok(&mut rule.tags);
3331 }
3332 }
3333
3334 let error_msg = if arn.contains(":rule/") {
3336 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3338 if let Some(rule_path) = parts.first() {
3339 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3340 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3341 } else {
3342 format!("Rule {} does not exist on EventBus default.", rule_path)
3343 }
3344 } else {
3345 format!("Resource {arn} not found.")
3346 }
3347 } else {
3348 format!("Resource {arn} not found.")
3349 };
3350
3351 Err(AwsServiceError::aws_error(
3352 StatusCode::BAD_REQUEST,
3353 "ResourceNotFoundException",
3354 error_msg,
3355 ))
3356}
3357
3358fn find_tags<'a>(
3359 state: &'a crate::state::EventBridgeState,
3360 arn: &str,
3361) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3362 for bus in state.buses.values() {
3363 if bus.arn == arn {
3364 return Ok(&bus.tags);
3365 }
3366 }
3367 for rule in state.rules.values() {
3368 if rule.arn == arn {
3369 return Ok(&rule.tags);
3370 }
3371 }
3372
3373 let error_msg = if arn.contains(":rule/") {
3374 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3375 if let Some(rule_path) = parts.first() {
3376 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3377 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3378 } else {
3379 format!("Rule {} does not exist on EventBus default.", rule_path)
3380 }
3381 } else {
3382 format!("Resource {arn} not found.")
3383 }
3384 } else {
3385 format!("Resource {arn} not found.")
3386 };
3387
3388 Err(AwsServiceError::aws_error(
3389 StatusCode::BAD_REQUEST,
3390 "ResourceNotFoundException",
3391 error_msg,
3392 ))
3393}
3394
3395fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3398 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3399 AwsServiceError::aws_error(
3400 StatusCode::BAD_REQUEST,
3401 "InvalidEventPatternException",
3402 "Event pattern is not valid. Reason: Invalid JSON",
3403 )
3404 })?;
3405
3406 validate_pattern_values(&parsed, "")?;
3407 Ok(())
3408}
3409
3410fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3411 match value {
3412 Value::Object(obj) => {
3413 for (key, val) in obj {
3414 let new_path = if path.is_empty() {
3415 key.clone()
3416 } else {
3417 format!("{path}.{key}")
3418 };
3419 match val {
3420 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3421 Value::Array(_) => {} _ => {
3423 return Err(AwsServiceError::aws_error(
3424 StatusCode::BAD_REQUEST,
3425 "InvalidEventPatternException",
3426 format!(
3427 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3428 key
3429 ),
3430 ));
3431 }
3432 }
3433 }
3434 Ok(())
3435 }
3436 _ => Ok(()),
3437 }
3438}
3439
3440fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3443 match auth_type {
3444 "API_KEY" => {
3445 let mut resp = json!({});
3446 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3447 resp["ApiKeyAuthParameters"] = json!({
3448 "ApiKeyName": api_key["ApiKeyName"],
3449 });
3450 }
3451 resp
3452 }
3453 "BASIC" => {
3454 let mut resp = json!({});
3455 if let Some(basic) = params.get("BasicAuthParameters") {
3456 resp["BasicAuthParameters"] = json!({
3457 "Username": basic["Username"],
3458 });
3459 }
3460 resp
3461 }
3462 "OAUTH_CLIENT_CREDENTIALS" => {
3463 let mut resp = json!({});
3464 if let Some(oauth) = params.get("OAuthParameters") {
3465 resp["OAuthParameters"] = json!({
3466 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3467 "HttpMethod": oauth["HttpMethod"],
3468 "ClientParameters": {
3469 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3470 },
3471 });
3472 }
3473 resp
3474 }
3475 _ => params.clone(),
3476 }
3477}
3478
3479pub fn matches_pattern(
3483 pattern_json: Option<&str>,
3484 source: &str,
3485 detail_type: &str,
3486 detail: &str,
3487 account: &str,
3488 region: &str,
3489 resources: &[String],
3490) -> bool {
3491 let pattern_json = match pattern_json {
3492 Some(p) => p,
3493 None => return true,
3494 };
3495
3496 let pattern: Value = match serde_json::from_str(pattern_json) {
3497 Ok(v) => v,
3498 Err(_) => return false,
3499 };
3500
3501 let pattern_obj = match pattern.as_object() {
3502 Some(o) => o,
3503 None => return false,
3504 };
3505
3506 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3507 let event = json!({
3508 "source": source,
3509 "detail-type": detail_type,
3510 "detail": detail_value,
3511 "account": account,
3512 "region": region,
3513 "resources": resources,
3514 });
3515
3516 for (key, pattern_value) in pattern_obj {
3517 let event_value = &event[key];
3518 if !matches_value(pattern_value, event_value) {
3519 return false;
3520 }
3521 }
3522
3523 true
3524}
3525
3526fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3527 match pattern {
3528 Value::Object(obj) => {
3529 for (key, sub_pattern) in obj {
3530 let sub_value = &event_value[key];
3531 if !matches_value(sub_pattern, sub_value) {
3532 return false;
3533 }
3534 }
3535 true
3536 }
3537 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3538 _ => false,
3539 }
3540}
3541
3542fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3543 match pattern_elem {
3544 Value::Object(obj) => {
3545 if let Some(prefix_val) = obj.get("prefix") {
3546 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3547 return actual.starts_with(prefix);
3548 }
3549 return false;
3550 }
3551 if let Some(exists_val) = obj.get("exists") {
3552 let should_exist = exists_val.as_bool().unwrap_or(true);
3553 let does_exist = !event_value.is_null();
3554 return should_exist == does_exist;
3555 }
3556 if let Some(anything_but_val) = obj.get("anything-but") {
3557 return match anything_but_val {
3558 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3559 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3560 Value::Number(_) => event_value != anything_but_val,
3561 _ => true,
3562 };
3563 }
3564 if let Some(numeric_val) = obj.get("numeric") {
3565 return matches_numeric(numeric_val, event_value);
3566 }
3567 false
3568 }
3569 _ => values_equal(pattern_elem, event_value),
3570 }
3571}
3572
3573#[allow(clippy::too_many_arguments)]
3577fn archive_matching_event(
3578 state: &mut crate::state::EventBridgeState,
3579 event: &PutEvent,
3580 event_bus_name: &str,
3581 source: &str,
3582 detail_type: &str,
3583 detail: &str,
3584 account_id: &str,
3585 region: &str,
3586 resources: &[String],
3587) {
3588 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3589 for akey in archive_keys {
3590 let (archive_bus, archive_pattern, archive_enabled) = {
3591 let a = &state.archives[&akey];
3592 (
3593 state.resolve_bus_name(&a.event_source_arn),
3594 a.event_pattern.clone(),
3595 a.state == "ENABLED",
3596 )
3597 };
3598 if archive_bus != event_bus_name || !archive_enabled {
3599 continue;
3600 }
3601 let pattern_matches = matches_pattern(
3602 archive_pattern.as_deref(),
3603 source,
3604 detail_type,
3605 detail,
3606 account_id,
3607 region,
3608 resources,
3609 );
3610 if !pattern_matches {
3611 continue;
3612 }
3613 if let Some(archive) = state.archives.get_mut(&akey) {
3614 archive.event_count += 1;
3615 archive.size_bytes += detail.len() as i64;
3616 archive.events.push(event.clone());
3617 }
3618 }
3619}
3620
/// Parsed and validated parameters of a `StartReplay` request body.
struct StartReplayInput {
    /// Value of `ReplayName`.
    name: String,
    /// Value of `Description`, when it was supplied as a string.
    description: Option<String>,
    /// Value of `EventSourceArn`.
    event_source_arn: String,
    /// The raw `Destination` JSON value, kept as received.
    destination: Value,
    /// `Destination.Arn` as a string (empty when absent or not a string).
    destination_arn: String,
    /// `EventStartTime` converted from epoch seconds.
    event_start_time: DateTime<Utc>,
    /// `EventEndTime` converted from epoch seconds.
    event_end_time: DateTime<Utc>,
}
3631
3632impl StartReplayInput {
3633 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3634 validate_required("ReplayName", &body["ReplayName"])?;
3635 let name = body["ReplayName"]
3636 .as_str()
3637 .ok_or_else(|| missing("ReplayName"))?
3638 .to_string();
3639 validate_string_length("replayName", &name, 1, 64)?;
3640 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3641 validate_required("EventSourceArn", &body["EventSourceArn"])?;
3642 let description = body["Description"].as_str().map(|s| s.to_string());
3643 let event_source_arn = body["EventSourceArn"]
3644 .as_str()
3645 .ok_or_else(|| missing("EventSourceArn"))?
3646 .to_string();
3647 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3648 validate_required("EventStartTime", &body["EventStartTime"])?;
3649 validate_required("EventEndTime", &body["EventEndTime"])?;
3650 validate_required("Destination", &body["Destination"])?;
3651 let destination = body["Destination"].clone();
3652
3653 let event_start_time = body["EventStartTime"]
3654 .as_f64()
3655 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3656 .unwrap_or_else(Utc::now);
3657 let event_end_time = body["EventEndTime"]
3658 .as_f64()
3659 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3660 .unwrap_or_else(Utc::now);
3661
3662 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3663 if !destination_arn.contains(":event-bus/") {
3664 return Err(AwsServiceError::aws_error(
3665 StatusCode::BAD_REQUEST,
3666 "ValidationException",
3667 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3668 ));
3669 }
3670
3671 Ok(Self {
3672 name,
3673 description,
3674 event_source_arn,
3675 destination,
3676 destination_arn,
3677 event_start_time,
3678 event_end_time,
3679 })
3680 }
3681}
3682
3683#[allow(clippy::too_many_arguments)]
3688fn collect_replay_events_with_targets(
3689 state: &crate::state::EventBridgeState,
3690 archive_name: &str,
3691 bus_name: &str,
3692 event_start_time: DateTime<Utc>,
3693 event_end_time: DateTime<Utc>,
3694 account_id: &str,
3695 region: &str,
3696) -> Vec<(PutEvent, Vec<EventTarget>)> {
3697 let Some(archive) = state.archives.get(archive_name) else {
3698 return Vec::new();
3699 };
3700
3701 let replay_events: Vec<PutEvent> = archive
3702 .events
3703 .iter()
3704 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3705 .cloned()
3706 .collect();
3707
3708 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3709 for event in replay_events {
3710 let matching_targets: Vec<EventTarget> = state
3711 .rules
3712 .values()
3713 .filter(|r| {
3714 r.event_bus_name == bus_name
3715 && r.state == "ENABLED"
3716 && matches_pattern(
3717 r.event_pattern.as_deref(),
3718 &event.source,
3719 &event.detail_type,
3720 &event.detail,
3721 account_id,
3722 region,
3723 &event.resources,
3724 )
3725 })
3726 .flat_map(|r| r.targets.clone())
3727 .collect();
3728
3729 if !matching_targets.is_empty() {
3730 events_to_deliver.push((event, matching_targets));
3731 }
3732 }
3733 events_to_deliver
3734}
3735
3736fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3737 let arr = match numeric_arr.as_array() {
3738 Some(a) => a,
3739 None => return false,
3740 };
3741 let actual = match event_value.as_f64() {
3742 Some(n) => n,
3743 None => return false,
3744 };
3745 let mut i = 0;
3746 while i + 1 < arr.len() {
3747 let op = match arr[i].as_str() {
3748 Some(s) => s,
3749 None => return false,
3750 };
3751 let threshold = match arr[i + 1].as_f64() {
3752 Some(n) => n,
3753 None => return false,
3754 };
3755 let ok = match op {
3756 ">" => actual > threshold,
3757 ">=" => actual >= threshold,
3758 "<" => actual < threshold,
3759 "<=" => actual <= threshold,
3760 "=" => (actual - threshold).abs() < f64::EPSILON,
3761 _ => return false,
3762 };
3763 if !ok {
3764 return false;
3765 }
3766 i += 2;
3767 }
3768 true
3769}
3770
3771fn values_equal(a: &Value, b: &Value) -> bool {
3772 a == b
3773}
3774
3775fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3777 let path = path.strip_prefix('$').unwrap_or(path);
3778 let mut current = event;
3779 for segment in path.split('.') {
3780 if segment.is_empty() {
3781 continue;
3782 }
3783 current = current.get(segment)?;
3784 }
3785 Some(current.clone())
3786}
3787
3788fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3790 let input_paths_map = transformer
3791 .get("InputPathsMap")
3792 .and_then(|v| v.as_object())
3793 .cloned()
3794 .unwrap_or_default();
3795 let template = transformer
3796 .get("InputTemplate")
3797 .and_then(|v| v.as_str())
3798 .unwrap_or("")
3799 .to_string();
3800
3801 let mut resolved: HashMap<String, Value> = HashMap::new();
3803 for (var_name, path_val) in &input_paths_map {
3804 if let Some(path_str) = path_val.as_str() {
3805 if let Some(val) = resolve_json_path(event, path_str) {
3806 resolved.insert(var_name.clone(), val);
3807 }
3808 }
3809 }
3810
3811 let mut result = template;
3813 for (var_name, val) in &resolved {
3814 let placeholder = format!("<{var_name}>");
3815 let replacement = match val {
3816 Value::String(s) => s.clone(),
3817 other => other.to_string(),
3818 };
3819 result = result.replace(&placeholder, &replacement);
3820 }
3821
3822 result
3823}
3824
3825fn missing(name: &str) -> AwsServiceError {
3826 AwsServiceError::aws_error(
3827 StatusCode::BAD_REQUEST,
3828 "ValidationException",
3829 format!("The request must contain the parameter {name}"),
3830 )
3831}
3832
/// Extracts the function name from a colon-separated function ARN.
///
/// When the sixth segment is `function` and a seventh segment exists, that seventh segment
/// is returned (anything after it, such as a qualifier, is ignored). In every other case
/// the input is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut segments = arn.split(':');
    // `nth(5)` consumes the first six segments, so `next()` then yields the seventh.
    match (segments.nth(5), segments.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
3845
3846pub fn invoke_lambda_async(
3849 container_runtime: &Option<Arc<ContainerRuntime>>,
3850 lambda_state: &Option<SharedLambdaState>,
3851 function_arn: &str,
3852 payload: &str,
3853) {
3854 let runtime = match container_runtime {
3855 Some(rt) => rt.clone(),
3856 None => return,
3857 };
3858 let lambda_state = match lambda_state {
3859 Some(ls) => ls.clone(),
3860 None => return,
3861 };
3862 let func_name = function_name_from_arn(function_arn).to_string();
3863 let payload = payload.as_bytes().to_vec();
3864
3865 tokio::spawn(async move {
3866 let func = {
3867 let state = lambda_state.read();
3868 state.functions.get(&func_name).cloned()
3869 };
3870 let func = match func {
3871 Some(f) => f,
3872 None => {
3873 tracing::warn!(
3874 function = %func_name,
3875 "EventBridge Lambda target not found, skipping invocation"
3876 );
3877 return;
3878 }
3879 };
3880 match runtime.invoke(&func, &payload).await {
3881 Ok(_) => {
3882 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3883 }
3884 Err(e) => {
3885 tracing::warn!(
3886 function = %func_name,
3887 error = %e,
3888 "EventBridge Lambda invocation failed"
3889 );
3890 }
3891 }
3892 });
3893}
3894
3895pub fn deliver_to_logs(
3898 logs_state: &SharedLogsState,
3899 log_group_arn: &str,
3900 payload: &str,
3901 timestamp: chrono::DateTime<chrono::Utc>,
3902) {
3903 let group_name = if log_group_arn.contains(":log-group:") {
3906 log_group_arn
3907 .split(":log-group:")
3908 .nth(1)
3909 .unwrap_or(log_group_arn)
3910 .trim_end_matches(":*")
3911 } else {
3912 log_group_arn
3913 };
3914
3915 let stream_name = "events".to_string();
3916 let ts_millis = timestamp.timestamp_millis();
3917
3918 let mut state = logs_state.write();
3919 let region = state.region.clone();
3920 let account_id = state.account_id.clone();
3921
3922 let group = state
3924 .log_groups
3925 .entry(group_name.to_string())
3926 .or_insert_with(|| fakecloud_logs::state::LogGroup {
3927 name: group_name.to_string(),
3928 arn: Arn::new(
3929 "logs",
3930 ®ion,
3931 &account_id,
3932 &format!("log-group:{group_name}"),
3933 )
3934 .to_string(),
3935 creation_time: ts_millis,
3936 retention_in_days: None,
3937 kms_key_id: None,
3938 tags: HashMap::new(),
3939 log_streams: HashMap::new(),
3940 stored_bytes: 0,
3941 subscription_filters: Vec::new(),
3942 data_protection_policy: None,
3943 index_policies: Vec::new(),
3944 transformer: None,
3945 deletion_protection: false,
3946 });
3947
3948 let stream = group
3949 .log_streams
3950 .entry(stream_name.clone())
3951 .or_insert_with(|| fakecloud_logs::state::LogStream {
3952 name: stream_name,
3953 arn: format!("{}:log-stream:events", group.arn),
3954 creation_time: ts_millis,
3955 first_event_timestamp: None,
3956 last_event_timestamp: None,
3957 last_ingestion_time: None,
3958 upload_sequence_token: "1".to_string(),
3959 events: Vec::new(),
3960 });
3961
3962 stream.events.push(fakecloud_logs::state::LogEvent {
3963 timestamp: ts_millis,
3964 message: payload.to_string(),
3965 ingestion_time: ts_millis,
3966 });
3967 stream.last_event_timestamp = Some(ts_millis);
3968 stream.last_ingestion_time = Some(ts_millis);
3969 if stream.first_event_timestamp.is_none() {
3970 stream.first_event_timestamp = Some(ts_millis);
3971 }
3972}
3973
3974fn apply_connection_auth(
3976 mut builder: reqwest::RequestBuilder,
3977 conn: &Connection,
3978) -> reqwest::RequestBuilder {
3979 match conn.authorization_type.as_str() {
3980 "API_KEY" => {
3981 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3982 if let (Some(name), Some(value)) = (
3983 params["ApiKeyName"].as_str(),
3984 params["ApiKeyValue"].as_str(),
3985 ) {
3986 builder = builder.header(name, value);
3987 }
3988 }
3989 }
3990 "BASIC" => {
3991 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3992 if let (Some(user), Some(pass)) =
3993 (params["Username"].as_str(), params["Password"].as_str())
3994 {
3995 builder = builder.basic_auth(user, Some(pass));
3996 }
3997 }
3998 }
3999 "OAUTH_CLIENT_CREDENTIALS" => {
4000 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4003 if let (Some(client_id), Some(client_secret)) = (
4004 params["ClientParameters"]["ClientID"].as_str(),
4005 params["ClientParameters"]["ClientSecret"].as_str(),
4006 ) {
4007 builder = builder.basic_auth(client_id, Some(client_secret));
4008 }
4009 }
4010 }
4011 _ => {}
4012 }
4013 builder
4014}
4015
4016#[cfg(test)]
4017mod tests {
4018 use super::*;
4019
4020 fn test_matches(
4022 pattern_json: Option<&str>,
4023 source: &str,
4024 detail_type: &str,
4025 detail: &str,
4026 ) -> bool {
4027 matches_pattern(
4028 pattern_json,
4029 source,
4030 detail_type,
4031 detail,
4032 "123456789012",
4033 "us-east-1",
4034 &[],
4035 )
4036 }
4037
4038 #[test]
4039 fn pattern_matches_source() {
4040 assert!(test_matches(
4041 Some(r#"{"source": ["my.app"]}"#),
4042 "my.app",
4043 "OrderPlaced",
4044 "{}"
4045 ));
4046 assert!(!test_matches(
4047 Some(r#"{"source": ["other.app"]}"#),
4048 "my.app",
4049 "OrderPlaced",
4050 "{}"
4051 ));
4052 }
4053
4054 #[test]
4055 fn pattern_matches_detail_type() {
4056 assert!(test_matches(
4057 Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4058 "my.app",
4059 "OrderPlaced",
4060 "{}"
4061 ));
4062 assert!(!test_matches(
4063 Some(r#"{"detail-type": ["OrderShipped"]}"#),
4064 "my.app",
4065 "OrderPlaced",
4066 "{}"
4067 ));
4068 }
4069
4070 #[test]
4071 fn pattern_matches_detail_field() {
4072 assert!(test_matches(
4073 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4074 "my.app",
4075 "StatusChange",
4076 r#"{"status": "ACTIVE"}"#
4077 ));
4078 assert!(!test_matches(
4079 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4080 "my.app",
4081 "StatusChange",
4082 r#"{"status": "INACTIVE"}"#
4083 ));
4084 }
4085
4086 #[test]
4087 fn no_pattern_matches_everything() {
4088 assert!(test_matches(None, "any", "any", "{}"));
4089 }
4090
4091 #[test]
4092 fn combined_pattern() {
4093 let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4094 assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4095 assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4096 assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4097 }
4098
4099 #[test]
4100 fn nested_detail_pattern() {
4101 let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4102 assert!(test_matches(
4103 Some(pattern),
4104 "my.app",
4105 "OrderEvent",
4106 r#"{"order": {"status": "PLACED", "id": "123"}}"#
4107 ));
4108 assert!(!test_matches(
4109 Some(pattern),
4110 "my.app",
4111 "OrderEvent",
4112 r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4113 ));
4114 assert!(!test_matches(
4115 Some(pattern),
4116 "my.app",
4117 "OrderEvent",
4118 r#"{"order": {"id": "123"}}"#
4119 ));
4120 }
4121
4122 #[test]
4123 fn deeply_nested_detail_pattern() {
4124 let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4125 assert!(test_matches(
4126 Some(pattern),
4127 "src",
4128 "type",
4129 r#"{"a": {"b": {"c": "deep"}}}"#
4130 ));
4131 assert!(!test_matches(
4132 Some(pattern),
4133 "src",
4134 "type",
4135 r#"{"a": {"b": {"c": "shallow"}}}"#
4136 ));
4137 }
4138
4139 #[test]
4140 fn prefix_matcher() {
4141 let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4142 assert!(test_matches(
4143 Some(pattern),
4144 "com.myapp.orders",
4145 "OrderPlaced",
4146 "{}"
4147 ));
4148 assert!(test_matches(
4149 Some(pattern),
4150 "com.myapp",
4151 "OrderPlaced",
4152 "{}"
4153 ));
4154 assert!(!test_matches(
4155 Some(pattern),
4156 "com.other",
4157 "OrderPlaced",
4158 "{}"
4159 ));
4160 }
4161
4162 #[test]
4163 fn prefix_matcher_in_detail() {
4164 let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4165 assert!(test_matches(
4166 Some(pattern),
4167 "src",
4168 "type",
4169 r#"{"region": "us-east-1"}"#
4170 ));
4171 assert!(!test_matches(
4172 Some(pattern),
4173 "src",
4174 "type",
4175 r#"{"region": "eu-west-1"}"#
4176 ));
4177 }
4178
4179 #[test]
4180 fn exists_matcher() {
4181 let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4182 assert!(test_matches(
4183 Some(pattern),
4184 "src",
4185 "type",
4186 r#"{"error": "something broke"}"#
4187 ));
4188 assert!(!test_matches(
4189 Some(pattern),
4190 "src",
4191 "type",
4192 r#"{"status": "ok"}"#
4193 ));
4194
4195 let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4196 assert!(test_matches(
4197 Some(pattern),
4198 "src",
4199 "type",
4200 r#"{"status": "ok"}"#
4201 ));
4202 assert!(!test_matches(
4203 Some(pattern),
4204 "src",
4205 "type",
4206 r#"{"error": "something broke"}"#
4207 ));
4208 }
4209
4210 #[test]
4211 fn anything_but_matcher() {
4212 let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4213 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4214 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4215
4216 let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4217 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4218 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4219 assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4220 }
4221
4222 #[test]
4223 fn anything_but_in_detail() {
4224 let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4225 assert!(test_matches(
4226 Some(pattern),
4227 "src",
4228 "type",
4229 r#"{"env": "staging"}"#
4230 ));
4231 assert!(!test_matches(
4232 Some(pattern),
4233 "src",
4234 "type",
4235 r#"{"env": "prod"}"#
4236 ));
4237 }
4238
4239 #[test]
4240 fn numeric_greater_than() {
4241 let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4242 assert!(test_matches(
4243 Some(pattern),
4244 "src",
4245 "type",
4246 r#"{"count": 150}"#
4247 ));
4248 assert!(!test_matches(
4249 Some(pattern),
4250 "src",
4251 "type",
4252 r#"{"count": 100}"#
4253 ));
4254 assert!(!test_matches(
4255 Some(pattern),
4256 "src",
4257 "type",
4258 r#"{"count": 50}"#
4259 ));
4260 }
4261
4262 #[test]
4263 fn numeric_less_than() {
4264 let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4265 assert!(test_matches(
4266 Some(pattern),
4267 "src",
4268 "type",
4269 r#"{"count": 5}"#
4270 ));
4271 assert!(!test_matches(
4272 Some(pattern),
4273 "src",
4274 "type",
4275 r#"{"count": 10}"#
4276 ));
4277 assert!(!test_matches(
4278 Some(pattern),
4279 "src",
4280 "type",
4281 r#"{"count": 15}"#
4282 ));
4283 }
4284
4285 #[test]
4286 fn numeric_range() {
4287 let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4288 assert!(test_matches(
4289 Some(pattern),
4290 "src",
4291 "type",
4292 r#"{"count": 50}"#
4293 ));
4294 assert!(test_matches(
4295 Some(pattern),
4296 "src",
4297 "type",
4298 r#"{"count": 100}"#
4299 ));
4300 assert!(!test_matches(
4301 Some(pattern),
4302 "src",
4303 "type",
4304 r#"{"count": 200}"#
4305 ));
4306 assert!(!test_matches(
4307 Some(pattern),
4308 "src",
4309 "type",
4310 r#"{"count": 49}"#
4311 ));
4312 }
4313
4314 #[test]
4315 fn mixed_matchers_and_literals() {
4316 let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4317 assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4318 assert!(test_matches(
4319 Some(pattern),
4320 "com.myapp.orders",
4321 "Event",
4322 "{}"
4323 ));
4324 assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4325 }
4326
4327 use crate::state::EventBridgeState;
4330 use fakecloud_core::delivery::DeliveryBus;
4331 use parking_lot::RwLock;
4332
4333 fn make_service() -> EventBridgeService {
4334 let state = Arc::new(RwLock::new(EventBridgeState::new(
4335 "123456789012",
4336 "us-east-1",
4337 )));
4338 let delivery = Arc::new(DeliveryBus::new());
4339 EventBridgeService::new(state, delivery)
4340 }
4341
4342 fn make_request(action: &str, body: Value) -> AwsRequest {
4343 AwsRequest {
4344 service: "events".to_string(),
4345 action: action.to_string(),
4346 region: "us-east-1".to_string(),
4347 account_id: "123456789012".to_string(),
4348 request_id: "test-id".to_string(),
4349 headers: http::HeaderMap::new(),
4350 query_params: HashMap::new(),
4351 body: serde_json::to_vec(&body).unwrap().into(),
4352 path_segments: vec![],
4353 raw_path: "/".to_string(),
4354 raw_query: String::new(),
4355 method: http::Method::POST,
4356 is_query_protocol: false,
4357 access_key_id: None,
4358 }
4359 }
4360
4361 fn create_connection(svc: &EventBridgeService, name: &str) {
4362 let req = make_request(
4363 "CreateConnection",
4364 json!({
4365 "Name": name,
4366 "AuthorizationType": "API_KEY",
4367 "AuthParameters": {
4368 "ApiKeyAuthParameters": {
4369 "ApiKeyName": "x-api-key",
4370 "ApiKeyValue": "secret"
4371 }
4372 }
4373 }),
4374 );
4375 svc.create_connection(&req).unwrap();
4376 }
4377
4378 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4379 let conn_arn_field = {
4380 let state = svc.state.read();
4381 state.connections.get(conn_name).unwrap().arn.clone()
4382 };
4383 let req = make_request(
4384 "CreateApiDestination",
4385 json!({
4386 "Name": name,
4387 "ConnectionArn": conn_arn_field,
4388 "InvocationEndpoint": "https://example.com",
4389 "HttpMethod": "POST"
4390 }),
4391 );
4392 svc.create_api_destination(&req).unwrap();
4393 }
4394
4395 #[test]
4398 fn list_connections_returns_all_by_default() {
4399 let svc = make_service();
4400 create_connection(&svc, "conn-alpha");
4401 create_connection(&svc, "conn-beta");
4402 create_connection(&svc, "conn-gamma");
4403
4404 let req = make_request("ListConnections", json!({}));
4405 let resp = svc.list_connections(&req).unwrap();
4406 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4407 assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4408 assert!(body["NextToken"].is_null());
4409 }
4410
4411 #[test]
4412 fn list_connections_name_prefix_filter() {
4413 let svc = make_service();
4414 create_connection(&svc, "prod-conn-1");
4415 create_connection(&svc, "prod-conn-2");
4416 create_connection(&svc, "dev-conn-1");
4417
4418 let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4419 let resp = svc.list_connections(&req).unwrap();
4420 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4421 let names: Vec<&str> = body["Connections"]
4422 .as_array()
4423 .unwrap()
4424 .iter()
4425 .map(|c| c["Name"].as_str().unwrap())
4426 .collect();
4427 assert_eq!(names.len(), 2);
4428 assert!(names.iter().all(|n| n.starts_with("prod-")));
4429 }
4430
4431 #[test]
4432 fn list_connections_state_filter() {
4433 let svc = make_service();
4434 create_connection(&svc, "conn-a");
4435 create_connection(&svc, "conn-b");
4436
4437 {
4439 let mut state = svc.state.write();
4440 state
4441 .connections
4442 .get_mut("conn-b")
4443 .unwrap()
4444 .connection_state = "DEAUTHORIZED".to_string();
4445 }
4446
4447 let req = make_request(
4448 "ListConnections",
4449 json!({ "ConnectionState": "AUTHORIZED" }),
4450 );
4451 let resp = svc.list_connections(&req).unwrap();
4452 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4453 let conns = body["Connections"].as_array().unwrap();
4454 assert_eq!(conns.len(), 1);
4455 assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4456 }
4457
4458 #[test]
4459 fn list_connections_pagination() {
4460 let svc = make_service();
4461 for i in 0..5 {
4462 create_connection(&svc, &format!("conn-{i:02}"));
4463 }
4464
4465 let req = make_request("ListConnections", json!({ "Limit": 2 }));
4467 let resp = svc.list_connections(&req).unwrap();
4468 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4469 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4470 let token = body["NextToken"].as_str().unwrap();
4471 assert_eq!(token, "2");
4472
4473 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4475 let resp = svc.list_connections(&req).unwrap();
4476 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4477 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4478 let token = body["NextToken"].as_str().unwrap();
4479 assert_eq!(token, "4");
4480
4481 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4483 let resp = svc.list_connections(&req).unwrap();
4484 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4485 assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4486 assert!(body["NextToken"].is_null());
4487 }
4488
4489 #[test]
4490 fn list_connections_pagination_with_filter() {
4491 let svc = make_service();
4492 for i in 0..4 {
4493 create_connection(&svc, &format!("prod-{i:02}"));
4494 }
4495 create_connection(&svc, "dev-00");
4496
4497 let req = make_request(
4498 "ListConnections",
4499 json!({ "NamePrefix": "prod-", "Limit": 2 }),
4500 );
4501 let resp = svc.list_connections(&req).unwrap();
4502 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4503 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4504 assert!(body["NextToken"].as_str().is_some());
4505 }
4506
4507 #[test]
4510 fn list_api_destinations_returns_all_by_default() {
4511 let svc = make_service();
4512 create_connection(&svc, "my-conn");
4513 create_api_destination(&svc, "dest-alpha", "my-conn");
4514 create_api_destination(&svc, "dest-beta", "my-conn");
4515
4516 let req = make_request("ListApiDestinations", json!({}));
4517 let resp = svc.list_api_destinations(&req).unwrap();
4518 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4519 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4520 assert!(body["NextToken"].is_null());
4521 }
4522
4523 #[test]
4524 fn list_api_destinations_name_prefix_filter() {
4525 let svc = make_service();
4526 create_connection(&svc, "my-conn");
4527 create_api_destination(&svc, "prod-dest-1", "my-conn");
4528 create_api_destination(&svc, "prod-dest-2", "my-conn");
4529 create_api_destination(&svc, "dev-dest-1", "my-conn");
4530
4531 let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4532 let resp = svc.list_api_destinations(&req).unwrap();
4533 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4534 let names: Vec<&str> = body["ApiDestinations"]
4535 .as_array()
4536 .unwrap()
4537 .iter()
4538 .map(|d| d["Name"].as_str().unwrap())
4539 .collect();
4540 assert_eq!(names.len(), 2);
4541 assert!(names.iter().all(|n| n.starts_with("prod-")));
4542 }
4543
4544 #[test]
4545 fn list_api_destinations_connection_arn_filter() {
4546 let svc = make_service();
4547 create_connection(&svc, "conn-a");
4548 create_connection(&svc, "conn-b");
4549 create_api_destination(&svc, "dest-1", "conn-a");
4550 create_api_destination(&svc, "dest-2", "conn-b");
4551 create_api_destination(&svc, "dest-3", "conn-a");
4552
4553 let conn_a_arn = {
4554 let state = svc.state.read();
4555 state.connections.get("conn-a").unwrap().arn.clone()
4556 };
4557
4558 let req = make_request(
4559 "ListApiDestinations",
4560 json!({ "ConnectionArn": conn_a_arn }),
4561 );
4562 let resp = svc.list_api_destinations(&req).unwrap();
4563 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4564 let names: Vec<&str> = body["ApiDestinations"]
4565 .as_array()
4566 .unwrap()
4567 .iter()
4568 .map(|d| d["Name"].as_str().unwrap())
4569 .collect();
4570 assert_eq!(names.len(), 2);
4571 assert!(names.contains(&"dest-1"));
4572 assert!(names.contains(&"dest-3"));
4573 }
4574
4575 #[test]
4576 fn list_api_destinations_pagination() {
4577 let svc = make_service();
4578 create_connection(&svc, "my-conn");
4579 for i in 0..5 {
4580 create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4581 }
4582
4583 let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4585 let resp = svc.list_api_destinations(&req).unwrap();
4586 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4587 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4588 let token = body["NextToken"].as_str().unwrap();
4589 assert_eq!(token, "2");
4590
4591 let req = make_request(
4593 "ListApiDestinations",
4594 json!({ "Limit": 2, "NextToken": token }),
4595 );
4596 let resp = svc.list_api_destinations(&req).unwrap();
4597 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4598 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4599 let token = body["NextToken"].as_str().unwrap();
4600 assert_eq!(token, "4");
4601
4602 let req = make_request(
4604 "ListApiDestinations",
4605 json!({ "Limit": 2, "NextToken": token }),
4606 );
4607 let resp = svc.list_api_destinations(&req).unwrap();
4608 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4609 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4610 assert!(body["NextToken"].is_null());
4611 }
4612
4613 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4616 let req = make_request("CreateEventBus", json!({ "Name": name }));
4617 svc.create_event_bus(&req).unwrap();
4618 }
4619
4620 #[test]
4621 fn list_event_buses_pagination() {
4622 let svc = make_service();
4623 for i in 0..4 {
4625 create_event_bus(&svc, &format!("bus-{i:02}"));
4626 }
4627
4628 let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4630 let resp = svc.list_event_buses(&req).unwrap();
4631 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4632 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4633 let token = body["NextToken"].as_str().unwrap();
4634 assert_eq!(token, "2");
4635
4636 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4638 let resp = svc.list_event_buses(&req).unwrap();
4639 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4640 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4641 let token = body["NextToken"].as_str().unwrap();
4642 assert_eq!(token, "4");
4643
4644 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4646 let resp = svc.list_event_buses(&req).unwrap();
4647 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4648 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4649 assert!(body["NextToken"].is_null());
4650 }
4651
4652 #[test]
4653 fn list_event_buses_no_pagination_returns_all() {
4654 let svc = make_service();
4655 create_event_bus(&svc, "bus-alpha");
4656 create_event_bus(&svc, "bus-beta");
4657
4658 let req = make_request("ListEventBuses", json!({}));
4659 let resp = svc.list_event_buses(&req).unwrap();
4660 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4661 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4663 assert!(body["NextToken"].is_null());
4664 }
4665
4666 #[test]
4669 fn put_events_never_includes_endpoint_id_in_response() {
4670 let svc = make_service();
4671 let req = make_request(
4673 "PutEvents",
4674 json!({
4675 "EndpointId": "my-endpoint.abc123",
4676 "Entries": [{
4677 "Source": "my.source",
4678 "DetailType": "MyType",
4679 "Detail": "{}",
4680 "EventBusName": "default"
4681 }]
4682 }),
4683 );
4684 let resp = svc.put_events(&req).unwrap();
4685 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4686 assert!(
4687 !body.as_object().unwrap().contains_key("EndpointId"),
4688 "EndpointId should never be in the PutEvents response"
4689 );
4690 assert_eq!(body["FailedEntryCount"], 0);
4691 }
4692
4693 fn create_archive(svc: &EventBridgeService, name: &str) {
4696 let req = make_request(
4697 "CreateArchive",
4698 json!({
4699 "ArchiveName": name,
4700 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4701 }),
4702 );
4703 svc.create_archive(&req).unwrap();
4704 }
4705
4706 #[test]
4707 fn list_archives_pagination() {
4708 let svc = make_service();
4709 for i in 0..5 {
4710 create_archive(&svc, &format!("archive-{i:02}"));
4711 }
4712
4713 let req = make_request("ListArchives", json!({ "Limit": 2 }));
4715 let resp = svc.list_archives(&req).unwrap();
4716 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4717 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4718 let token = body["NextToken"].as_str().unwrap();
4719 assert_eq!(token, "2");
4720
4721 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4723 let resp = svc.list_archives(&req).unwrap();
4724 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4725 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4726 let token = body["NextToken"].as_str().unwrap();
4727 assert_eq!(token, "4");
4728
4729 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4731 let resp = svc.list_archives(&req).unwrap();
4732 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4733 assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4734 assert!(body["NextToken"].is_null());
4735 }
4736
4737 fn create_replay(svc: &EventBridgeService, name: &str) {
4740 let archive_arn = {
4742 let state = svc.state.read();
4743 if state.archives.contains_key("replay-archive") {
4744 state.archives["replay-archive"].arn.clone()
4745 } else {
4746 drop(state);
4747 create_archive(svc, "replay-archive");
4748 svc.state.read().archives["replay-archive"].arn.clone()
4749 }
4750 };
4751 let req = make_request(
4752 "StartReplay",
4753 json!({
4754 "ReplayName": name,
4755 "EventSourceArn": archive_arn,
4756 "EventStartTime": 1000000.0,
4757 "EventEndTime": 2000000.0,
4758 "Destination": {
4759 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4760 }
4761 }),
4762 );
4763 svc.start_replay(&req).unwrap();
4764 }
4765
4766 #[test]
4767 fn list_replays_pagination() {
4768 let svc = make_service();
4769 for i in 0..5 {
4770 create_replay(&svc, &format!("replay-{i:02}"));
4771 }
4772
4773 let req = make_request("ListReplays", json!({ "Limit": 2 }));
4775 let resp = svc.list_replays(&req).unwrap();
4776 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4777 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4778 let token = body["NextToken"].as_str().unwrap();
4779 assert_eq!(token, "2");
4780
4781 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4783 let resp = svc.list_replays(&req).unwrap();
4784 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4785 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4786 let token = body["NextToken"].as_str().unwrap();
4787 assert_eq!(token, "4");
4788
4789 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4791 let resp = svc.list_replays(&req).unwrap();
4792 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4793 assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4794 assert!(body["NextToken"].is_null());
4795 }
4796
4797 #[test]
4798 fn list_event_buses_invalid_next_token_returns_error() {
4799 let svc = make_service();
4800
4801 let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4802 let result = svc.list_event_buses(&req);
4803 assert!(
4804 result.is_err(),
4805 "non-numeric NextToken should return an error"
4806 );
4807 }
4808
4809 #[test]
4812 fn test_event_pattern_match() {
4813 let svc = make_service();
4814 let req = make_request(
4815 "TestEventPattern",
4816 json!({
4817 "EventPattern": r#"{"source": ["my.app"]}"#,
4818 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4819 }),
4820 );
4821 let resp = svc.test_event_pattern(&req).unwrap();
4822 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4823 assert_eq!(body["Result"], true);
4824 }
4825
4826 #[test]
4827 fn test_event_pattern_no_match() {
4828 let svc = make_service();
4829 let req = make_request(
4830 "TestEventPattern",
4831 json!({
4832 "EventPattern": r#"{"source": ["other.app"]}"#,
4833 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4834 }),
4835 );
4836 let resp = svc.test_event_pattern(&req).unwrap();
4837 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4838 assert_eq!(body["Result"], false);
4839 }
4840
4841 #[test]
4842 fn test_event_pattern_detail_match() {
4843 let svc = make_service();
4844 let req = make_request(
4845 "TestEventPattern",
4846 json!({
4847 "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4848 "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4849 }),
4850 );
4851 let resp = svc.test_event_pattern(&req).unwrap();
4852 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4853 assert_eq!(body["Result"], true);
4854 }
4855
4856 #[test]
4859 fn update_event_bus_description() {
4860 let svc = make_service();
4861 create_event_bus(&svc, "my-bus");
4862
4863 let req = make_request(
4864 "UpdateEventBus",
4865 json!({ "Name": "my-bus", "Description": "Updated desc" }),
4866 );
4867 let resp = svc.update_event_bus(&req).unwrap();
4868 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4869 assert_eq!(body["Name"], "my-bus");
4870
4871 let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4873 let resp = svc.describe_event_bus(&req).unwrap();
4874 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4875 assert_eq!(body["Description"], "Updated desc");
4876 }
4877
4878 #[test]
4879 fn update_event_bus_not_found() {
4880 let svc = make_service();
4881 let req = make_request(
4882 "UpdateEventBus",
4883 json!({ "Name": "ghost-bus", "Description": "nope" }),
4884 );
4885 assert!(svc.update_event_bus(&req).is_err());
4886 }
4887
4888 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4891 let req = make_request(
4892 "CreateEndpoint",
4893 json!({
4894 "Name": name,
4895 "RoutingConfig": {
4896 "FailoverConfig": {
4897 "Primary": { "HealthCheck": "" },
4898 "Secondary": { "Route": "us-west-2" }
4899 }
4900 },
4901 "EventBuses": [
4902 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4903 ]
4904 }),
4905 );
4906 svc.create_endpoint(&req).unwrap();
4907 }
4908
4909 #[test]
4910 fn endpoint_create_describe_delete() {
4911 let svc = make_service();
4912 create_endpoint_helper(&svc, "my-endpoint");
4913
4914 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4916 let resp = svc.describe_endpoint(&req).unwrap();
4917 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4918 assert_eq!(body["Name"], "my-endpoint");
4919 assert_eq!(body["State"], "ACTIVE");
4920 assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4921
4922 let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4924 svc.delete_endpoint(&req).unwrap();
4925
4926 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4928 assert!(svc.describe_endpoint(&req).is_err());
4929 }
4930
4931 #[test]
4932 fn endpoint_list_and_update() {
4933 let svc = make_service();
4934 create_endpoint_helper(&svc, "ep-alpha");
4935 create_endpoint_helper(&svc, "ep-beta");
4936
4937 let req = make_request("ListEndpoints", json!({}));
4939 let resp = svc.list_endpoints(&req).unwrap();
4940 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4941 assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4942
4943 let req = make_request(
4945 "UpdateEndpoint",
4946 json!({ "Name": "ep-alpha", "Description": "updated" }),
4947 );
4948 let resp = svc.update_endpoint(&req).unwrap();
4949 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4950 assert_eq!(body["Name"], "ep-alpha");
4951
4952 let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4954 let resp = svc.describe_endpoint(&req).unwrap();
4955 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4956 assert_eq!(body["Description"], "updated");
4957 }
4958
4959 #[test]
4960 fn endpoint_duplicate_fails() {
4961 let svc = make_service();
4962 create_endpoint_helper(&svc, "dup-ep");
4963 let req = make_request(
4964 "CreateEndpoint",
4965 json!({
4966 "Name": "dup-ep",
4967 "RoutingConfig": {},
4968 "EventBuses": []
4969 }),
4970 );
4971 assert!(svc.create_endpoint(&req).is_err());
4972 }
4973
4974 #[test]
4977 fn deauthorize_connection_sets_state() {
4978 let svc = make_service();
4979 create_connection(&svc, "deauth-conn");
4980
4981 let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4982 let resp = svc.deauthorize_connection(&req).unwrap();
4983 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4984 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4985 assert!(body["ConnectionArn"]
4986 .as_str()
4987 .unwrap()
4988 .contains("deauth-conn"));
4989
4990 let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4992 let resp = svc.describe_connection(&req).unwrap();
4993 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4994 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4995 }
4996
4997 #[test]
4998 fn deauthorize_connection_not_found() {
4999 let svc = make_service();
5000 let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5001 assert!(svc.deauthorize_connection(&req).is_err());
5002 }
5003
5004 #[test]
5007 fn partner_event_source_crud() {
5008 let svc = make_service();
5009
5010 let req = make_request(
5012 "CreatePartnerEventSource",
5013 json!({ "Name": "partner/test", "Account": "123456789012" }),
5014 );
5015 svc.create_partner_event_source(&req).unwrap();
5016
5017 let req = make_request(
5019 "DescribePartnerEventSource",
5020 json!({ "Name": "partner/test" }),
5021 );
5022 let resp = svc.describe_partner_event_source(&req).unwrap();
5023 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5024 assert_eq!(body["Name"], "partner/test");
5025
5026 let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5028 let resp = svc.list_partner_event_sources(&req).unwrap();
5029 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5030 assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5031
5032 let req = make_request(
5034 "ListPartnerEventSourceAccounts",
5035 json!({ "EventSourceName": "partner/test" }),
5036 );
5037 let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5038 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5039 assert_eq!(
5040 body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5041 1
5042 );
5043
5044 let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5046 let resp = svc.describe_event_source(&req).unwrap();
5047 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5048 assert_eq!(body["Name"], "partner/test");
5049 assert_eq!(body["State"], "ACTIVE");
5050
5051 let req = make_request("ListEventSources", json!({}));
5053 let resp = svc.list_event_sources(&req).unwrap();
5054 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5055 assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5056
5057 let req = make_request(
5059 "DeletePartnerEventSource",
5060 json!({ "Name": "partner/test", "Account": "123456789012" }),
5061 );
5062 svc.delete_partner_event_source(&req).unwrap();
5063
5064 let req = make_request(
5066 "DescribePartnerEventSource",
5067 json!({ "Name": "partner/test" }),
5068 );
5069 assert!(svc.describe_partner_event_source(&req).is_err());
5070 }
5071
5072 #[test]
5073 fn activate_deactivate_event_source() {
5074 let svc = make_service();
5075
5076 let req = make_request(
5078 "CreatePartnerEventSource",
5079 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5080 );
5081 svc.create_partner_event_source(&req).unwrap();
5082
5083 let req = make_request(
5085 "DeactivateEventSource",
5086 json!({ "Name": "aws.partner/test" }),
5087 );
5088 svc.deactivate_event_source(&req).unwrap();
5089 {
5090 let state = svc.state.read();
5091 assert_eq!(
5092 state.partner_event_sources["aws.partner/test"].state,
5093 "INACTIVE"
5094 );
5095 }
5096
5097 let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5099 svc.activate_event_source(&req).unwrap();
5100 {
5101 let state = svc.state.read();
5102 assert_eq!(
5103 state.partner_event_sources["aws.partner/test"].state,
5104 "ACTIVE"
5105 );
5106 }
5107
5108 let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5110 assert!(svc.activate_event_source(&req).is_err());
5111
5112 let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5113 assert!(svc.deactivate_event_source(&req).is_err());
5114 }
5115
5116 #[test]
5117 fn delete_partner_event_source_verifies_account() {
5118 let svc = make_service();
5119
5120 let req = make_request(
5122 "CreatePartnerEventSource",
5123 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5124 );
5125 svc.create_partner_event_source(&req).unwrap();
5126
5127 let req = make_request(
5129 "DeletePartnerEventSource",
5130 json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5131 );
5132 assert!(svc.delete_partner_event_source(&req).is_err());
5133 assert!(svc
5135 .state
5136 .read()
5137 .partner_event_sources
5138 .contains_key("aws.partner/test"));
5139
5140 let req = make_request(
5142 "DeletePartnerEventSource",
5143 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5144 );
5145 svc.delete_partner_event_source(&req).unwrap();
5146 assert!(!svc
5147 .state
5148 .read()
5149 .partner_event_sources
5150 .contains_key("aws.partner/test"));
5151
5152 let req = make_request(
5154 "DeletePartnerEventSource",
5155 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5156 );
5157 assert!(svc.delete_partner_event_source(&req).is_err());
5158 }
5159
5160 #[test]
5161 fn put_partner_events() {
5162 let svc = make_service();
5163 let req = make_request(
5164 "PutPartnerEvents",
5165 json!({
5166 "Entries": [
5167 { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5168 ]
5169 }),
5170 );
5171 let resp = svc.put_partner_events(&req).unwrap();
5172 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5173 assert_eq!(body["FailedEntryCount"], 0);
5174 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5175 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5176 }
5177
5178 #[allow(clippy::type_complexity)]
5182 fn make_service_with_sqs_recorder() -> (
5183 EventBridgeService,
5184 Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5185 ) {
5186 use fakecloud_core::delivery::SqsDelivery;
5187
5188 struct RecordingSqsDelivery {
5189 messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5190 }
5191
5192 impl SqsDelivery for RecordingSqsDelivery {
5193 fn deliver_to_queue(
5194 &self,
5195 queue_arn: &str,
5196 message_body: &str,
5197 _attributes: &HashMap<String, String>,
5198 ) {
5199 self.messages
5200 .lock()
5201 .push((queue_arn.to_string(), message_body.to_string()));
5202 }
5203 }
5204
5205 let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5206 Arc::new(parking_lot::Mutex::new(Vec::new()));
5207 let state = Arc::new(RwLock::new(EventBridgeState::new(
5208 "123456789012",
5209 "us-east-1",
5210 )));
5211 let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5212 messages: messages.clone(),
5213 })));
5214 let svc = EventBridgeService::new(state, delivery);
5215 (svc, messages)
5216 }
5217
/// End-to-end replay check: two events put on the default bus are archived, and a
/// subsequent `StartReplay` re-delivers both to the rule's SQS target and leaves
/// the replay recorded as `COMPLETED`.
#[test]
fn start_replay_delivers_archived_events_to_sqs_target() {
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
    let default_bus_arn = "arn:aws:events:us-east-1:123456789012:event-bus/default";
    let (svc, messages) = make_service_with_sqs_recorder();

    // Rule matching every event whose source is "my.app".
    let rule_req = make_request(
        "PutRule",
        json!({
            "Name": "replay-test-rule",
            "EventPattern": r#"{"source": ["my.app"]}"#,
            "State": "ENABLED"
        }),
    );
    svc.put_rule(&rule_req).unwrap();

    // Attach the recording SQS queue as the rule's only target.
    let targets_req = make_request(
        "PutTargets",
        json!({
            "Rule": "replay-test-rule",
            "Targets": [{
                "Id": "sqs-target",
                "Arn": queue_arn
            }]
        }),
    );
    svc.put_targets(&targets_req).unwrap();

    // Archive sourced from the default bus; it must exist before the events are put.
    let archive_req = make_request(
        "CreateArchive",
        json!({
            "ArchiveName": "test-archive",
            "EventSourceArn": default_bus_arn
        }),
    );
    svc.create_archive(&archive_req).unwrap();

    // Two matching events on the default bus.
    let events_req = make_request(
        "PutEvents",
        json!({
            "Entries": [
                {
                    "Source": "my.app",
                    "DetailType": "OrderCreated",
                    "Detail": r#"{"orderId": "1"}"#,
                    "EventBusName": "default"
                },
                {
                    "Source": "my.app",
                    "DetailType": "OrderShipped",
                    "Detail": r#"{"orderId": "2"}"#,
                    "EventBusName": "default"
                }
            ]
        }),
    );
    let put_events_resp = svc.put_events(&events_req).unwrap();
    let put_events_body: Value =
        serde_json::from_slice(put_events_resp.body.expect_bytes()).unwrap();
    assert_eq!(put_events_body["FailedEntryCount"], 0);

    // Both events must have landed in the archive; grab its ARN while the read
    // guard is held, then release the guard before replaying.
    let archive_arn = {
        let guard = svc.state.read();
        let archive = guard.archives.get("test-archive").unwrap();
        assert_eq!(archive.events.len(), 2);
        assert_eq!(archive.event_count, 2);
        archive.arn.clone()
    };

    // Forget anything recorded so far so that only replayed messages are counted below.
    messages.lock().clear();

    // Replay window: from the epoch until one hour from now, which covers both events.
    let window_start = 0.0_f64;
    let window_end = (chrono::Utc::now().timestamp() + 3600) as f64;

    let replay_req = make_request(
        "StartReplay",
        json!({
            "ReplayName": "my-replay",
            "EventSourceArn": archive_arn,
            "Destination": {
                "Arn": default_bus_arn
            },
            "EventStartTime": window_start,
            "EventEndTime": window_end
        }),
    );
    let replay_resp = svc.start_replay(&replay_req).unwrap();
    let replay_body: Value = serde_json::from_slice(replay_resp.body.expect_bytes()).unwrap();
    assert_eq!(replay_body["State"], "STARTING");

    // Every replayed event must reach the queue and carry a `replay-name` string.
    let delivered = messages.lock();
    assert_eq!(
        delivered.len(),
        2,
        "expected 2 replayed events delivered to SQS"
    );
    for (target_arn, payload) in delivered.iter() {
        assert_eq!(target_arn, queue_arn);
        let event: Value = serde_json::from_str(payload).unwrap();
        assert_eq!(event["source"], "my.app");
        assert!(event["replay-name"].as_str().is_some());
    }

    // By the time `start_replay` has returned, the stored replay is already completed.
    let final_state = svc.state.read();
    assert_eq!(
        final_state.replays.get("my-replay").unwrap().state,
        "COMPLETED"
    );
}
5337
/// An `API_KEY` connection must add its configured key name/value pair as a
/// header on the outgoing request.
#[test]
fn apply_connection_auth_api_key() {
    let api_key_params = json!({
        "ApiKeyAuthParameters": {
            "ApiKeyName": "x-api-key",
            "ApiKeyValue": "my-secret"
        }
    });
    let connection = Connection {
        name: String::from("test-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid"),
        description: None,
        authorization_type: String::from("API_KEY"),
        auth_parameters: api_key_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so nothing needs to listen on this URL.
    let unauthenticated = reqwest::Client::new()
        .post("http://localhost:12345/test")
        .header("Content-Type", "application/json");
    let request = apply_connection_auth(unauthenticated, &connection)
        .body("{}")
        .build()
        .unwrap();

    let api_key_header = request
        .headers()
        .get("x-api-key")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(api_key_header, "my-secret");
}
5376
/// A `BASIC` connection must produce an `Authorization` header using the
/// `Basic` scheme. Only the scheme prefix is checked, not the encoded credentials.
#[test]
fn apply_connection_auth_basic() {
    let basic_params = json!({
        "BasicAuthParameters": {
            "Username": "user",
            "Password": "pass"
        }
    });
    let connection = Connection {
        name: String::from("basic-conn"),
        arn: String::from("arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid"),
        description: None,
        authorization_type: String::from("BASIC"),
        auth_parameters: basic_params,
        connection_state: String::from("AUTHORIZED"),
        secret_arn: String::from("arn:aws:secretsmanager:us-east-1:123456789012:secret:test"),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so nothing needs to listen on this URL.
    let unauthenticated = reqwest::Client::new().post("http://localhost:12345/test");
    let request = apply_connection_auth(unauthenticated, &connection)
        .body("{}")
        .build()
        .unwrap();

    let auth_header = request
        .headers()
        .get("authorization")
        .unwrap()
        .to_str()
        .unwrap();
    assert!(
        auth_header.starts_with("Basic "),
        "Expected Basic auth header, got: {auth_header}"
    );
}
5413
5414 #[tokio::test]
5415 async fn put_events_with_api_destination_target_resolves_destination() {
5416 let state = Arc::new(RwLock::new(EventBridgeState::new(
5420 "123456789012",
5421 "us-east-1",
5422 )));
5423 let delivery = Arc::new(DeliveryBus::new());
5424 let svc = EventBridgeService::new(state, delivery);
5425
5426 create_connection(&svc, "my-conn");
5428 let conn_arn = {
5429 let state = svc.state.read();
5430 state.connections.get("my-conn").unwrap().arn.clone()
5431 };
5432 let req = make_request(
5433 "CreateApiDestination",
5434 json!({
5435 "Name": "my-dest",
5436 "ConnectionArn": conn_arn,
5437 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5438 "HttpMethod": "POST"
5439 }),
5440 );
5441 svc.create_api_destination(&req).unwrap();
5442
5443 let dest_arn = {
5444 let state = svc.state.read();
5445 state.api_destinations.get("my-dest").unwrap().arn.clone()
5446 };
5447
5448 let req = make_request(
5450 "PutRule",
5451 json!({
5452 "Name": "api-dest-rule",
5453 "EventPattern": r#"{"source":["test.app"]}"#,
5454 "State": "ENABLED"
5455 }),
5456 );
5457 svc.put_rule(&req).unwrap();
5458
5459 let req = make_request(
5460 "PutTargets",
5461 json!({
5462 "Rule": "api-dest-rule",
5463 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5464 }),
5465 );
5466 svc.put_targets(&req).unwrap();
5467
5468 let req = make_request(
5470 "PutEvents",
5471 json!({
5472 "Entries": [{
5473 "Source": "test.app",
5474 "DetailType": "TestEvent",
5475 "Detail": r#"{"key":"value"}"#
5476 }]
5477 }),
5478 );
5479 let resp = svc.put_events(&req).unwrap();
5480 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5481 assert_eq!(body["FailedEntryCount"], 0);
5482 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5483 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5484 }
5485
/// `function_name_from_arn` must yield the bare function name for an unqualified
/// Lambda ARN, for ARNs with a trailing `:prod` or `:42` qualifier, and for an
/// input that is already just a name.
#[test]
fn test_function_name_from_arn() {
    // (input, expected function name), checked in this order.
    let cases = [
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "my-func",
        ),
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod",
            "my-func",
        ),
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:42",
            "my-func",
        ),
        ("my-func", "my-func"),
    ];

    for &(input, expected) in cases.iter() {
        assert_eq!(super::function_name_from_arn(input), expected);
    }
}
5510}