1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::state::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
33 if source.is_empty() {
34 return Err(json!({
35 "ErrorCode": "InvalidArgument",
36 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
37 }));
38 }
39 if detail_type.is_empty() {
40 return Err(json!({
41 "ErrorCode": "InvalidArgument",
42 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
43 }));
44 }
45 if detail.is_empty() {
46 return Err(json!({
47 "ErrorCode": "InvalidArgument",
48 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
49 }));
50 }
51 if serde_json::from_str::<Value>(detail).is_err() {
52 return Err(json!({
53 "ErrorCode": "MalformedDetail",
54 "ErrorMessage": "Detail is malformed.",
55 }));
56 }
57 Ok(())
58}
59
60fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
65 if let Some(s) = raw.as_str() {
66 return DateTime::parse_from_rfc3339(s)
67 .map(|dt| dt.with_timezone(&Utc))
68 .unwrap_or_else(|_| Utc::now());
69 }
70 if let Some(ts) = raw.as_f64() {
71 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
72 .unwrap_or_else(Utc::now);
73 }
74 if let Some(ts) = raw.as_i64() {
75 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
76 }
77 Utc::now()
78}
79
/// Reports whether `action` is one of the EventBridge operations listed here
/// as state-changing.
///
/// The match is exact and case-sensitive; unknown action names return `false`.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];
    MUTATING_ACTIONS.contains(&action)
}
120
/// Service object that answers EventBridge (`events`) API requests.
///
/// Built with [`EventBridgeService::new`]; the optional collaborators start as
/// `None` and are attached through the `with_*` builder methods.
pub struct EventBridgeService {
    /// Shared EventBridge state, accessed through `read()` / `write()` and
    /// looked up per account id by the request handlers.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction.
    /// NOTE(review): not used in the part of the file visible here — presumably
    /// consumed by the event-delivery code; confirm.
    delivery: Arc<DeliveryBus>,
    /// Optional Lambda state, set by `with_lambda`.
    lambda_state: Option<SharedLambdaState>,
    /// Optional CloudWatch Logs state, set by `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Optional container runtime, set by `with_runtime`.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Optional snapshot sink, set by `with_snapshot_store`; when `None`,
    /// `save_snapshot` returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held by `save_snapshot` while the state is copied and
    /// written, so only one snapshot is in flight at a time.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
130
131impl EventBridgeService {
132 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
133 Self {
134 state,
135 delivery,
136 lambda_state: None,
137 logs_state: None,
138 container_runtime: None,
139 snapshot_store: None,
140 snapshot_lock: Arc::new(AsyncMutex::new(())),
141 }
142 }
143
144 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
145 self.lambda_state = Some(lambda_state);
146 self
147 }
148
149 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
150 self.logs_state = Some(logs_state);
151 self
152 }
153
154 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
155 self.container_runtime = Some(runtime);
156 self
157 }
158
159 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
160 self.snapshot_store = Some(store);
161 self
162 }
163
164 async fn save_snapshot(&self) {
168 let Some(store) = self.snapshot_store.clone() else {
169 return;
170 };
171 let _guard = self.snapshot_lock.lock().await;
172 let snapshot = EventBridgeSnapshot {
173 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
174 accounts: Some(self.state.read().clone()),
175 state: None,
176 };
177 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
178 let bytes = serde_json::to_vec(&snapshot)
179 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
180 store.save(&bytes)
181 })
182 .await;
183 match join {
184 Ok(Ok(())) => {}
185 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
186 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
187 }
188 }
189}
190
191#[async_trait]
192impl AwsService for EventBridgeService {
193 fn service_name(&self) -> &str {
194 "events"
195 }
196
197 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
198 let mutates = is_mutating_action(req.action.as_str());
199 let result = match req.action.as_str() {
200 "CreateEventBus" => self.create_event_bus(&req),
201 "DeleteEventBus" => self.delete_event_bus(&req),
202 "ListEventBuses" => self.list_event_buses(&req),
203 "DescribeEventBus" => self.describe_event_bus(&req),
204 "PutRule" => self.put_rule(&req),
205 "DeleteRule" => self.delete_rule(&req),
206 "ListRules" => self.list_rules(&req),
207 "DescribeRule" => self.describe_rule(&req),
208 "EnableRule" => self.enable_rule(&req),
209 "DisableRule" => self.disable_rule(&req),
210 "PutTargets" => self.put_targets(&req),
211 "RemoveTargets" => self.remove_targets(&req),
212 "ListTargetsByRule" => self.list_targets_by_rule(&req),
213 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
214 "PutEvents" => self.put_events(&req),
215 "PutPermission" => self.put_permission(&req),
216 "RemovePermission" => self.remove_permission(&req),
217 "TagResource" => self.tag_resource(&req),
218 "UntagResource" => self.untag_resource(&req),
219 "ListTagsForResource" => self.list_tags_for_resource(&req),
220 "CreateArchive" => self.create_archive(&req),
221 "DescribeArchive" => self.describe_archive(&req),
222 "ListArchives" => self.list_archives(&req),
223 "UpdateArchive" => self.update_archive(&req),
224 "DeleteArchive" => self.delete_archive(&req),
225 "CreateConnection" => self.create_connection(&req),
226 "DescribeConnection" => self.describe_connection(&req),
227 "ListConnections" => self.list_connections(&req),
228 "UpdateConnection" => self.update_connection(&req),
229 "DeleteConnection" => self.delete_connection(&req),
230 "CreateApiDestination" => self.create_api_destination(&req),
231 "DescribeApiDestination" => self.describe_api_destination(&req),
232 "ListApiDestinations" => self.list_api_destinations(&req),
233 "UpdateApiDestination" => self.update_api_destination(&req),
234 "DeleteApiDestination" => self.delete_api_destination(&req),
235 "StartReplay" => self.start_replay(&req),
236 "DescribeReplay" => self.describe_replay(&req),
237 "ListReplays" => self.list_replays(&req),
238 "CancelReplay" => self.cancel_replay(&req),
239 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
240 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
241 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
242 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
243 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
244 "ActivateEventSource" => self.activate_event_source(&req),
245 "DeactivateEventSource" => self.deactivate_event_source(&req),
246 "DescribeEventSource" => self.describe_event_source(&req),
247 "ListEventSources" => self.list_event_sources(&req),
248 "PutPartnerEvents" => self.put_partner_events(&req),
249 "TestEventPattern" => self.test_event_pattern(&req),
250 "UpdateEventBus" => self.update_event_bus(&req),
251 "CreateEndpoint" => self.create_endpoint(&req),
252 "DeleteEndpoint" => self.delete_endpoint(&req),
253 "DescribeEndpoint" => self.describe_endpoint(&req),
254 "ListEndpoints" => self.list_endpoints(&req),
255 "UpdateEndpoint" => self.update_endpoint(&req),
256 "DeauthorizeConnection" => self.deauthorize_connection(&req),
257 _ => Err(AwsServiceError::action_not_implemented(
258 "events",
259 &req.action,
260 )),
261 };
262 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
263 self.save_snapshot().await;
264 }
265 result
266 }
267
268 fn supported_actions(&self) -> &[&str] {
269 &[
270 "CreateEventBus",
271 "DeleteEventBus",
272 "ListEventBuses",
273 "DescribeEventBus",
274 "PutRule",
275 "DeleteRule",
276 "ListRules",
277 "DescribeRule",
278 "EnableRule",
279 "DisableRule",
280 "PutTargets",
281 "RemoveTargets",
282 "ListTargetsByRule",
283 "ListRuleNamesByTarget",
284 "PutEvents",
285 "PutPermission",
286 "RemovePermission",
287 "TagResource",
288 "UntagResource",
289 "ListTagsForResource",
290 "CreateArchive",
291 "DescribeArchive",
292 "ListArchives",
293 "UpdateArchive",
294 "DeleteArchive",
295 "CreateConnection",
296 "DescribeConnection",
297 "ListConnections",
298 "UpdateConnection",
299 "DeleteConnection",
300 "CreateApiDestination",
301 "DescribeApiDestination",
302 "ListApiDestinations",
303 "UpdateApiDestination",
304 "DeleteApiDestination",
305 "StartReplay",
306 "DescribeReplay",
307 "ListReplays",
308 "CancelReplay",
309 "CreatePartnerEventSource",
310 "DeletePartnerEventSource",
311 "DescribePartnerEventSource",
312 "ListPartnerEventSources",
313 "ListPartnerEventSourceAccounts",
314 "ActivateEventSource",
315 "DeactivateEventSource",
316 "DescribeEventSource",
317 "ListEventSources",
318 "PutPartnerEvents",
319 "TestEventPattern",
320 "UpdateEventBus",
321 "CreateEndpoint",
322 "DeleteEndpoint",
323 "DescribeEndpoint",
324 "ListEndpoints",
325 "UpdateEndpoint",
326 "DeauthorizeConnection",
327 ]
328 }
329}
330
331fn parse_tags(body: &Value) -> HashMap<String, String> {
332 let mut tags = HashMap::new();
333 if let Some(arr) = body["Tags"].as_array() {
334 for tag in arr {
335 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
336 tags.insert(key.to_string(), val.to_string());
337 }
338 }
339 }
340 tags
341}
342
343fn parse_target(target: &Value) -> EventTarget {
344 EventTarget {
345 id: target["Id"].as_str().unwrap_or("").to_string(),
346 arn: target["Arn"].as_str().unwrap_or("").to_string(),
347 input: target["Input"].as_str().map(|s| s.to_string()),
348 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
349 input_transformer: target.get("InputTransformer").cloned(),
350 sqs_parameters: target.get("SqsParameters").cloned(),
351 }
352}
353
354fn target_to_json(t: &EventTarget) -> Value {
355 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
356 if let Some(ref input) = t.input {
357 obj["Input"] = json!(input);
358 }
359 if let Some(ref input_path) = t.input_path {
360 obj["InputPath"] = json!(input_path);
361 }
362 if let Some(ref it) = t.input_transformer {
363 obj["InputTransformer"] = it.clone();
364 }
365 if let Some(ref sp) = t.sqs_parameters {
366 obj["SqsParameters"] = sp.clone();
367 }
368 obj
369}
370
371impl EventBridgeService {
373 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
374 let body = req.json_body();
375 validate_required("Name", &body["Name"])?;
376 let name = body["Name"]
377 .as_str()
378 .ok_or_else(|| missing("Name"))?
379 .to_string();
380 validate_string_length("name", &name, 1, 256)?;
381 validate_optional_string_length(
382 "eventSourceName",
383 body["EventSourceName"].as_str(),
384 1,
385 256,
386 )?;
387 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
388 validate_optional_string_length(
389 "kmsKeyIdentifier",
390 body["KmsKeyIdentifier"].as_str(),
391 0,
392 2048,
393 )?;
394
395 if name.contains('/') && !name.starts_with("aws.partner/") {
397 return Err(AwsServiceError::aws_error(
398 StatusCode::BAD_REQUEST,
399 "ValidationException",
400 "Event bus name must not contain '/'.",
401 ));
402 }
403
404 if name.starts_with("aws.partner/") {
406 let event_source = body["EventSourceName"].as_str().unwrap_or("");
407 let accounts_r = self.state.read();
408 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
409 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
410 let has_source = state_r.partner_event_sources.contains_key(event_source);
411 drop(accounts_r);
412 if !has_source {
413 return Err(AwsServiceError::aws_error(
414 StatusCode::BAD_REQUEST,
415 "ResourceNotFoundException",
416 format!("Event source {event_source} does not exist."),
417 ));
418 }
419 }
420
421 let mut accounts = self.state.write();
422 let state = accounts.get_or_create(&req.account_id);
423
424 if state.buses.contains_key(&name) {
425 return Err(AwsServiceError::aws_error(
426 StatusCode::BAD_REQUEST,
427 "ResourceAlreadyExistsException",
428 format!("Event bus {name} already exists."),
429 ));
430 }
431
432 let arn = format!(
433 "arn:aws:events:{}:{}:event-bus/{}",
434 req.region, state.account_id, name
435 );
436 let now = Utc::now();
437 let description = body["Description"].as_str().map(|s| s.to_string());
438 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
439 let dead_letter_config = body.get("DeadLetterConfig").cloned();
440
441 let tags = parse_tags(&body);
442
443 let bus = EventBus {
444 name: name.clone(),
445 arn: arn.clone(),
446 tags,
447 policy: None,
448 description,
449 kms_key_identifier,
450 dead_letter_config,
451 creation_time: now,
452 last_modified_time: now,
453 };
454 state.buses.insert(name, bus);
455
456 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
457 }
458
459 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
460 let body = req.json_body();
461 validate_required("Name", &body["Name"])?;
462 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
463 validate_string_length("name", name, 1, 256)?;
464
465 if name == "default" {
466 return Err(AwsServiceError::aws_error(
467 StatusCode::BAD_REQUEST,
468 "ValidationException",
469 format!("Cannot delete event bus {name}."),
470 ));
471 }
472
473 let mut accounts = self.state.write();
474 let state = accounts.get_or_create(&req.account_id);
475 state.buses.remove(name);
476 state.rules.retain(|k, _| k.0 != name);
477
478 Ok(AwsResponse::ok_json(json!({})))
479 }
480
481 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482 let body = req.json_body();
483 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
484 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
485 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
486 let name_prefix = body["NamePrefix"].as_str();
487 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
488 if let Some(t) = body["NextToken"].as_str() {
489 t.parse::<usize>().map_err(|_| {
490 AwsServiceError::aws_error(
491 StatusCode::BAD_REQUEST,
492 "InvalidNextTokenException",
493 format!("Invalid NextToken value: '{t}'"),
494 )
495 })?;
496 }
497
498 let accounts = self.state.read();
499 let empty = EventBridgeState::new(&req.account_id, &req.region);
500 let state = accounts.get(&req.account_id).unwrap_or(&empty);
501 let filtered: Vec<&_> = state
502 .buses
503 .values()
504 .filter(|b| match name_prefix {
505 Some(prefix) => b.name.starts_with(prefix),
506 None => true,
507 })
508 .collect();
509
510 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
511 let buses: Vec<Value> = page
512 .iter()
513 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
514 .collect();
515 let mut resp = json!({ "EventBuses": buses });
516 if let Some(token) = next_token {
517 resp["NextToken"] = json!(token);
518 }
519
520 Ok(AwsResponse::ok_json(resp))
521 }
522
523 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
524 let body = req.json_body();
525 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
526 let name = body["Name"].as_str().unwrap_or("default");
527
528 let accounts = self.state.read();
529 let empty = EventBridgeState::new(&req.account_id, &req.region);
530 let state = accounts.get(&req.account_id).unwrap_or(&empty);
531 let bus = state.buses.get(name).ok_or_else(|| {
532 AwsServiceError::aws_error(
533 StatusCode::BAD_REQUEST,
534 "ResourceNotFoundException",
535 format!("Event bus {name} does not exist."),
536 )
537 })?;
538
539 let mut resp = json!({
540 "Name": bus.name,
541 "Arn": bus.arn,
542 "CreationTime": bus.creation_time.timestamp() as f64,
543 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
544 });
545
546 if let Some(ref policy) = bus.policy {
547 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
548 }
549 if let Some(ref desc) = bus.description {
550 resp["Description"] = json!(desc);
551 }
552 if let Some(ref kms) = bus.kms_key_identifier {
553 resp["KmsKeyIdentifier"] = json!(kms);
554 }
555 if let Some(ref dlc) = bus.dead_letter_config {
556 resp["DeadLetterConfig"] = dlc.clone();
557 }
558
559 Ok(AwsResponse::ok_json(resp))
560 }
561
562 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
565 let body = req.json_body();
566 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
567 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
568 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
569 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
570 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
571
572 let mut accounts = self.state.write();
573 let state = accounts.get_or_create(&req.account_id);
574
575 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
576 AwsServiceError::aws_error(
577 StatusCode::BAD_REQUEST,
578 "ResourceNotFoundException",
579 format!("Event bus {event_bus_name} does not exist."),
580 )
581 })?;
582
583 if let Some(policy_str) = body["Policy"].as_str() {
585 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
586 bus.policy = Some(policy);
587 return Ok(AwsResponse::ok_json(json!({})));
588 }
589 }
590
591 let action = body["Action"].as_str().unwrap_or("");
593 let principal = body["Principal"].as_str().unwrap_or("");
594 let statement_id = body["StatementId"].as_str().unwrap_or("");
595
596 if action != "events:PutEvents" {
598 return Err(AwsServiceError::aws_error(
599 StatusCode::BAD_REQUEST,
600 "ValidationException",
601 "Provided value in parameter 'action' is not supported.",
602 ));
603 }
604
605 let statement = json!({
606 "Sid": statement_id,
607 "Effect": "Allow",
608 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
609 "Action": action,
610 "Resource": bus.arn,
611 });
612
613 let policy = bus.policy.get_or_insert_with(|| {
614 json!({
615 "Version": "2012-10-17",
616 "Statement": [],
617 })
618 });
619
620 if let Some(stmts) = policy["Statement"].as_array_mut() {
621 stmts.push(statement);
622 }
623
624 Ok(AwsResponse::ok_json(json!({})))
625 }
626
627 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
628 let body = req.json_body();
629 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
630 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
631 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
632 let statement_id = body["StatementId"].as_str().unwrap_or("");
633 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
634
635 let mut accounts = self.state.write();
636 let state = accounts.get_or_create(&req.account_id);
637
638 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
639 AwsServiceError::aws_error(
640 StatusCode::BAD_REQUEST,
641 "ResourceNotFoundException",
642 format!("Event bus {event_bus_name} does not exist."),
643 )
644 })?;
645
646 if remove_all {
647 bus.policy = None;
648 return Ok(AwsResponse::ok_json(json!({})));
649 }
650
651 let policy = bus.policy.as_mut().ok_or_else(|| {
652 AwsServiceError::aws_error(
653 StatusCode::BAD_REQUEST,
654 "ResourceNotFoundException",
655 "EventBus does not have a policy.",
656 )
657 })?;
658
659 if let Some(stmts) = policy["Statement"].as_array_mut() {
660 let before = stmts.len();
661 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
662 if stmts.len() == before {
663 return Err(AwsServiceError::aws_error(
664 StatusCode::BAD_REQUEST,
665 "ResourceNotFoundException",
666 "Statement with the provided id does not exist.",
667 ));
668 }
669 if stmts.is_empty() {
670 bus.policy = None;
671 }
672 }
673
674 Ok(AwsResponse::ok_json(json!({})))
675 }
676
677 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
680 let body = req.json_body();
681 validate_required("Name", &body["Name"])?;
682 let name = body["Name"]
683 .as_str()
684 .ok_or_else(|| missing("Name"))?
685 .to_string();
686 validate_string_length("name", &name, 1, 64)?;
687 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
688 validate_optional_string_length(
689 "scheduleExpression",
690 body["ScheduleExpression"].as_str(),
691 0,
692 256,
693 )?;
694 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
695 validate_optional_enum(
696 "state",
697 body["State"].as_str(),
698 &[
699 "ENABLED",
700 "DISABLED",
701 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
702 ],
703 )?;
704 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
705 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
706
707 let raw_bus = body["EventBusName"]
708 .as_str()
709 .unwrap_or("default")
710 .to_string();
711
712 let mut accounts = self.state.write();
713 let state = accounts.get_or_create(&req.account_id);
714 let event_bus_name = state.resolve_bus_name(&raw_bus);
715
716 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
717 if s.is_empty() {
718 None
719 } else {
720 Some(s.to_string())
721 }
722 });
723 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
724 if s.is_empty() {
725 None
726 } else {
727 Some(s.to_string())
728 }
729 });
730 let description = body["Description"].as_str().map(|s| s.to_string());
731 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
732 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
733
734 if schedule_expression.is_some() && event_bus_name != "default" {
736 return Err(AwsServiceError::aws_error(
737 StatusCode::BAD_REQUEST,
738 "ValidationException",
739 "ScheduleExpression is supported only on the default event bus.",
740 ));
741 }
742
743 if !state.buses.contains_key(&event_bus_name) {
744 return Err(AwsServiceError::aws_error(
745 StatusCode::BAD_REQUEST,
746 "ResourceNotFoundException",
747 format!("Event bus {event_bus_name} does not exist."),
748 ));
749 }
750
751 let arn = if event_bus_name == "default" {
752 format!(
753 "arn:aws:events:{}:{}:rule/{}",
754 req.region, state.account_id, name
755 )
756 } else {
757 format!(
758 "arn:aws:events:{}:{}:rule/{}/{}",
759 req.region, state.account_id, event_bus_name, name
760 )
761 };
762
763 let key = (event_bus_name.clone(), name.clone());
764 let targets = state
765 .rules
766 .get(&key)
767 .map(|r| r.targets.clone())
768 .unwrap_or_default();
769
770 let tags = parse_tags(&body);
771
772 let rule = EventRule {
773 name: name.clone(),
774 arn: arn.clone(),
775 event_bus_name,
776 event_pattern,
777 schedule_expression,
778 state: rule_state,
779 description,
780 role_arn,
781 managed_by: None,
782 created_by: None,
783 targets,
784 tags,
785 last_fired: None,
786 };
787
788 state.rules.insert(key, rule);
789 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
790 }
791
792 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
793 let body = req.json_body();
794 validate_required("Name", &body["Name"])?;
795 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
796 validate_string_length("name", name, 1, 64)?;
797 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
798 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
799
800 let mut accounts = self.state.write();
801 let state = accounts.get_or_create(&req.account_id);
802 let bus_name = state.resolve_bus_name(event_bus_name);
803 let key = (bus_name, name.to_string());
804
805 if let Some(rule) = state.rules.get(&key) {
807 if !rule.targets.is_empty() {
808 return Err(AwsServiceError::aws_error(
809 StatusCode::BAD_REQUEST,
810 "ValidationException",
811 "Rule can't be deleted since it has targets.",
812 ));
813 }
814 }
815
816 state.rules.remove(&key);
817 Ok(AwsResponse::ok_json(json!({})))
818 }
819
820 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
821 let body = req.json_body();
822 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
823 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
824 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
825 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
826 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
827 let name_prefix = body["NamePrefix"].as_str();
828 let limit = body["Limit"].as_u64().map(|n| n as usize);
829 let next_token = body["NextToken"].as_str();
830
831 let accounts = self.state.read();
832 let empty = EventBridgeState::new(&req.account_id, &req.region);
833 let state = accounts.get(&req.account_id).unwrap_or(&empty);
834 let bus_name = state.resolve_bus_name(event_bus_name);
835
836 let mut rules: Vec<&EventRule> = state
837 .rules
838 .values()
839 .filter(|r| r.event_bus_name == bus_name)
840 .filter(|r| match name_prefix {
841 Some(prefix) => r.name.starts_with(prefix),
842 None => true,
843 })
844 .collect();
845 rules.sort_by(|a, b| a.name.cmp(&b.name));
846
847 let start = next_token
849 .and_then(|t| t.parse::<usize>().ok())
850 .unwrap_or(0)
851 .min(rules.len());
852 let rules_slice = &rules[start..];
853
854 let (page, new_next_token) = if let Some(lim) = limit {
855 if rules_slice.len() > lim {
856 (&rules_slice[..lim], Some((start + lim).to_string()))
857 } else {
858 (rules_slice, None)
859 }
860 } else {
861 (rules_slice, None)
862 };
863
864 let rules_json: Vec<Value> = page
865 .iter()
866 .map(|r| {
867 let mut obj = json!({
868 "Name": r.name,
869 "Arn": r.arn,
870 "EventBusName": r.event_bus_name,
871 "State": r.state,
872 });
873 if let Some(ref desc) = r.description {
874 obj["Description"] = json!(desc);
875 }
876 if let Some(ref ep) = r.event_pattern {
877 obj["EventPattern"] = json!(ep);
878 }
879 if let Some(ref se) = r.schedule_expression {
880 obj["ScheduleExpression"] = json!(se);
881 }
882 if let Some(ref mb) = r.managed_by {
883 obj["ManagedBy"] = json!(mb);
884 }
885 obj
886 })
887 .collect();
888
889 let mut resp = json!({ "Rules": rules_json });
890 if let Some(token) = new_next_token {
891 resp["NextToken"] = json!(token);
892 }
893
894 Ok(AwsResponse::ok_json(resp))
895 }
896
897 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898 let body = req.json_body();
899 validate_required("Name", &body["Name"])?;
900 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901 validate_string_length("name", name, 1, 64)?;
902 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905 let accounts = self.state.read();
906 let empty = EventBridgeState::new(&req.account_id, &req.region);
907 let state = accounts.get(&req.account_id).unwrap_or(&empty);
908 let bus_name = state.resolve_bus_name(event_bus_name);
909 let key = (bus_name.clone(), name.to_string());
910
911 let rule = state.rules.get(&key).ok_or_else(|| {
912 AwsServiceError::aws_error(
913 StatusCode::BAD_REQUEST,
914 "ResourceNotFoundException",
915 format!("Rule {name} does not exist."),
916 )
917 })?;
918
919 let mut resp = json!({
920 "Name": rule.name,
921 "Arn": rule.arn,
922 "EventBusName": rule.event_bus_name,
923 "State": rule.state,
924 });
925
926 if let Some(ref desc) = rule.description {
927 resp["Description"] = json!(desc);
928 }
929 if let Some(ref ep) = rule.event_pattern {
930 resp["EventPattern"] = json!(ep);
931 }
932 if let Some(ref se) = rule.schedule_expression {
933 resp["ScheduleExpression"] = json!(se);
934 }
935 if let Some(ref role) = rule.role_arn {
936 resp["RoleArn"] = json!(role);
937 }
938 if let Some(ref mb) = rule.managed_by {
939 resp["ManagedBy"] = json!(mb);
940 }
941 if let Some(ref cb) = rule.created_by {
942 resp["CreatedBy"] = json!(cb);
943 }
944 if rule.event_bus_name != "default" && rule.created_by.is_none() {
946 resp["CreatedBy"] = json!(state.account_id);
947 }
948
949 Ok(AwsResponse::ok_json(resp))
950 }
951
952 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
953 let body = req.json_body();
954 validate_required("Name", &body["Name"])?;
955 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
956 validate_string_length("name", name, 1, 64)?;
957 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
958 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
959
960 let mut accounts = self.state.write();
961 let state = accounts.get_or_create(&req.account_id);
962 let bus_name = state.resolve_bus_name(event_bus_name);
963 let key = (bus_name, name.to_string());
964
965 let rule = state.rules.get_mut(&key).ok_or_else(|| {
966 AwsServiceError::aws_error(
967 StatusCode::BAD_REQUEST,
968 "ResourceNotFoundException",
969 format!("Rule {name} does not exist."),
970 )
971 })?;
972
973 rule.state = "ENABLED".to_string();
974 Ok(AwsResponse::ok_json(json!({})))
975 }
976
977 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
978 let body = req.json_body();
979 validate_required("Name", &body["Name"])?;
980 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
981 validate_string_length("name", name, 1, 64)?;
982 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
983 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984
985 let mut accounts = self.state.write();
986 let state = accounts.get_or_create(&req.account_id);
987 let bus_name = state.resolve_bus_name(event_bus_name);
988 let key = (bus_name, name.to_string());
989
990 let rule = state.rules.get_mut(&key).ok_or_else(|| {
991 AwsServiceError::aws_error(
992 StatusCode::BAD_REQUEST,
993 "ResourceNotFoundException",
994 format!("Rule {name} does not exist."),
995 )
996 })?;
997
998 rule.state = "DISABLED".to_string();
999 Ok(AwsResponse::ok_json(json!({})))
1000 }
1001
1002 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1005 let body = req.json_body();
1006 validate_required("Rule", &body["Rule"])?;
1007 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1008 validate_string_length("rule", rule_name, 1, 64)?;
1009 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1010 validate_required("Targets", &body["Targets"])?;
1011 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1012 let targets = body["Targets"]
1013 .as_array()
1014 .ok_or_else(|| missing("Targets"))?;
1015
1016 for target in targets {
1018 let target_id = target["Id"].as_str().unwrap_or("");
1019 let target_arn = target["Arn"].as_str().unwrap_or("");
1020
1021 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1022 return Err(AwsServiceError::aws_error(
1023 StatusCode::BAD_REQUEST,
1024 "ValidationException",
1025 format!(
1026 "Parameter(s) SqsParameters must be specified for target: {target_id}."
1027 ),
1028 ));
1029 }
1030
1031 if !target_arn.starts_with("arn:") {
1033 return Err(AwsServiceError::aws_error(
1034 StatusCode::BAD_REQUEST,
1035 "ValidationException",
1036 format!(
1037 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1038 ),
1039 ));
1040 }
1041 }
1042
1043 let mut accounts = self.state.write();
1044 let state = accounts.get_or_create(&req.account_id);
1045 let bus_name = state.resolve_bus_name(event_bus_name);
1046 let key = (bus_name.clone(), rule_name.to_string());
1047
1048 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049 AwsServiceError::aws_error(
1050 StatusCode::BAD_REQUEST,
1051 "ResourceNotFoundException",
1052 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053 )
1054 })?;
1055
1056 for target in targets {
1057 let et = parse_target(target);
1058 rule.targets.retain(|t| t.id != et.id);
1060 rule.targets.push(et);
1061 }
1062
1063 Ok(AwsResponse::ok_json(json!({
1064 "FailedEntryCount": 0,
1065 "FailedEntries": [],
1066 })))
1067 }
1068
1069 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1070 let body = req.json_body();
1071 validate_required("Rule", &body["Rule"])?;
1072 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1073 validate_string_length("rule", rule_name, 1, 64)?;
1074 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1075 validate_required("Ids", &body["Ids"])?;
1076 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1077 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1078
1079 let target_ids: Vec<String> = ids
1080 .iter()
1081 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1082 .collect();
1083
1084 let mut accounts = self.state.write();
1085 let state = accounts.get_or_create(&req.account_id);
1086 let bus_name = state.resolve_bus_name(event_bus_name);
1087 let key = (bus_name.clone(), rule_name.to_string());
1088
1089 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1090 AwsServiceError::aws_error(
1091 StatusCode::BAD_REQUEST,
1092 "ResourceNotFoundException",
1093 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1094 )
1095 })?;
1096
1097 rule.targets.retain(|t| !target_ids.contains(&t.id));
1098
1099 Ok(AwsResponse::ok_json(json!({
1100 "FailedEntryCount": 0,
1101 "FailedEntries": [],
1102 })))
1103 }
1104
1105 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1106 let body = req.json_body();
1107 validate_required("Rule", &body["Rule"])?;
1108 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1109 validate_string_length("rule", rule_name, 1, 64)?;
1110 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1111 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1112 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1113 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1114 let limit = body["Limit"].as_u64().map(|n| n as usize);
1115 let next_token = body["NextToken"].as_str();
1116
1117 let accounts = self.state.read();
1118 let empty = EventBridgeState::new(&req.account_id, &req.region);
1119 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1120 let bus_name = state.resolve_bus_name(event_bus_name);
1121 let key = (bus_name, rule_name.to_string());
1122
1123 let rule = state.rules.get(&key).ok_or_else(|| {
1124 AwsServiceError::aws_error(
1125 StatusCode::BAD_REQUEST,
1126 "ResourceNotFoundException",
1127 format!("Rule {rule_name} does not exist."),
1128 )
1129 })?;
1130
1131 let all_targets = &rule.targets;
1132 let start = next_token
1133 .and_then(|t| t.parse::<usize>().ok())
1134 .unwrap_or(0)
1135 .min(all_targets.len());
1136 let slice = &all_targets[start..];
1137
1138 let (page, new_next_token) = if let Some(lim) = limit {
1139 if slice.len() > lim {
1140 (&slice[..lim], Some((start + lim).to_string()))
1141 } else {
1142 (slice, None)
1143 }
1144 } else {
1145 (slice, None)
1146 };
1147
1148 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1149
1150 let mut resp = json!({ "Targets": targets });
1151 if let Some(token) = new_next_token {
1152 resp["NextToken"] = json!(token);
1153 }
1154
1155 Ok(AwsResponse::ok_json(resp))
1156 }
1157
1158 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159 let body = req.json_body();
1160 validate_required("TargetArn", &body["TargetArn"])?;
1161 let target_arn = body["TargetArn"]
1162 .as_str()
1163 .ok_or_else(|| missing("TargetArn"))?;
1164 validate_string_length("targetArn", target_arn, 1, 1600)?;
1165 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1166 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1167 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1168 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1169 let limit = body["Limit"].as_u64().map(|n| n as usize);
1170 let next_token = body["NextToken"].as_str();
1171
1172 let accounts = self.state.read();
1173 let empty = EventBridgeState::new(&req.account_id, &req.region);
1174 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1175 let bus_name = state.resolve_bus_name(event_bus_name);
1176
1177 let mut rule_names: Vec<String> = Vec::new();
1179 for rule in state.rules.values() {
1180 if rule.event_bus_name == bus_name
1181 && rule.targets.iter().any(|t| t.arn == target_arn)
1182 && !rule_names.contains(&rule.name)
1183 {
1184 rule_names.push(rule.name.clone());
1185 }
1186 }
1187 rule_names.sort();
1188
1189 let start = next_token
1190 .and_then(|t| t.parse::<usize>().ok())
1191 .unwrap_or(0)
1192 .min(rule_names.len());
1193 let slice = &rule_names[start..];
1194
1195 let (page, new_next_token) = if let Some(lim) = limit {
1196 if slice.len() > lim {
1197 (&slice[..lim], Some((start + lim).to_string()))
1198 } else {
1199 (slice, None)
1200 }
1201 } else {
1202 (slice, None)
1203 };
1204
1205 let mut resp = json!({ "RuleNames": page });
1206 if let Some(token) = new_next_token {
1207 resp["NextToken"] = json!(token);
1208 }
1209
1210 Ok(AwsResponse::ok_json(resp))
1211 }
1212
1213 fn create_partner_event_source(
1216 &self,
1217 req: &AwsRequest,
1218 ) -> Result<AwsResponse, AwsServiceError> {
1219 let body = req.json_body();
1220 validate_required("Name", &body["Name"])?;
1221 let name = body["Name"]
1222 .as_str()
1223 .ok_or_else(|| missing("Name"))?
1224 .to_string();
1225 validate_string_length("name", &name, 1, 256)?;
1226 validate_required("Account", &body["Account"])?;
1227 let account = body["Account"]
1228 .as_str()
1229 .ok_or_else(|| missing("Account"))?
1230 .to_string();
1231 validate_string_length("account", &account, 12, 12)?;
1232
1233 let mut accounts = self.state.write();
1234 let state = accounts.get_or_create(&req.account_id);
1235 if state.partner_event_sources.contains_key(&name) {
1236 return Err(AwsServiceError::aws_error(
1237 StatusCode::CONFLICT,
1238 "ResourceAlreadyExistsException",
1239 format!("Partner event source {name} already exists."),
1240 ));
1241 }
1242 let arn = format!(
1243 "arn:aws:events:{}::event-source/aws.partner/{}",
1244 state.region, name
1245 );
1246 let now = Utc::now();
1247 let ps = PartnerEventSource {
1248 name: name.clone(),
1249 arn: arn.clone(),
1250 account,
1251 creation_time: now,
1252 expiration_time: None,
1253 state: "ACTIVE".to_string(),
1254 };
1255 state.partner_event_sources.insert(name.clone(), ps);
1256
1257 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1258 }
1259
1260 fn delete_partner_event_source(
1261 &self,
1262 req: &AwsRequest,
1263 ) -> Result<AwsResponse, AwsServiceError> {
1264 let body = req.json_body();
1265 validate_required("Name", &body["Name"])?;
1266 let name = body["Name"]
1267 .as_str()
1268 .ok_or_else(|| missing("Name"))?
1269 .to_string();
1270 validate_required("Account", &body["Account"])?;
1271 let account = body["Account"]
1272 .as_str()
1273 .ok_or_else(|| missing("Account"))?
1274 .to_string();
1275
1276 let mut accounts = self.state.write();
1277 let state = accounts.get_or_create(&req.account_id);
1278 match state.partner_event_sources.get(&name) {
1279 Some(ps) if ps.account == account => {
1280 state.partner_event_sources.remove(&name);
1281 }
1282 Some(_) => {
1283 return Err(AwsServiceError::aws_error(
1284 StatusCode::NOT_FOUND,
1285 "ResourceNotFoundException",
1286 format!("Partner event source {name} does not exist for account {account}."),
1287 ));
1288 }
1289 None => {
1290 return Err(AwsServiceError::aws_error(
1291 StatusCode::NOT_FOUND,
1292 "ResourceNotFoundException",
1293 format!("Partner event source {name} does not exist."),
1294 ));
1295 }
1296 }
1297
1298 Ok(AwsResponse::ok_json(json!({})))
1299 }
1300
1301 fn describe_partner_event_source(
1302 &self,
1303 req: &AwsRequest,
1304 ) -> Result<AwsResponse, AwsServiceError> {
1305 let body = req.json_body();
1306 validate_required("Name", &body["Name"])?;
1307 let name = body["Name"]
1308 .as_str()
1309 .ok_or_else(|| missing("Name"))?
1310 .to_string();
1311 validate_string_length("name", &name, 1, 256)?;
1312
1313 let accounts = self.state.read();
1314 let empty = EventBridgeState::new(&req.account_id, &req.region);
1315 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1316 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1317 AwsServiceError::aws_error(
1318 StatusCode::NOT_FOUND,
1319 "ResourceNotFoundException",
1320 format!("Partner event source {name} does not exist."),
1321 )
1322 })?;
1323
1324 Ok(AwsResponse::ok_json(json!({
1325 "Arn": ps.arn,
1326 "Name": ps.name,
1327 })))
1328 }
1329
1330 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331 let body = req.json_body();
1332 validate_required("namePrefix", &body["NamePrefix"])?;
1333 let name_prefix = body["NamePrefix"]
1334 .as_str()
1335 .ok_or_else(|| missing("NamePrefix"))?;
1336 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1337 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1338 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1339 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1340
1341 let accounts = self.state.read();
1342 let empty = EventBridgeState::new(&req.account_id, &req.region);
1343 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1344 let all: Vec<Value> = state
1345 .partner_event_sources
1346 .values()
1347 .filter(|ps| ps.name.starts_with(name_prefix))
1348 .map(|ps| {
1349 json!({
1350 "Arn": ps.arn,
1351 "Name": ps.name,
1352 })
1353 })
1354 .collect();
1355
1356 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1357 let mut resp = json!({ "PartnerEventSources": sources });
1358 if let Some(token) = next_token {
1359 resp["NextToken"] = json!(token);
1360 }
1361
1362 Ok(AwsResponse::ok_json(resp))
1363 }
1364
1365 fn list_partner_event_source_accounts(
1366 &self,
1367 req: &AwsRequest,
1368 ) -> Result<AwsResponse, AwsServiceError> {
1369 let body = req.json_body();
1370 validate_required("EventSourceName", &body["EventSourceName"])?;
1371 let event_source_name = body["EventSourceName"]
1372 .as_str()
1373 .ok_or_else(|| missing("EventSourceName"))?;
1374 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1375 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1376 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1377
1378 let accounts = self.state.read();
1379 let empty = EventBridgeState::new(&req.account_id, &req.region);
1380 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1381 let accounts: Vec<Value> = state
1382 .partner_event_sources
1383 .values()
1384 .filter(|ps| ps.name == event_source_name)
1385 .map(|ps| json!({ "Account": ps.account }))
1386 .collect();
1387
1388 Ok(AwsResponse::ok_json(json!({
1389 "PartnerEventSourceAccounts": accounts
1390 })))
1391 }
1392
1393 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1394 let body = req.json_body();
1395 validate_required("Name", &body["Name"])?;
1396 let name = body["Name"]
1397 .as_str()
1398 .ok_or_else(|| missing("Name"))?
1399 .to_string();
1400
1401 let mut accounts = self.state.write();
1402 let state = accounts.get_or_create(&req.account_id);
1403 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1404 AwsServiceError::aws_error(
1405 StatusCode::NOT_FOUND,
1406 "ResourceNotFoundException",
1407 format!("Event source {name} does not exist."),
1408 )
1409 })?;
1410 ps.state = "ACTIVE".to_string();
1411
1412 Ok(AwsResponse::ok_json(json!({})))
1413 }
1414
1415 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1416 let body = req.json_body();
1417 validate_required("Name", &body["Name"])?;
1418 let name = body["Name"]
1419 .as_str()
1420 .ok_or_else(|| missing("Name"))?
1421 .to_string();
1422
1423 let mut accounts = self.state.write();
1424 let state = accounts.get_or_create(&req.account_id);
1425 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1426 AwsServiceError::aws_error(
1427 StatusCode::NOT_FOUND,
1428 "ResourceNotFoundException",
1429 format!("Event source {name} does not exist."),
1430 )
1431 })?;
1432 ps.state = "INACTIVE".to_string();
1433
1434 Ok(AwsResponse::ok_json(json!({})))
1435 }
1436
1437 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438 let body = req.json_body();
1439 validate_required("Name", &body["Name"])?;
1440 let name = body["Name"]
1441 .as_str()
1442 .ok_or_else(|| missing("Name"))?
1443 .to_string();
1444
1445 let accounts = self.state.read();
1446 let empty = EventBridgeState::new(&req.account_id, &req.region);
1447 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1448 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1449 AwsServiceError::aws_error(
1450 StatusCode::NOT_FOUND,
1451 "ResourceNotFoundException",
1452 format!("Event source {name} does not exist."),
1453 )
1454 })?;
1455
1456 Ok(AwsResponse::ok_json(json!({
1457 "Arn": ps.arn,
1458 "Name": ps.name,
1459 "CreatedBy": ps.account,
1460 "CreationTime": ps.creation_time.timestamp() as f64,
1461 "State": ps.state,
1462 })))
1463 }
1464
1465 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1466 let body = req.json_body();
1467 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1468 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1469 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1470 let name_prefix = body["NamePrefix"].as_str();
1471 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1472
1473 let accounts = self.state.read();
1474 let empty = EventBridgeState::new(&req.account_id, &req.region);
1475 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1476 let all: Vec<Value> = state
1477 .partner_event_sources
1478 .values()
1479 .filter(|ps| match name_prefix {
1480 Some(prefix) => ps.name.starts_with(prefix),
1481 None => true,
1482 })
1483 .map(|ps| {
1484 json!({
1485 "Arn": ps.arn,
1486 "Name": ps.name,
1487 "CreatedBy": ps.account,
1488 "CreationTime": ps.creation_time.timestamp() as f64,
1489 "State": ps.state,
1490 })
1491 })
1492 .collect();
1493
1494 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1495 let mut resp = json!({ "EventSources": sources });
1496 if let Some(token) = next_token {
1497 resp["NextToken"] = json!(token);
1498 }
1499
1500 Ok(AwsResponse::ok_json(resp))
1501 }
1502
1503 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504 let body = req.json_body();
1505 validate_required("Entries", &body["Entries"])?;
1506 let entries = body["Entries"]
1507 .as_array()
1508 .ok_or_else(|| missing("Entries"))?;
1509
1510 let mut result_entries = Vec::new();
1511 for _entry in entries {
1512 let event_id = uuid::Uuid::new_v4().to_string();
1513 result_entries.push(json!({ "EventId": event_id }));
1514 }
1515
1516 Ok(AwsResponse::ok_json(json!({
1517 "FailedEntryCount": 0,
1518 "Entries": result_entries,
1519 })))
1520 }
1521
1522 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1525 let body = req.json_body();
1526 validate_required("EventPattern", &body["EventPattern"])?;
1527 validate_required("Event", &body["Event"])?;
1528 let event_pattern = body["EventPattern"]
1529 .as_str()
1530 .ok_or_else(|| missing("EventPattern"))?;
1531 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1532
1533 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1535 AwsServiceError::aws_error(
1536 StatusCode::BAD_REQUEST,
1537 "InvalidEventPatternException",
1538 "Event is not valid JSON.",
1539 )
1540 })?;
1541
1542 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1544 AwsServiceError::aws_error(
1545 StatusCode::BAD_REQUEST,
1546 "InvalidEventPatternException",
1547 "Event pattern is not valid JSON.",
1548 )
1549 })?;
1550
1551 let source = event["source"].as_str().unwrap_or("");
1552 let detail_type = event["detail-type"].as_str().unwrap_or("");
1553 let detail = event
1554 .get("detail")
1555 .map(|v| serde_json::to_string(v).unwrap_or_default())
1556 .unwrap_or_else(|| "{}".to_string());
1557 let account = event["account"].as_str().unwrap_or("");
1558 let region = event["region"].as_str().unwrap_or("");
1559 let resources: Vec<String> = event["resources"]
1560 .as_array()
1561 .map(|arr| {
1562 arr.iter()
1563 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564 .collect()
1565 })
1566 .unwrap_or_default();
1567
1568 let result = matches_pattern(
1569 Some(event_pattern),
1570 source,
1571 detail_type,
1572 &detail,
1573 account,
1574 region,
1575 &resources,
1576 );
1577
1578 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1579 }
1580
1581 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1584 let body = req.json_body();
1585 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1586 validate_optional_string_length(
1587 "kmsKeyIdentifier",
1588 body["KmsKeyIdentifier"].as_str(),
1589 0,
1590 2048,
1591 )?;
1592 let name = body["Name"].as_str().unwrap_or("default");
1593
1594 let mut accounts = self.state.write();
1595 let state = accounts.get_or_create(&req.account_id);
1596 let bus = state.buses.get_mut(name).ok_or_else(|| {
1597 AwsServiceError::aws_error(
1598 StatusCode::BAD_REQUEST,
1599 "ResourceNotFoundException",
1600 format!("Event bus {name} does not exist."),
1601 )
1602 })?;
1603
1604 if let Some(desc) = body["Description"].as_str() {
1605 bus.description = Some(desc.to_string());
1606 }
1607 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1608 bus.kms_key_identifier = Some(kms.to_string());
1609 }
1610 if let Some(dlc) = body.get("DeadLetterConfig") {
1611 bus.dead_letter_config = Some(dlc.clone());
1612 }
1613 bus.last_modified_time = Utc::now();
1614
1615 let arn = bus.arn.clone();
1616 let bus_name = bus.name.clone();
1617
1618 Ok(AwsResponse::ok_json(json!({
1619 "Arn": arn,
1620 "Name": bus_name,
1621 })))
1622 }
1623
1624 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1627 let body = req.json_body();
1628 validate_required("Name", &body["Name"])?;
1629 let name = body["Name"]
1630 .as_str()
1631 .ok_or_else(|| missing("Name"))?
1632 .to_string();
1633 validate_string_length("name", &name, 1, 64)?;
1634 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1635 validate_required("EventBuses", &body["EventBuses"])?;
1636
1637 let description = body["Description"].as_str().map(|s| s.to_string());
1638 let routing_config = body["RoutingConfig"].clone();
1639 let replication_config = body.get("ReplicationConfig").cloned();
1640 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1641 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1642
1643 let mut accounts = self.state.write();
1644 let state = accounts.get_or_create(&req.account_id);
1645 if state.endpoints.contains_key(&name) {
1646 return Err(AwsServiceError::aws_error(
1647 StatusCode::CONFLICT,
1648 "ResourceAlreadyExistsException",
1649 format!("Endpoint {name} already exists."),
1650 ));
1651 }
1652
1653 let endpoint_id = format!("{}.abc123", name);
1654 let arn = format!(
1655 "arn:aws:events:{}:{}:endpoint/{}",
1656 req.region, state.account_id, name
1657 );
1658 let endpoint_url = format!(
1659 "https://{}.endpoint.events.{}.amazonaws.com",
1660 endpoint_id, req.region
1661 );
1662 let now = Utc::now();
1663
1664 let endpoint = Endpoint {
1665 name: name.clone(),
1666 arn: arn.clone(),
1667 endpoint_id: endpoint_id.clone(),
1668 endpoint_url: Some(endpoint_url),
1669 description,
1670 routing_config: routing_config.clone(),
1671 replication_config: replication_config.clone(),
1672 event_buses: event_buses.clone(),
1673 role_arn: role_arn.clone(),
1674 state: "ACTIVE".to_string(),
1675 creation_time: now,
1676 last_modified_time: now,
1677 };
1678 state.endpoints.insert(name.clone(), endpoint);
1679
1680 let mut resp = json!({
1681 "Name": name,
1682 "Arn": arn,
1683 "State": "ACTIVE",
1684 "RoutingConfig": routing_config,
1685 "EventBuses": event_buses,
1686 });
1687 if let Some(ref rc) = replication_config {
1688 resp["ReplicationConfig"] = rc.clone();
1689 }
1690 if let Some(ref ra) = role_arn {
1691 resp["RoleArn"] = json!(ra);
1692 }
1693
1694 Ok(AwsResponse::ok_json(resp))
1695 }
1696
1697 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698 let body = req.json_body();
1699 validate_required("Name", &body["Name"])?;
1700 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1701
1702 let mut accounts = self.state.write();
1703 let state = accounts.get_or_create(&req.account_id);
1704 state.endpoints.remove(name).ok_or_else(|| {
1705 AwsServiceError::aws_error(
1706 StatusCode::BAD_REQUEST,
1707 "ResourceNotFoundException",
1708 format!("Endpoint '{name}' does not exist."),
1709 )
1710 })?;
1711
1712 Ok(AwsResponse::ok_json(json!({})))
1713 }
1714
1715 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1716 let body = req.json_body();
1717 validate_required("Name", &body["Name"])?;
1718 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1719
1720 let accounts = self.state.read();
1721 let empty = EventBridgeState::new(&req.account_id, &req.region);
1722 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1723 let ep = state.endpoints.get(name).ok_or_else(|| {
1724 AwsServiceError::aws_error(
1725 StatusCode::BAD_REQUEST,
1726 "ResourceNotFoundException",
1727 format!("Endpoint '{name}' does not exist."),
1728 )
1729 })?;
1730
1731 let mut resp = json!({
1732 "Name": ep.name,
1733 "Arn": ep.arn,
1734 "EndpointId": ep.endpoint_id,
1735 "State": ep.state,
1736 "RoutingConfig": ep.routing_config,
1737 "EventBuses": ep.event_buses,
1738 "CreationTime": ep.creation_time.timestamp() as f64,
1739 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1740 });
1741 if let Some(ref url) = ep.endpoint_url {
1742 resp["EndpointUrl"] = json!(url);
1743 }
1744 if let Some(ref desc) = ep.description {
1745 resp["Description"] = json!(desc);
1746 }
1747 if let Some(ref rc) = ep.replication_config {
1748 resp["ReplicationConfig"] = rc.clone();
1749 }
1750 if let Some(ref ra) = ep.role_arn {
1751 resp["RoleArn"] = json!(ra);
1752 }
1753
1754 Ok(AwsResponse::ok_json(resp))
1755 }
1756
1757 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1758 let body = req.json_body();
1759 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1760 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1761 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1762 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1763 let name_prefix = body["NamePrefix"].as_str();
1764 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1765
1766 let accounts = self.state.read();
1767 let empty = EventBridgeState::new(&req.account_id, &req.region);
1768 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1769 let all: Vec<Value> = state
1770 .endpoints
1771 .values()
1772 .filter(|ep| match name_prefix {
1773 Some(prefix) => ep.name.starts_with(prefix),
1774 None => true,
1775 })
1776 .map(|ep| {
1777 let mut obj = json!({
1778 "Name": ep.name,
1779 "Arn": ep.arn,
1780 "EndpointId": ep.endpoint_id,
1781 "State": ep.state,
1782 "RoutingConfig": ep.routing_config,
1783 "EventBuses": ep.event_buses,
1784 "CreationTime": ep.creation_time.timestamp() as f64,
1785 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1786 });
1787 if let Some(ref url) = ep.endpoint_url {
1788 obj["EndpointUrl"] = json!(url);
1789 }
1790 obj
1791 })
1792 .collect();
1793
1794 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1795 let mut resp = json!({ "Endpoints": endpoints });
1796 if let Some(token) = next_token {
1797 resp["NextToken"] = json!(token);
1798 }
1799
1800 Ok(AwsResponse::ok_json(resp))
1801 }
1802
1803 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1804 let body = req.json_body();
1805 validate_required("Name", &body["Name"])?;
1806 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1807
1808 let mut accounts = self.state.write();
1809 let state = accounts.get_or_create(&req.account_id);
1810 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1811 AwsServiceError::aws_error(
1812 StatusCode::BAD_REQUEST,
1813 "ResourceNotFoundException",
1814 format!("Endpoint '{name}' does not exist."),
1815 )
1816 })?;
1817
1818 if let Some(desc) = body["Description"].as_str() {
1819 ep.description = Some(desc.to_string());
1820 }
1821 if !body["RoutingConfig"].is_null() {
1822 ep.routing_config = body["RoutingConfig"].clone();
1823 }
1824 if let Some(rc) = body.get("ReplicationConfig") {
1825 ep.replication_config = Some(rc.clone());
1826 }
1827 if let Some(buses) = body["EventBuses"].as_array() {
1828 ep.event_buses = buses.clone();
1829 }
1830 if let Some(ra) = body["RoleArn"].as_str() {
1831 ep.role_arn = Some(ra.to_string());
1832 }
1833 ep.last_modified_time = Utc::now();
1834
1835 let resp = json!({
1836 "Name": ep.name,
1837 "Arn": ep.arn,
1838 "EndpointId": ep.endpoint_id,
1839 "State": ep.state,
1840 "RoutingConfig": ep.routing_config,
1841 "EventBuses": ep.event_buses,
1842 });
1843
1844 Ok(AwsResponse::ok_json(resp))
1845 }
1846
1847 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850 let body = req.json_body();
1851 validate_required("Name", &body["Name"])?;
1852 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1853 validate_string_length("name", name, 1, 64)?;
1854
1855 let mut accounts = self.state.write();
1856 let state = accounts.get_or_create(&req.account_id);
1857 let conn = state.connections.get_mut(name).ok_or_else(|| {
1858 AwsServiceError::aws_error(
1859 StatusCode::BAD_REQUEST,
1860 "ResourceNotFoundException",
1861 format!("Connection '{name}' does not exist."),
1862 )
1863 })?;
1864
1865 conn.connection_state = "DEAUTHORIZING".to_string();
1866 conn.last_modified_time = Utc::now();
1867
1868 let resp = json!({
1869 "ConnectionArn": conn.arn,
1870 "ConnectionState": conn.connection_state,
1871 "CreationTime": conn.creation_time.timestamp() as f64,
1872 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1873 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1874 });
1875
1876 Ok(AwsResponse::ok_json(resp))
1877 }
1878
1879 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882 let body = req.json_body();
1883 validate_required("Entries", &body["Entries"])?;
1884 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1885 let entries = body["Entries"]
1886 .as_array()
1887 .ok_or_else(|| missing("Entries"))?;
1888
1889 if entries.is_empty() {
1891 return Err(AwsServiceError::aws_error(
1892 StatusCode::BAD_REQUEST,
1893 "ValidationException",
1894 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1895 ));
1896 }
1897 if entries.len() > 10 {
1898 return Err(AwsServiceError::aws_error(
1899 StatusCode::BAD_REQUEST,
1900 "ValidationException",
1901 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1902 ));
1903 }
1904
1905 let mut accounts = self.state.write();
1906 let state = accounts.get_or_create(&req.account_id);
1907 let mut result_entries = Vec::new();
1908 let mut events_to_deliver = Vec::new();
1909 let mut failed_count = 0;
1910
1911 for entry in entries {
1912 let source = entry["Source"].as_str().unwrap_or("").to_string();
1913 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1914 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1915
1916 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1917 failed_count += 1;
1918 result_entries.push(error);
1919 continue;
1920 }
1921
1922 let event_id = uuid::Uuid::new_v4().to_string();
1923 let raw_bus = entry["EventBusName"]
1924 .as_str()
1925 .unwrap_or("default")
1926 .to_string();
1927 let event_bus_name = state.resolve_bus_name(&raw_bus);
1928 let time = parse_put_events_time(&entry["Time"]);
1929 let resources: Vec<String> = entry["Resources"]
1930 .as_array()
1931 .map(|arr| {
1932 arr.iter()
1933 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1934 .collect()
1935 })
1936 .unwrap_or_default();
1937
1938 let event = PutEvent {
1939 event_id: event_id.clone(),
1940 source: source.clone(),
1941 detail_type: detail_type.clone(),
1942 detail: detail.clone(),
1943 event_bus_name: event_bus_name.clone(),
1944 time,
1945 resources: resources.clone(),
1946 };
1947
1948 archive_matching_event(
1949 state,
1950 &event,
1951 &event_bus_name,
1952 &source,
1953 &detail_type,
1954 &detail,
1955 &req.account_id,
1956 &req.region,
1957 &resources,
1958 );
1959
1960 state.events.push(event);
1961
1962 let matching_targets: Vec<EventTarget> = state
1964 .rules
1965 .values()
1966 .filter(|r| {
1967 r.event_bus_name == event_bus_name
1968 && r.state == "ENABLED"
1969 && matches_pattern(
1970 r.event_pattern.as_deref(),
1971 &source,
1972 &detail_type,
1973 &detail,
1974 &req.account_id,
1975 &req.region,
1976 &resources,
1977 )
1978 })
1979 .flat_map(|r| r.targets.clone())
1980 .collect();
1981
1982 if !matching_targets.is_empty() {
1983 events_to_deliver.push((
1984 event_id.clone(),
1985 source,
1986 detail_type,
1987 detail,
1988 time,
1989 resources,
1990 matching_targets,
1991 ));
1992 }
1993
1994 result_entries.push(json!({ "EventId": event_id }));
1995 }
1996
1997 drop(accounts);
1999
2000 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
2002 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
2003 let event_json = json!({
2004 "version": "0",
2005 "id": event_id,
2006 "source": source,
2007 "account": req.account_id,
2008 "detail-type": detail_type,
2009 "detail": detail_value,
2010 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
2011 "region": req.region,
2012 "resources": resources,
2013 });
2014 let event_str = event_json.to_string();
2015
2016 for target in targets {
2017 let arn = &target.arn;
2018 let body_str = if let Some(ref transformer) = target.input_transformer {
2020 apply_input_transformer(transformer, &event_json)
2021 } else if let Some(ref input) = target.input {
2022 input.clone()
2023 } else if let Some(ref input_path) = target.input_path {
2024 resolve_json_path(&event_json, input_path)
2025 .map(|v| v.to_string())
2026 .unwrap_or_else(|| event_str.clone())
2027 } else {
2028 event_str.clone()
2029 };
2030
2031 if arn.contains(":sqs:") {
2032 let group_id = target
2034 .sqs_parameters
2035 .as_ref()
2036 .and_then(|p| p["MessageGroupId"].as_str())
2037 .map(|s| s.to_string());
2038 if group_id.is_some() {
2039 self.delivery.send_to_sqs_with_attrs(
2043 arn,
2044 &body_str,
2045 &HashMap::new(),
2046 group_id.as_deref(),
2047 None,
2048 );
2049 } else {
2050 self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
2051 }
2052 } else if arn.contains(":sns:") {
2053 self.delivery
2054 .publish_to_sns(arn, &body_str, Some(&detail_type));
2055 } else if arn.contains(":lambda:") {
2056 tracing::info!(
2057 function_arn = %arn,
2058 payload = %body_str,
2059 "EventBridge delivering to Lambda function"
2060 );
2061 let now = Utc::now();
2062 let mut accounts = self.state.write();
2063 let state = accounts.get_or_create(&req.account_id);
2064 state
2065 .lambda_invocations
2066 .push(crate::state::LambdaInvocation {
2067 function_arn: arn.clone(),
2068 payload: body_str.clone(),
2069 timestamp: now,
2070 });
2071 drop(accounts);
2072 if let Some(ref ls) = self.lambda_state {
2074 ls.write().default_mut().invocations.push(LambdaInvocation {
2075 function_arn: arn.clone(),
2076 payload: body_str.clone(),
2077 timestamp: now,
2078 source: "aws:events".to_string(),
2079 });
2080 }
2081 invoke_lambda_async(
2083 &self.container_runtime,
2084 &self.lambda_state,
2085 arn,
2086 &body_str,
2087 );
2088 } else if arn.contains(":logs:") {
2089 tracing::info!(
2090 log_group_arn = %arn,
2091 payload = %body_str,
2092 "EventBridge delivering to CloudWatch Logs"
2093 );
2094 let now = Utc::now();
2095 let mut accounts = self.state.write();
2096 let state = accounts.get_or_create(&req.account_id);
2097 state.log_deliveries.push(crate::state::LogDelivery {
2098 log_group_arn: arn.clone(),
2099 payload: body_str.clone(),
2100 timestamp: now,
2101 });
2102 drop(accounts);
2103 if let Some(ref log_state) = self.logs_state {
2105 deliver_to_logs(log_state, arn, &body_str, now);
2106 }
2107 } else if arn.contains(":kinesis:") {
2108 tracing::info!(
2109 stream_arn = %arn,
2110 "EventBridge delivering to Kinesis stream"
2111 );
2112 self.delivery.send_to_kinesis(arn, &body_str, &event_id);
2114 } else if arn.contains(":states:") {
2115 tracing::info!(
2116 state_machine_arn = %arn,
2117 "EventBridge delivering to Step Functions"
2118 );
2119 self.delivery.start_stepfunctions_execution(arn, &body_str);
2120 let mut accounts = self.state.write();
2121 let state = accounts.get_or_create(&req.account_id);
2122 state
2123 .step_function_executions
2124 .push(crate::state::StepFunctionExecution {
2125 state_machine_arn: arn.clone(),
2126 payload: body_str.clone(),
2127 timestamp: Utc::now(),
2128 });
2129 } else if arn.contains(":api-destination/") {
2130 let accounts = self.state.read();
2132 let empty = EventBridgeState::new(&req.account_id, &req.region);
2133 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2134 let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2135 if let Some(dest) = dest {
2136 let url = dest.invocation_endpoint.clone();
2137 let method = dest.http_method.clone();
2138 let conn = state
2139 .connections
2140 .values()
2141 .find(|c| c.arn == dest.connection_arn)
2142 .cloned();
2143 drop(accounts);
2144
2145 let payload = body_str.clone();
2146 tokio::spawn(async move {
2147 let client = reqwest::Client::new();
2148 let mut req_builder = match method.as_str() {
2149 "GET" => client.get(&url),
2150 "PUT" => client.put(&url),
2151 "DELETE" => client.delete(&url),
2152 "PATCH" => client.patch(&url),
2153 "HEAD" => client.head(&url),
2154 _ => client.post(&url),
2155 };
2156 req_builder = req_builder.header("Content-Type", "application/json");
2157
2158 if let Some(conn) = conn {
2160 req_builder = apply_connection_auth(req_builder, &conn);
2161 }
2162
2163 let result = req_builder.body(payload).send().await;
2164 if let Err(e) = result {
2165 tracing::warn!(
2166 endpoint = %url,
2167 error = %e,
2168 "EventBridge ApiDestination delivery failed"
2169 );
2170 }
2171 });
2172 }
2173 } else if arn.starts_with("https://") || arn.starts_with("http://") {
2174 let url = arn.clone();
2176 let payload = body_str.clone();
2177 tokio::spawn(async move {
2178 let client = reqwest::Client::new();
2179 let result = client
2180 .post(&url)
2181 .header("Content-Type", "application/json")
2182 .body(payload)
2183 .send()
2184 .await;
2185 if let Err(e) = result {
2186 tracing::warn!(
2187 endpoint = %url,
2188 error = %e,
2189 "EventBridge HTTP target delivery failed"
2190 );
2191 }
2192 });
2193 }
2194 }
2195 }
2196
2197 let resp = json!({
2198 "FailedEntryCount": failed_count,
2199 "Entries": result_entries,
2200 });
2201
2202 Ok(AwsResponse::ok_json(resp))
2203 }
2204
2205 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2208 let body = req.json_body();
2209 validate_required("ResourceARN", &body["ResourceARN"])?;
2210 let arn = body["ResourceARN"]
2211 .as_str()
2212 .ok_or_else(|| missing("ResourceARN"))?;
2213 validate_string_length("resourceARN", arn, 1, 1600)?;
2214 validate_required("Tags", &body["Tags"])?;
2215
2216 let mut accounts = self.state.write();
2217 let state = accounts.get_or_create(&req.account_id);
2218 let tag_map = find_tags_mut(state, arn)?;
2219
2220 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2221 AwsServiceError::aws_error(
2222 StatusCode::BAD_REQUEST,
2223 "ValidationException",
2224 format!("{f} must be a list"),
2225 )
2226 })?;
2227
2228 Ok(AwsResponse::ok_json(json!({})))
2229 }
2230
2231 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2232 let body = req.json_body();
2233 validate_required("ResourceARN", &body["ResourceARN"])?;
2234 let arn = body["ResourceARN"]
2235 .as_str()
2236 .ok_or_else(|| missing("ResourceARN"))?;
2237 validate_string_length("resourceARN", arn, 1, 1600)?;
2238 validate_required("TagKeys", &body["TagKeys"])?;
2239
2240 let mut accounts = self.state.write();
2241 let state = accounts.get_or_create(&req.account_id);
2242 let tag_map = find_tags_mut(state, arn)?;
2243
2244 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2245 AwsServiceError::aws_error(
2246 StatusCode::BAD_REQUEST,
2247 "ValidationException",
2248 format!("{f} must be a list"),
2249 )
2250 })?;
2251
2252 Ok(AwsResponse::ok_json(json!({})))
2253 }
2254
2255 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2256 let body = req.json_body();
2257 validate_required("ResourceARN", &body["ResourceARN"])?;
2258 let arn = body["ResourceARN"]
2259 .as_str()
2260 .ok_or_else(|| missing("ResourceARN"))?;
2261 validate_string_length("resourceARN", arn, 1, 1600)?;
2262
2263 let accounts = self.state.read();
2264 let empty = EventBridgeState::new(&req.account_id, &req.region);
2265 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2266 let tag_map = find_tags(state, arn)?;
2267
2268 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2269
2270 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2271 }
2272
    /// Handles `CreateArchive`: stores a new archive for an event bus and registers the
    /// managed rule that accompanies it.
    ///
    /// Request fields read: `ArchiveName` (required, 1..=48 chars), `EventSourceArn`
    /// (required, 1..=1600 chars), and the optional `Description` (<= 512 chars),
    /// `EventPattern` (<= 4096 chars, checked by `validate_event_pattern`) and
    /// `RetentionDays` (>= 0, defaults to 0 when absent).
    ///
    /// On success the response carries `ArchiveArn`, `CreationTime` (epoch seconds) and
    /// `State` (`"ENABLED"`).
    ///
    /// # Errors
    /// - any validation failure from the checks above, in the order they appear;
    /// - `ResourceNotFoundException` when the bus resolved from `EventSourceArn` is unknown;
    /// - `ResourceAlreadyExistsException` when an archive with this name already exists.
    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = req.json_body();
        validate_required("ArchiveName", &body["ArchiveName"])?;
        let name = body["ArchiveName"]
            .as_str()
            .ok_or_else(|| missing("ArchiveName"))?
            .to_string();
        validate_string_length("archiveName", &name, 1, 48)?;
        validate_required("EventSourceArn", &body["EventSourceArn"])?;
        let event_source_arn = body["EventSourceArn"]
            .as_str()
            .ok_or_else(|| missing("EventSourceArn"))?
            .to_string();
        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
        if let Some(rd) = body["RetentionDays"].as_i64() {
            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
        }
        let description = body["Description"].as_str().map(|s| s.to_string());
        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);

        // Reject a malformed pattern before taking the write lock or touching state.
        if let Some(ref pattern) = event_pattern {
            validate_event_pattern(pattern)?;
        }

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);

        // The source bus must exist; this check runs before the duplicate-name check,
        // so an unknown bus wins when both conditions hold.
        let bus_name = state.resolve_bus_name(&event_source_arn);
        if !state.buses.contains_key(&bus_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceNotFoundException",
                format!("Event bus {bus_name} does not exist."),
            ));
        }

        // Archives are keyed by name alone within an account's state.
        if state.archives.contains_key(&name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceAlreadyExistsException",
                format!("Archive {name} already exists."),
            ));
        }

        let now = Utc::now();
        let arn = format!(
            "arn:aws:events:{}:{}:archive/{}",
            req.region, state.account_id, name
        );

        // A new archive starts enabled and empty.
        let archive = Archive {
            name: name.clone(),
            arn: arn.clone(),
            event_source_arn: event_source_arn.clone(),
            description,
            event_pattern: event_pattern.clone(),
            retention_days,
            state: "ENABLED".to_string(),
            creation_time: now,
            event_count: 0,
            size_bytes: 0,
            events: Vec::new(),
        };
        state.archives.insert(name.clone(), archive);

        // Alongside the archive, register a managed rule named `Events-Archive-<name>`
        // on the same bus.
        let rule_name = format!("Events-Archive-{name}");
        let rule_arn = format!(
            "arn:aws:events:{}:{}:rule/{}",
            req.region, state.account_id, rule_name
        );
        let rule_event_pattern = {
            // Start from the caller's pattern (or `{}` when absent or unparsable) and add
            // a `replay-name` "exists: false" clause. If the parsed pattern is not a JSON
            // object, it is serialized unchanged.
            let mut merged = if let Some(ref ep) = event_pattern {
                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
            } else {
                json!({})
            };
            if let Some(obj) = merged.as_object_mut() {
                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
            }
            serde_json::to_string(&merged).unwrap_or_default()
        };

        // The rule's single target uses the archive name as its id and an input
        // transformer whose template embeds the archive ARN.
        let archive_target = EventTarget {
            id: name.clone(),
            arn: format!("arn:aws:events:{}:::", req.region),
            input: None,
            input_path: None,
            input_transformer: Some(json!({
                "InputPathsMap": {},
                "InputTemplate": format!(
                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
                    arn
                )
            })),
            sqs_parameters: None,
        };

        let archive_rule = EventRule {
            name: rule_name.clone(),
            arn: rule_arn,
            event_bus_name: bus_name.clone(),
            event_pattern: Some(rule_event_pattern),
            schedule_expression: None,
            state: "ENABLED".to_string(),
            description: None,
            role_arn: None,
            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
            created_by: Some(state.account_id.clone()),
            targets: vec![archive_target],
            tags: HashMap::new(),
            last_fired: None,
        };
        // Rules are keyed by (bus name, rule name).
        let key = (bus_name, rule_name);
        state.rules.insert(key, archive_rule);

        Ok(AwsResponse::ok_json(json!({
            "ArchiveArn": arn,
            "CreationTime": now.timestamp() as f64,
            "State": "ENABLED",
        })))
    }
2405
2406 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2407 let body = req.json_body();
2408 validate_required("ArchiveName", &body["ArchiveName"])?;
2409 let name = body["ArchiveName"]
2410 .as_str()
2411 .ok_or_else(|| missing("ArchiveName"))?;
2412 validate_string_length("archiveName", name, 1, 48)?;
2413
2414 let accounts = self.state.read();
2415 let empty = EventBridgeState::new(&req.account_id, &req.region);
2416 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2417 let archive = state.archives.get(name).ok_or_else(|| {
2418 AwsServiceError::aws_error(
2419 StatusCode::BAD_REQUEST,
2420 "ResourceNotFoundException",
2421 format!("Archive {name} does not exist."),
2422 )
2423 })?;
2424
2425 let mut resp = json!({
2426 "ArchiveArn": archive.arn,
2427 "ArchiveName": archive.name,
2428 "CreationTime": archive.creation_time.timestamp() as f64,
2429 "EventCount": archive.event_count,
2430 "EventSourceArn": archive.event_source_arn,
2431 "RetentionDays": archive.retention_days,
2432 "SizeBytes": archive.size_bytes,
2433 "State": archive.state,
2434 });
2435 if let Some(ref desc) = archive.description {
2436 resp["Description"] = json!(desc);
2437 }
2438 if let Some(ref ep) = archive.event_pattern {
2439 resp["EventPattern"] = json!(ep);
2440 }
2441
2442 Ok(AwsResponse::ok_json(resp))
2443 }
2444
2445 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2446 let body = req.json_body();
2447 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2448 validate_optional_string_length(
2449 "eventSourceArn",
2450 body["EventSourceArn"].as_str(),
2451 1,
2452 1600,
2453 )?;
2454 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2455 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2456 let name_prefix = body["NamePrefix"].as_str();
2457 let source_arn = body["EventSourceArn"].as_str();
2458 let archive_state = body["State"].as_str();
2459
2460 let filter_count = [
2462 name_prefix.is_some(),
2463 source_arn.is_some(),
2464 archive_state.is_some(),
2465 ]
2466 .iter()
2467 .filter(|&&x| x)
2468 .count();
2469 if filter_count > 1 {
2470 return Err(AwsServiceError::aws_error(
2471 StatusCode::BAD_REQUEST,
2472 "ValidationException",
2473 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2474 ));
2475 }
2476
2477 if let Some(s) = archive_state {
2479 let valid = [
2480 "ENABLED",
2481 "DISABLED",
2482 "CREATING",
2483 "UPDATING",
2484 "CREATE_FAILED",
2485 "UPDATE_FAILED",
2486 ];
2487 if !valid.contains(&s) {
2488 return Err(AwsServiceError::aws_error(
2489 StatusCode::BAD_REQUEST,
2490 "ValidationException",
2491 format!(
2492 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2493 s
2494 ),
2495 ));
2496 }
2497 }
2498
2499 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2500
2501 let accounts = self.state.read();
2502 let empty = EventBridgeState::new(&req.account_id, &req.region);
2503 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2504 let all: Vec<Value> = state
2505 .archives
2506 .values()
2507 .filter(|a| {
2508 if let Some(prefix) = name_prefix {
2509 a.name.starts_with(prefix)
2510 } else if let Some(arn) = source_arn {
2511 a.event_source_arn == arn
2512 } else if let Some(s) = archive_state {
2513 a.state == s
2514 } else {
2515 true
2516 }
2517 })
2518 .map(|a| {
2519 json!({
2520 "ArchiveName": a.name,
2521 "CreationTime": a.creation_time.timestamp() as f64,
2522 "EventCount": a.event_count,
2523 "EventSourceArn": a.event_source_arn,
2524 "RetentionDays": a.retention_days,
2525 "SizeBytes": a.size_bytes,
2526 "State": a.state,
2527 })
2528 })
2529 .collect();
2530
2531 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2532 let mut resp = json!({ "Archives": archives });
2533 if let Some(token) = next_token {
2534 resp["NextToken"] = json!(token);
2535 }
2536
2537 Ok(AwsResponse::ok_json(resp))
2538 }
2539
2540 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2541 let body = req.json_body();
2542 validate_required("ArchiveName", &body["ArchiveName"])?;
2543 let name = body["ArchiveName"]
2544 .as_str()
2545 .ok_or_else(|| missing("ArchiveName"))?;
2546 validate_string_length("archiveName", name, 1, 48)?;
2547 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2548 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2549 if let Some(rd) = body["RetentionDays"].as_i64() {
2550 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2551 }
2552
2553 if let Some(pattern) = body["EventPattern"].as_str() {
2555 validate_event_pattern(pattern)?;
2556 }
2557
2558 let mut accounts = self.state.write();
2559 let state = accounts.get_or_create(&req.account_id);
2560 let archive = state.archives.get_mut(name).ok_or_else(|| {
2561 AwsServiceError::aws_error(
2562 StatusCode::BAD_REQUEST,
2563 "ResourceNotFoundException",
2564 format!("Archive {name} does not exist."),
2565 )
2566 })?;
2567
2568 if let Some(desc) = body["Description"].as_str() {
2569 archive.description = Some(desc.to_string());
2570 }
2571 if let Some(pattern) = body["EventPattern"].as_str() {
2572 archive.event_pattern = Some(pattern.to_string());
2573 }
2574 if let Some(days) = body["RetentionDays"].as_i64() {
2575 archive.retention_days = days;
2576 }
2577
2578 Ok(AwsResponse::ok_json(json!({
2579 "ArchiveArn": archive.arn,
2580 "CreationTime": archive.creation_time.timestamp() as f64,
2581 "State": archive.state,
2582 })))
2583 }
2584
2585 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2586 let body = req.json_body();
2587 validate_required("ArchiveName", &body["ArchiveName"])?;
2588 let name = body["ArchiveName"]
2589 .as_str()
2590 .ok_or_else(|| missing("ArchiveName"))?;
2591 validate_string_length("archiveName", name, 1, 48)?;
2592
2593 let mut accounts = self.state.write();
2594 let state = accounts.get_or_create(&req.account_id);
2595 if !state.archives.contains_key(name) {
2596 return Err(AwsServiceError::aws_error(
2597 StatusCode::BAD_REQUEST,
2598 "ResourceNotFoundException",
2599 format!("Archive {name} does not exist."),
2600 ));
2601 }
2602
2603 state.archives.remove(name);
2604
2605 let rule_name = format!("Events-Archive-{name}");
2607 state.rules.retain(|k, _| k.1 != rule_name);
2608
2609 Ok(AwsResponse::ok_json(json!({})))
2610 }
2611
2612 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615 let body = req.json_body();
2616 validate_required("Name", &body["Name"])?;
2617 let name = body["Name"]
2618 .as_str()
2619 .ok_or_else(|| missing("Name"))?
2620 .to_string();
2621 validate_string_length("name", &name, 1, 64)?;
2622 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2623 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2624 let description = body["Description"].as_str().map(|s| s.to_string());
2625 let auth_type = body["AuthorizationType"]
2626 .as_str()
2627 .ok_or_else(|| missing("AuthorizationType"))?
2628 .to_string();
2629 validate_enum(
2630 "authorizationType",
2631 &auth_type,
2632 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2633 )?;
2634 validate_optional_string_length(
2635 "kmsKeyIdentifier",
2636 body["KmsKeyIdentifier"].as_str(),
2637 0,
2638 2048,
2639 )?;
2640 validate_required("AuthParameters", &body["AuthParameters"])?;
2641 let auth_params = body["AuthParameters"].clone();
2642
2643 let mut accounts = self.state.write();
2644 let state = accounts.get_or_create(&req.account_id);
2645 let now = Utc::now();
2646 let conn_uuid = uuid::Uuid::new_v4();
2647 let arn = format!(
2648 "arn:aws:events:{}:{}:connection/{}/{}",
2649 req.region, state.account_id, name, conn_uuid
2650 );
2651 let secret_arn = format!(
2652 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2653 req.region, state.account_id, name, conn_uuid
2654 );
2655
2656 let conn = Connection {
2657 name: name.clone(),
2658 arn: arn.clone(),
2659 description,
2660 authorization_type: auth_type.clone(),
2661 auth_parameters: auth_params,
2662 connection_state: "AUTHORIZED".to_string(),
2663 secret_arn: secret_arn.clone(),
2664 creation_time: now,
2665 last_modified_time: now,
2666 last_authorized_time: now,
2667 };
2668 state.connections.insert(name, conn);
2669
2670 Ok(AwsResponse::ok_json(json!({
2671 "ConnectionArn": arn,
2672 "ConnectionState": "AUTHORIZED",
2673 "CreationTime": now.timestamp() as f64,
2674 "LastModifiedTime": now.timestamp() as f64,
2675 })))
2676 }
2677
2678 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2679 let body = req.json_body();
2680 validate_required("Name", &body["Name"])?;
2681 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2682 validate_string_length("name", name, 1, 64)?;
2683
2684 let accounts = self.state.read();
2685 let empty = EventBridgeState::new(&req.account_id, &req.region);
2686 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2687 let conn = state.connections.get(name).ok_or_else(|| {
2688 AwsServiceError::aws_error(
2689 StatusCode::BAD_REQUEST,
2690 "ResourceNotFoundException",
2691 format!("Connection '{name}' does not exist."),
2692 )
2693 })?;
2694
2695 let auth_params_response =
2697 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2698
2699 let mut resp = json!({
2700 "ConnectionArn": conn.arn,
2701 "Name": conn.name,
2702 "AuthorizationType": conn.authorization_type,
2703 "AuthParameters": auth_params_response,
2704 "ConnectionState": conn.connection_state,
2705 "SecretArn": conn.secret_arn,
2706 "CreationTime": conn.creation_time.timestamp() as f64,
2707 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2708 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2709 });
2710 if let Some(ref desc) = conn.description {
2711 resp["Description"] = json!(desc);
2712 }
2713
2714 Ok(AwsResponse::ok_json(resp))
2715 }
2716
2717 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2718 let body = req.json_body();
2719 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2720 validate_optional_enum(
2721 "connectionState",
2722 body["ConnectionState"].as_str(),
2723 &[
2724 "CREATING",
2725 "UPDATING",
2726 "DELETING",
2727 "AUTHORIZED",
2728 "DEAUTHORIZED",
2729 "AUTHORIZING",
2730 "DEAUTHORIZING",
2731 "ACTIVE",
2732 "FAILED_CONNECTIVITY",
2733 ],
2734 )?;
2735 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2736 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2737
2738 let name_prefix = body["NamePrefix"].as_str();
2739 let connection_state = body["ConnectionState"].as_str();
2740 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2741
2742 let accounts = self.state.read();
2743 let empty = EventBridgeState::new(&req.account_id, &req.region);
2744 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2745 let all: Vec<Value> = state
2746 .connections
2747 .values()
2748 .filter(|c| {
2749 if let Some(prefix) = name_prefix {
2750 if !c.name.starts_with(prefix) {
2751 return false;
2752 }
2753 }
2754 if let Some(cs) = connection_state {
2755 if c.connection_state != cs {
2756 return false;
2757 }
2758 }
2759 true
2760 })
2761 .map(|c| {
2762 json!({
2763 "ConnectionArn": c.arn,
2764 "Name": c.name,
2765 "AuthorizationType": c.authorization_type,
2766 "ConnectionState": c.connection_state,
2767 "CreationTime": c.creation_time.timestamp() as f64,
2768 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2769 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2770 })
2771 })
2772 .collect();
2773
2774 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2775 let mut resp = json!({ "Connections": conns });
2776 if let Some(token) = next_token {
2777 resp["NextToken"] = json!(token);
2778 }
2779
2780 Ok(AwsResponse::ok_json(resp))
2781 }
2782
2783 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2784 let body = req.json_body();
2785 validate_required("Name", &body["Name"])?;
2786 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2787 validate_string_length("name", name, 1, 64)?;
2788 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2789 validate_optional_enum(
2790 "authorizationType",
2791 body["AuthorizationType"].as_str(),
2792 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2793 )?;
2794
2795 let mut accounts = self.state.write();
2796 let state = accounts.get_or_create(&req.account_id);
2797 let conn = state.connections.get_mut(name).ok_or_else(|| {
2798 AwsServiceError::aws_error(
2799 StatusCode::BAD_REQUEST,
2800 "ResourceNotFoundException",
2801 format!("Connection '{name}' does not exist."),
2802 )
2803 })?;
2804
2805 if let Some(desc) = body["Description"].as_str() {
2806 conn.description = Some(desc.to_string());
2807 }
2808 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2809 conn.authorization_type = auth_type.to_string();
2810 }
2811 if body.get("AuthParameters").is_some() {
2812 conn.auth_parameters = body["AuthParameters"].clone();
2813 }
2814 conn.last_modified_time = Utc::now();
2815
2816 Ok(AwsResponse::ok_json(json!({
2817 "ConnectionArn": conn.arn,
2818 "ConnectionState": conn.connection_state,
2819 "CreationTime": conn.creation_time.timestamp() as f64,
2820 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2821 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2822 })))
2823 }
2824
2825 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2826 let body = req.json_body();
2827 validate_required("Name", &body["Name"])?;
2828 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2829 validate_string_length("name", name, 1, 64)?;
2830
2831 let mut accounts = self.state.write();
2832 let state = accounts.get_or_create(&req.account_id);
2833 let conn = state.connections.remove(name).ok_or_else(|| {
2834 AwsServiceError::aws_error(
2835 StatusCode::BAD_REQUEST,
2836 "ResourceNotFoundException",
2837 format!("Connection '{name}' does not exist."),
2838 )
2839 })?;
2840
2841 Ok(AwsResponse::ok_json(json!({
2842 "ConnectionArn": conn.arn,
2843 "ConnectionState": conn.connection_state,
2844 "CreationTime": conn.creation_time.timestamp() as f64,
2845 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2846 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2847 })))
2848 }
2849
2850 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2853 let body = req.json_body();
2854 validate_required("Name", &body["Name"])?;
2855 let name = body["Name"]
2856 .as_str()
2857 .ok_or_else(|| missing("Name"))?
2858 .to_string();
2859 validate_string_length("name", &name, 1, 64)?;
2860 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2862 let description = body["Description"].as_str().map(|s| s.to_string());
2863 let connection_arn = body["ConnectionArn"]
2864 .as_str()
2865 .ok_or_else(|| missing("ConnectionArn"))?
2866 .to_string();
2867 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2868 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2869 let endpoint = body["InvocationEndpoint"]
2870 .as_str()
2871 .ok_or_else(|| missing("InvocationEndpoint"))?
2872 .to_string();
2873 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2874 validate_required("HttpMethod", &body["HttpMethod"])?;
2875 let http_method = body["HttpMethod"]
2876 .as_str()
2877 .ok_or_else(|| missing("HttpMethod"))?
2878 .to_string();
2879 validate_enum(
2880 "httpMethod",
2881 &http_method,
2882 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2883 )?;
2884 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2885 if let Some(r) = rate_limit {
2886 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2887 }
2888
2889 let mut accounts = self.state.write();
2890 let state = accounts.get_or_create(&req.account_id);
2891 let now = Utc::now();
2892 let dest_uuid = uuid::Uuid::new_v4();
2893 let arn = format!(
2894 "arn:aws:events:{}:{}:api-destination/{}/{}",
2895 req.region, state.account_id, name, dest_uuid
2896 );
2897
2898 let dest = ApiDestination {
2899 name: name.clone(),
2900 arn: arn.clone(),
2901 description,
2902 connection_arn,
2903 invocation_endpoint: endpoint,
2904 http_method,
2905 invocation_rate_limit_per_second: rate_limit,
2906 state: "ACTIVE".to_string(),
2907 creation_time: now,
2908 last_modified_time: now,
2909 };
2910 state.api_destinations.insert(name, dest);
2911
2912 Ok(AwsResponse::ok_json(json!({
2913 "ApiDestinationArn": arn,
2914 "ApiDestinationState": "ACTIVE",
2915 "CreationTime": now.timestamp() as f64,
2916 "LastModifiedTime": now.timestamp() as f64,
2917 })))
2918 }
2919
2920 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2921 let body = req.json_body();
2922 validate_required("Name", &body["Name"])?;
2923 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2924 validate_string_length("name", name, 1, 64)?;
2925
2926 let accounts = self.state.read();
2927 let empty = EventBridgeState::new(&req.account_id, &req.region);
2928 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2929 let dest = state.api_destinations.get(name).ok_or_else(|| {
2930 AwsServiceError::aws_error(
2931 StatusCode::BAD_REQUEST,
2932 "ResourceNotFoundException",
2933 format!("An api-destination '{name}' does not exist."),
2934 )
2935 })?;
2936
2937 let mut resp = json!({
2938 "ApiDestinationArn": dest.arn,
2939 "Name": dest.name,
2940 "ConnectionArn": dest.connection_arn,
2941 "InvocationEndpoint": dest.invocation_endpoint,
2942 "HttpMethod": dest.http_method,
2943 "ApiDestinationState": dest.state,
2944 "CreationTime": dest.creation_time.timestamp() as f64,
2945 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2946 });
2947 if let Some(ref desc) = dest.description {
2948 resp["Description"] = json!(desc);
2949 }
2950 if let Some(rate) = dest.invocation_rate_limit_per_second {
2951 resp["InvocationRateLimitPerSecond"] = json!(rate);
2952 }
2953
2954 Ok(AwsResponse::ok_json(resp))
2955 }
2956
2957 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2958 let body = req.json_body();
2959 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2960 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2961 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2962 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2963
2964 let name_prefix = body["NamePrefix"].as_str();
2965 let connection_arn = body["ConnectionArn"].as_str();
2966 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2967
2968 let accounts = self.state.read();
2969 let empty = EventBridgeState::new(&req.account_id, &req.region);
2970 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2971 let all: Vec<Value> = state
2972 .api_destinations
2973 .values()
2974 .filter(|d| {
2975 if let Some(prefix) = name_prefix {
2976 if !d.name.starts_with(prefix) {
2977 return false;
2978 }
2979 }
2980 if let Some(arn) = connection_arn {
2981 if d.connection_arn != arn {
2982 return false;
2983 }
2984 }
2985 true
2986 })
2987 .map(|d| {
2988 let mut obj = json!({
2989 "ApiDestinationArn": d.arn,
2990 "Name": d.name,
2991 "ConnectionArn": d.connection_arn,
2992 "InvocationEndpoint": d.invocation_endpoint,
2993 "HttpMethod": d.http_method,
2994 "ApiDestinationState": d.state,
2995 "CreationTime": d.creation_time.timestamp() as f64,
2996 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2997 });
2998 if let Some(rate) = d.invocation_rate_limit_per_second {
2999 obj["InvocationRateLimitPerSecond"] = json!(rate);
3000 }
3001 obj
3002 })
3003 .collect();
3004
3005 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3006 let mut resp = json!({ "ApiDestinations": dests });
3007 if let Some(token) = next_token {
3008 resp["NextToken"] = json!(token);
3009 }
3010
3011 Ok(AwsResponse::ok_json(resp))
3012 }
3013
3014 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3015 let body = req.json_body();
3016 validate_required("Name", &body["Name"])?;
3017 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3018 validate_string_length("name", name, 1, 64)?;
3019 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3020 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
3021 validate_optional_string_length(
3022 "invocationEndpoint",
3023 body["InvocationEndpoint"].as_str(),
3024 1,
3025 2048,
3026 )?;
3027 validate_optional_enum(
3028 "httpMethod",
3029 body["HttpMethod"].as_str(),
3030 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
3031 )?;
3032 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
3033 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
3034 }
3035
3036 let mut accounts = self.state.write();
3037 let state = accounts.get_or_create(&req.account_id);
3038 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
3039 AwsServiceError::aws_error(
3040 StatusCode::BAD_REQUEST,
3041 "ResourceNotFoundException",
3042 format!("An api-destination '{name}' does not exist."),
3043 )
3044 })?;
3045
3046 if let Some(desc) = body["Description"].as_str() {
3047 dest.description = Some(desc.to_string());
3048 }
3049 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
3050 dest.invocation_endpoint = endpoint.to_string();
3051 }
3052 if let Some(method) = body["HttpMethod"].as_str() {
3053 dest.http_method = method.to_string();
3054 }
3055 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
3056 dest.invocation_rate_limit_per_second = Some(rate);
3057 }
3058 if let Some(conn) = body["ConnectionArn"].as_str() {
3059 dest.connection_arn = conn.to_string();
3060 }
3061 dest.last_modified_time = Utc::now();
3062
3063 Ok(AwsResponse::ok_json(json!({
3064 "ApiDestinationArn": dest.arn,
3065 "ApiDestinationState": dest.state,
3066 "CreationTime": dest.creation_time.timestamp() as f64,
3067 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
3068 })))
3069 }
3070
3071 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3072 let body = req.json_body();
3073 validate_required("Name", &body["Name"])?;
3074 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3075 validate_string_length("name", name, 1, 64)?;
3076
3077 let mut accounts = self.state.write();
3078 let state = accounts.get_or_create(&req.account_id);
3079 if !state.api_destinations.contains_key(name) {
3080 return Err(AwsServiceError::aws_error(
3081 StatusCode::BAD_REQUEST,
3082 "ResourceNotFoundException",
3083 format!("An api-destination '{name}' does not exist."),
3084 ));
3085 }
3086 state.api_destinations.remove(name);
3087
3088 Ok(AwsResponse::ok_json(json!({})))
3089 }
3090
    /// Handles `StartReplay`: validates the request, records the replay and
    /// synchronously re-delivers the archived events to the targets of matching rules.
    ///
    /// Checks run in this order, and the first failure is returned:
    /// request parsing (`StartReplayInput::from_body`), destination bus exists,
    /// source archive exists, archive and destination refer to the same bus,
    /// start time is strictly before end time, replay name is unused.
    ///
    /// The replay is stored with state `"COMPLETED"` (start and end time both set to
    /// "now"), while the response reports `"STARTING"`.
    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let input = StartReplayInput::from_body(&req.json_body())?;

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&req.account_id);

        // The destination must name an event bus that exists in this account.
        let bus_name = state.resolve_bus_name(&input.destination_arn);
        if !state.buses.contains_key(&bus_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceNotFoundException",
                format!("Event bus {bus_name} does not exist."),
            ));
        }

        // The archive name is whatever follows the last "archive/" in the source ARN;
        // an ARN without that marker yields an empty name, which then fails the lookup.
        let archive_name = input
            .event_source_arn
            .rsplit_once("archive/")
            .map(|(_, n)| n.to_string())
            .unwrap_or_default();
        let archive = state.archives.get(&archive_name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                format!(
                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
                ),
            )
        })?;
        // Events may only be replayed onto the bus the archive was recorded from.
        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
        if archive_bus != bus_name {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
            ));
        }

        if input.event_end_time <= input.event_start_time {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
            ));
        }

        if state.replays.contains_key(&input.name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceAlreadyExistsException",
                format!("Replay {} already exists.", input.name),
            ));
        }

        let now = Utc::now();
        let arn = format!(
            "arn:aws:events:{}:{}:replay/{}",
            req.region, state.account_id, input.name
        );

        // Snapshot the events and their targets while the state lock is held;
        // the actual delivery happens after the lock is released below.
        let events_to_deliver = collect_replay_events_with_targets(
            state,
            &archive_name,
            &bus_name,
            input.event_start_time,
            input.event_end_time,
            &req.account_id,
            &req.region,
        );

        let replay = Replay {
            name: input.name.clone(),
            arn: arn.clone(),
            description: input.description,
            event_source_arn: input.event_source_arn,
            destination: input.destination,
            event_start_time: input.event_start_time,
            event_end_time: input.event_end_time,
            state: "COMPLETED".to_string(),
            replay_start_time: now,
            replay_end_time: Some(now),
        };
        state.replays.insert(input.name, replay);

        // Release the write lock before delivering: `deliver_replay_event_to_target`
        // takes `self.state.write()` itself for some target kinds.
        drop(accounts);

        for (event, targets) in events_to_deliver {
            // A stored detail that is not valid JSON is delivered as an empty object.
            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
            // NOTE(review): "replay-name" is populated with the replay ARN rather than
            // the replay name — confirm this is intended.
            let event_json = json!({
                "version": "0",
                "id": event.event_id,
                "source": event.source,
                "account": req.account_id,
                "detail-type": event.detail_type,
                "detail": detail_value,
                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
                "region": req.region,
                "resources": event.resources,
                "replay-name": arn,
            });
            let event_str = event_json.to_string();

            for target in targets {
                self.deliver_replay_event_to_target(
                    &target,
                    &event,
                    &event_json,
                    &event_str,
                    &req.account_id,
                );
            }
        }

        // The response reports "STARTING" even though the stored replay is already
        // marked "COMPLETED" above.
        Ok(AwsResponse::ok_json(json!({
            "ReplayArn": arn,
            "ReplayStartTime": now.timestamp() as f64,
            "State": "STARTING",
        })))
    }
3213
3214 fn deliver_replay_event_to_target(
3215 &self,
3216 target: &EventTarget,
3217 event: &PutEvent,
3218 event_json: &Value,
3219 event_str: &str,
3220 account_id: &str,
3221 ) {
3222 let target_arn = &target.arn;
3223 let body_str = if let Some(ref transformer) = target.input_transformer {
3224 apply_input_transformer(transformer, event_json)
3225 } else if let Some(ref input) = target.input {
3226 input.clone()
3227 } else if let Some(ref input_path) = target.input_path {
3228 resolve_json_path(event_json, input_path)
3229 .map(|v| v.to_string())
3230 .unwrap_or_else(|| event_str.to_string())
3231 } else {
3232 event_str.to_string()
3233 };
3234
3235 if target_arn.contains(":sqs:") {
3236 let group_id = target
3237 .sqs_parameters
3238 .as_ref()
3239 .and_then(|p| p["MessageGroupId"].as_str())
3240 .map(|s| s.to_string());
3241 if group_id.is_some() {
3242 self.delivery.send_to_sqs_with_attrs(
3243 target_arn,
3244 &body_str,
3245 &HashMap::new(),
3246 group_id.as_deref(),
3247 None,
3248 );
3249 } else {
3250 self.delivery
3251 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3252 }
3253 } else if target_arn.contains(":sns:") {
3254 self.delivery
3255 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3256 } else if target_arn.contains(":lambda:") {
3257 let mut accounts = self.state.write();
3258 let state = accounts.get_or_create(account_id);
3259 state
3260 .lambda_invocations
3261 .push(crate::state::LambdaInvocation {
3262 function_arn: target_arn.clone(),
3263 payload: body_str.clone(),
3264 timestamp: Utc::now(),
3265 });
3266 drop(accounts);
3267 if let Some(ref ls) = self.lambda_state {
3268 ls.write()
3269 .get_or_create(account_id)
3270 .invocations
3271 .push(LambdaInvocation {
3272 function_arn: target_arn.clone(),
3273 payload: body_str.clone(),
3274 timestamp: Utc::now(),
3275 source: "aws:events".to_string(),
3276 });
3277 }
3278 invoke_lambda_async(
3279 &self.container_runtime,
3280 &self.lambda_state,
3281 target_arn,
3282 &body_str,
3283 );
3284 } else if target_arn.contains(":logs:") {
3285 let mut accounts = self.state.write();
3286 let state = accounts.get_or_create(account_id);
3287 state.log_deliveries.push(crate::state::LogDelivery {
3288 log_group_arn: target_arn.clone(),
3289 payload: body_str.clone(),
3290 timestamp: Utc::now(),
3291 });
3292 drop(accounts);
3293 if let Some(ref log_state) = self.logs_state {
3294 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3295 }
3296 } else if target_arn.contains(":states:") {
3297 self.delivery
3298 .start_stepfunctions_execution(target_arn, &body_str);
3299 let mut accounts = self.state.write();
3300 let state = accounts.get_or_create(account_id);
3301 state
3302 .step_function_executions
3303 .push(crate::state::StepFunctionExecution {
3304 state_machine_arn: target_arn.clone(),
3305 payload: body_str.clone(),
3306 timestamp: Utc::now(),
3307 });
3308 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3309 let url = target_arn.clone();
3310 let payload = body_str.clone();
3311 tokio::spawn(async move {
3312 let client = reqwest::Client::new();
3313 let _ = client
3314 .post(&url)
3315 .header("Content-Type", "application/json")
3316 .body(payload)
3317 .send()
3318 .await;
3319 });
3320 }
3321 }
3322
3323 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3324 let body = req.json_body();
3325 validate_required("ReplayName", &body["ReplayName"])?;
3326 let name = body["ReplayName"]
3327 .as_str()
3328 .ok_or_else(|| missing("ReplayName"))?;
3329 validate_string_length("replayName", name, 1, 64)?;
3330
3331 let accounts = self.state.read();
3332 let empty = EventBridgeState::new(&req.account_id, &req.region);
3333 let state = accounts.get(&req.account_id).unwrap_or(&empty);
3334 let replay = state.replays.get(name).ok_or_else(|| {
3335 AwsServiceError::aws_error(
3336 StatusCode::BAD_REQUEST,
3337 "ResourceNotFoundException",
3338 format!("Replay {name} does not exist."),
3339 )
3340 })?;
3341
3342 let mut resp = json!({
3343 "Destination": replay.destination,
3344 "EventSourceArn": replay.event_source_arn,
3345 "EventStartTime": replay.event_start_time.timestamp() as f64,
3346 "EventEndTime": replay.event_end_time.timestamp() as f64,
3347 "ReplayArn": replay.arn,
3348 "ReplayName": replay.name,
3349 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3350 "State": replay.state,
3351 });
3352 if let Some(ref desc) = replay.description {
3353 resp["Description"] = json!(desc);
3354 }
3355 if let Some(ref end) = replay.replay_end_time {
3356 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3357 }
3358
3359 Ok(AwsResponse::ok_json(resp))
3360 }
3361
3362 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3363 let body = req.json_body();
3364 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3365 validate_optional_string_length(
3366 "eventSourceArn",
3367 body["EventSourceArn"].as_str(),
3368 1,
3369 1600,
3370 )?;
3371 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3372 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3373 let name_prefix = body["NamePrefix"].as_str();
3374 let source_arn = body["EventSourceArn"].as_str();
3375 let replay_state = body["State"].as_str();
3376
3377 let filter_count = [
3379 name_prefix.is_some(),
3380 source_arn.is_some(),
3381 replay_state.is_some(),
3382 ]
3383 .iter()
3384 .filter(|&&x| x)
3385 .count();
3386 if filter_count > 1 {
3387 return Err(AwsServiceError::aws_error(
3388 StatusCode::BAD_REQUEST,
3389 "ValidationException",
3390 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3391 ));
3392 }
3393
3394 if let Some(s) = replay_state {
3396 let valid = [
3397 "CANCELLED",
3398 "CANCELLING",
3399 "COMPLETED",
3400 "FAILED",
3401 "RUNNING",
3402 "STARTING",
3403 ];
3404 if !valid.contains(&s) {
3405 return Err(AwsServiceError::aws_error(
3406 StatusCode::BAD_REQUEST,
3407 "ValidationException",
3408 format!(
3409 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3410 s
3411 ),
3412 ));
3413 }
3414 }
3415
3416 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3417
3418 let accounts = self.state.read();
3419 let empty = EventBridgeState::new(&req.account_id, &req.region);
3420 let state = accounts.get(&req.account_id).unwrap_or(&empty);
3421 let all: Vec<Value> = state
3422 .replays
3423 .values()
3424 .filter(|r| {
3425 if let Some(prefix) = name_prefix {
3426 r.name.starts_with(prefix)
3427 } else if let Some(arn) = source_arn {
3428 r.event_source_arn == arn
3429 } else if let Some(s) = replay_state {
3430 r.state == s
3431 } else {
3432 true
3433 }
3434 })
3435 .map(|r| {
3436 let mut obj = json!({
3437 "EventSourceArn": r.event_source_arn,
3438 "EventStartTime": r.event_start_time.timestamp() as f64,
3439 "EventEndTime": r.event_end_time.timestamp() as f64,
3440 "ReplayName": r.name,
3441 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3442 "State": r.state,
3443 });
3444 if let Some(ref end) = r.replay_end_time {
3445 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3446 }
3447 obj
3448 })
3449 .collect();
3450
3451 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3452 let mut resp = json!({ "Replays": replays });
3453 if let Some(token) = next_token {
3454 resp["NextToken"] = json!(token);
3455 }
3456
3457 Ok(AwsResponse::ok_json(resp))
3458 }
3459
3460 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3461 let body = req.json_body();
3462 validate_required("ReplayName", &body["ReplayName"])?;
3463 let name = body["ReplayName"]
3464 .as_str()
3465 .ok_or_else(|| missing("ReplayName"))?;
3466 validate_string_length("replayName", name, 1, 64)?;
3467
3468 let mut accounts = self.state.write();
3469 let state = accounts.get_or_create(&req.account_id);
3470 let replay = state.replays.get_mut(name).ok_or_else(|| {
3471 AwsServiceError::aws_error(
3472 StatusCode::BAD_REQUEST,
3473 "ResourceNotFoundException",
3474 format!("Replay {name} does not exist."),
3475 )
3476 })?;
3477
3478 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3480 return Err(AwsServiceError::aws_error(
3481 StatusCode::BAD_REQUEST,
3482 "IllegalStatusException",
3483 format!("Replay {name} is not in a valid state for this operation."),
3484 ));
3485 }
3486
3487 let arn = replay.arn.clone();
3488 replay.state = "CANCELLED".to_string();
3489
3490 Ok(AwsResponse::ok_json(json!({
3491 "ReplayArn": arn,
3492 "State": "CANCELLING",
3493 })))
3494 }
3495}
3496
3497fn find_tags_mut<'a>(
3500 state: &'a mut crate::state::EventBridgeState,
3501 arn: &str,
3502) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3503 for bus in state.buses.values_mut() {
3505 if bus.arn == arn {
3506 return Ok(&mut bus.tags);
3507 }
3508 }
3509 for rule in state.rules.values_mut() {
3511 if rule.arn == arn {
3512 return Ok(&mut rule.tags);
3513 }
3514 }
3515
3516 let error_msg = if arn.contains(":rule/") {
3518 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3520 if let Some(rule_path) = parts.first() {
3521 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3522 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3523 } else {
3524 format!("Rule {} does not exist on EventBus default.", rule_path)
3525 }
3526 } else {
3527 format!("Resource {arn} not found.")
3528 }
3529 } else {
3530 format!("Resource {arn} not found.")
3531 };
3532
3533 Err(AwsServiceError::aws_error(
3534 StatusCode::BAD_REQUEST,
3535 "ResourceNotFoundException",
3536 error_msg,
3537 ))
3538}
3539
3540fn find_tags<'a>(
3541 state: &'a crate::state::EventBridgeState,
3542 arn: &str,
3543) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3544 for bus in state.buses.values() {
3545 if bus.arn == arn {
3546 return Ok(&bus.tags);
3547 }
3548 }
3549 for rule in state.rules.values() {
3550 if rule.arn == arn {
3551 return Ok(&rule.tags);
3552 }
3553 }
3554
3555 let error_msg = if arn.contains(":rule/") {
3556 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3557 if let Some(rule_path) = parts.first() {
3558 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3559 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3560 } else {
3561 format!("Rule {} does not exist on EventBus default.", rule_path)
3562 }
3563 } else {
3564 format!("Resource {arn} not found.")
3565 }
3566 } else {
3567 format!("Resource {arn} not found.")
3568 };
3569
3570 Err(AwsServiceError::aws_error(
3571 StatusCode::BAD_REQUEST,
3572 "ResourceNotFoundException",
3573 error_msg,
3574 ))
3575}
3576
3577fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3580 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3581 AwsServiceError::aws_error(
3582 StatusCode::BAD_REQUEST,
3583 "InvalidEventPatternException",
3584 "Event pattern is not valid. Reason: Invalid JSON",
3585 )
3586 })?;
3587
3588 validate_pattern_values(&parsed, "")?;
3589 Ok(())
3590}
3591
3592fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3593 match value {
3594 Value::Object(obj) => {
3595 for (key, val) in obj {
3596 let new_path = if path.is_empty() {
3597 key.clone()
3598 } else {
3599 format!("{path}.{key}")
3600 };
3601 match val {
3602 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3603 Value::Array(_) => {} _ => {
3605 return Err(AwsServiceError::aws_error(
3606 StatusCode::BAD_REQUEST,
3607 "InvalidEventPatternException",
3608 format!(
3609 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3610 key
3611 ),
3612 ));
3613 }
3614 }
3615 }
3616 Ok(())
3617 }
3618 _ => Ok(()),
3619 }
3620}
3621
3622fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3625 match auth_type {
3626 "API_KEY" => {
3627 let mut resp = json!({});
3628 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3629 resp["ApiKeyAuthParameters"] = json!({
3630 "ApiKeyName": api_key["ApiKeyName"],
3631 });
3632 }
3633 resp
3634 }
3635 "BASIC" => {
3636 let mut resp = json!({});
3637 if let Some(basic) = params.get("BasicAuthParameters") {
3638 resp["BasicAuthParameters"] = json!({
3639 "Username": basic["Username"],
3640 });
3641 }
3642 resp
3643 }
3644 "OAUTH_CLIENT_CREDENTIALS" => {
3645 let mut resp = json!({});
3646 if let Some(oauth) = params.get("OAuthParameters") {
3647 resp["OAuthParameters"] = json!({
3648 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3649 "HttpMethod": oauth["HttpMethod"],
3650 "ClientParameters": {
3651 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3652 },
3653 });
3654 }
3655 resp
3656 }
3657 _ => params.clone(),
3658 }
3659}
3660
3661pub fn matches_pattern(
3665 pattern_json: Option<&str>,
3666 source: &str,
3667 detail_type: &str,
3668 detail: &str,
3669 account: &str,
3670 region: &str,
3671 resources: &[String],
3672) -> bool {
3673 let pattern_json = match pattern_json {
3674 Some(p) => p,
3675 None => return true,
3676 };
3677
3678 let pattern: Value = match serde_json::from_str(pattern_json) {
3679 Ok(v) => v,
3680 Err(_) => return false,
3681 };
3682
3683 let pattern_obj = match pattern.as_object() {
3684 Some(o) => o,
3685 None => return false,
3686 };
3687
3688 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3689 let event = json!({
3690 "source": source,
3691 "detail-type": detail_type,
3692 "detail": detail_value,
3693 "account": account,
3694 "region": region,
3695 "resources": resources,
3696 });
3697
3698 for (key, pattern_value) in pattern_obj {
3699 let event_value = &event[key];
3700 if !matches_value(pattern_value, event_value) {
3701 return false;
3702 }
3703 }
3704
3705 true
3706}
3707
3708fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3709 match pattern {
3710 Value::Object(obj) => {
3711 for (key, sub_pattern) in obj {
3712 let sub_value = &event_value[key];
3713 if !matches_value(sub_pattern, sub_value) {
3714 return false;
3715 }
3716 }
3717 true
3718 }
3719 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3720 _ => false,
3721 }
3722}
3723
3724fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3725 match pattern_elem {
3726 Value::Object(obj) => {
3727 if let Some(prefix_val) = obj.get("prefix") {
3728 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3729 return actual.starts_with(prefix);
3730 }
3731 return false;
3732 }
3733 if let Some(exists_val) = obj.get("exists") {
3734 let should_exist = exists_val.as_bool().unwrap_or(true);
3735 let does_exist = !event_value.is_null();
3736 return should_exist == does_exist;
3737 }
3738 if let Some(anything_but_val) = obj.get("anything-but") {
3739 return match anything_but_val {
3740 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3741 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3742 Value::Number(_) => event_value != anything_but_val,
3743 _ => true,
3744 };
3745 }
3746 if let Some(numeric_val) = obj.get("numeric") {
3747 return matches_numeric(numeric_val, event_value);
3748 }
3749 false
3750 }
3751 _ => values_equal(pattern_elem, event_value),
3752 }
3753}
3754
3755#[allow(clippy::too_many_arguments)]
3759fn archive_matching_event(
3760 state: &mut crate::state::EventBridgeState,
3761 event: &PutEvent,
3762 event_bus_name: &str,
3763 source: &str,
3764 detail_type: &str,
3765 detail: &str,
3766 account_id: &str,
3767 region: &str,
3768 resources: &[String],
3769) {
3770 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3771 for akey in archive_keys {
3772 let (archive_bus, archive_pattern, archive_enabled) = {
3773 let a = &state.archives[&akey];
3774 (
3775 state.resolve_bus_name(&a.event_source_arn),
3776 a.event_pattern.clone(),
3777 a.state == "ENABLED",
3778 )
3779 };
3780 if archive_bus != event_bus_name || !archive_enabled {
3781 continue;
3782 }
3783 let pattern_matches = matches_pattern(
3784 archive_pattern.as_deref(),
3785 source,
3786 detail_type,
3787 detail,
3788 account_id,
3789 region,
3790 resources,
3791 );
3792 if !pattern_matches {
3793 continue;
3794 }
3795 if let Some(archive) = state.archives.get_mut(&akey) {
3796 archive.event_count += 1;
3797 archive.size_bytes += detail.len() as i64;
3798 archive.events.push(event.clone());
3799 }
3800 }
3801}
3802
/// Parsed and validated parameters of a `StartReplay` request
/// (built by `StartReplayInput::from_body`).
struct StartReplayInput {
    /// Value of `ReplayName`.
    name: String,
    /// Value of `Description`, if it was supplied as a string.
    description: Option<String>,
    /// Value of `EventSourceArn`.
    event_source_arn: String,
    /// The complete `Destination` object exactly as received.
    destination: Value,
    /// `Destination.Arn` extracted from `destination`.
    destination_arn: String,
    /// Value of `EventStartTime`, converted from epoch seconds.
    event_start_time: DateTime<Utc>,
    /// Value of `EventEndTime`, converted from epoch seconds.
    event_end_time: DateTime<Utc>,
}
3813
3814impl StartReplayInput {
3815 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3816 validate_required("ReplayName", &body["ReplayName"])?;
3817 let name = body["ReplayName"]
3818 .as_str()
3819 .ok_or_else(|| missing("ReplayName"))?
3820 .to_string();
3821 validate_string_length("replayName", &name, 1, 64)?;
3822 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3823 validate_required("EventSourceArn", &body["EventSourceArn"])?;
3824 let description = body["Description"].as_str().map(|s| s.to_string());
3825 let event_source_arn = body["EventSourceArn"]
3826 .as_str()
3827 .ok_or_else(|| missing("EventSourceArn"))?
3828 .to_string();
3829 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3830 validate_required("EventStartTime", &body["EventStartTime"])?;
3831 validate_required("EventEndTime", &body["EventEndTime"])?;
3832 validate_required("Destination", &body["Destination"])?;
3833 let destination = body["Destination"].clone();
3834
3835 let event_start_time = body["EventStartTime"]
3836 .as_f64()
3837 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3838 .unwrap_or_else(Utc::now);
3839 let event_end_time = body["EventEndTime"]
3840 .as_f64()
3841 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3842 .unwrap_or_else(Utc::now);
3843
3844 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3845 if !destination_arn.contains(":event-bus/") {
3846 return Err(AwsServiceError::aws_error(
3847 StatusCode::BAD_REQUEST,
3848 "ValidationException",
3849 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3850 ));
3851 }
3852
3853 Ok(Self {
3854 name,
3855 description,
3856 event_source_arn,
3857 destination,
3858 destination_arn,
3859 event_start_time,
3860 event_end_time,
3861 })
3862 }
3863}
3864
3865#[allow(clippy::too_many_arguments)]
3870fn collect_replay_events_with_targets(
3871 state: &crate::state::EventBridgeState,
3872 archive_name: &str,
3873 bus_name: &str,
3874 event_start_time: DateTime<Utc>,
3875 event_end_time: DateTime<Utc>,
3876 account_id: &str,
3877 region: &str,
3878) -> Vec<(PutEvent, Vec<EventTarget>)> {
3879 let Some(archive) = state.archives.get(archive_name) else {
3880 return Vec::new();
3881 };
3882
3883 let replay_events: Vec<PutEvent> = archive
3884 .events
3885 .iter()
3886 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3887 .cloned()
3888 .collect();
3889
3890 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3891 for event in replay_events {
3892 let matching_targets: Vec<EventTarget> = state
3893 .rules
3894 .values()
3895 .filter(|r| {
3896 r.event_bus_name == bus_name
3897 && r.state == "ENABLED"
3898 && matches_pattern(
3899 r.event_pattern.as_deref(),
3900 &event.source,
3901 &event.detail_type,
3902 &event.detail,
3903 account_id,
3904 region,
3905 &event.resources,
3906 )
3907 })
3908 .flat_map(|r| r.targets.clone())
3909 .collect();
3910
3911 if !matching_targets.is_empty() {
3912 events_to_deliver.push((event, matching_targets));
3913 }
3914 }
3915 events_to_deliver
3916}
3917
3918fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3919 let arr = match numeric_arr.as_array() {
3920 Some(a) => a,
3921 None => return false,
3922 };
3923 let actual = match event_value.as_f64() {
3924 Some(n) => n,
3925 None => return false,
3926 };
3927 let mut i = 0;
3928 while i + 1 < arr.len() {
3929 let op = match arr[i].as_str() {
3930 Some(s) => s,
3931 None => return false,
3932 };
3933 let threshold = match arr[i + 1].as_f64() {
3934 Some(n) => n,
3935 None => return false,
3936 };
3937 let ok = match op {
3938 ">" => actual > threshold,
3939 ">=" => actual >= threshold,
3940 "<" => actual < threshold,
3941 "<=" => actual <= threshold,
3942 "=" => (actual - threshold).abs() < f64::EPSILON,
3943 _ => return false,
3944 };
3945 if !ok {
3946 return false;
3947 }
3948 i += 2;
3949 }
3950 true
3951}
3952
/// Returns `true` when the two JSON values compare equal under `Value`'s `==`.
fn values_equal(a: &Value, b: &Value) -> bool {
    a == b
}
3956
3957fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3959 let path = path.strip_prefix('$').unwrap_or(path);
3960 let mut current = event;
3961 for segment in path.split('.') {
3962 if segment.is_empty() {
3963 continue;
3964 }
3965 current = current.get(segment)?;
3966 }
3967 Some(current.clone())
3968}
3969
3970fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3972 let input_paths_map = transformer
3973 .get("InputPathsMap")
3974 .and_then(|v| v.as_object())
3975 .cloned()
3976 .unwrap_or_default();
3977 let template = transformer
3978 .get("InputTemplate")
3979 .and_then(|v| v.as_str())
3980 .unwrap_or("")
3981 .to_string();
3982
3983 let mut resolved: HashMap<String, Value> = HashMap::new();
3985 for (var_name, path_val) in &input_paths_map {
3986 if let Some(path_str) = path_val.as_str() {
3987 if let Some(val) = resolve_json_path(event, path_str) {
3988 resolved.insert(var_name.clone(), val);
3989 }
3990 }
3991 }
3992
3993 let mut result = template;
3995 for (var_name, val) in &resolved {
3996 let placeholder = format!("<{var_name}>");
3997 let replacement = match val {
3998 Value::String(s) => s.clone(),
3999 other => other.to_string(),
4000 };
4001 result = result.replace(&placeholder, &replacement);
4002 }
4003
4004 result
4005}
4006
4007fn missing(name: &str) -> AwsServiceError {
4008 AwsServiceError::aws_error(
4009 StatusCode::BAD_REQUEST,
4010 "ValidationException",
4011 format!("The request must contain the parameter {name}"),
4012 )
4013}
4014
/// Extracts the function name from a Lambda function ARN.
///
/// For input with at least seven `:`-separated fields whose sixth field is
/// `function`, the seventh field is returned (anything after it, such as a
/// qualifier, is ignored). Any other input is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    // Fields 0..=4 are skipped; the next two are the resource type and the name.
    let mut fields = arn.split(':').skip(5);
    match (fields.next(), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
4027
4028pub fn invoke_lambda_async(
4031 container_runtime: &Option<Arc<ContainerRuntime>>,
4032 lambda_state: &Option<SharedLambdaState>,
4033 function_arn: &str,
4034 payload: &str,
4035) {
4036 let runtime = match container_runtime {
4037 Some(rt) => rt.clone(),
4038 None => return,
4039 };
4040 let lambda_state = match lambda_state {
4041 Some(ls) => ls.clone(),
4042 None => return,
4043 };
4044 let func_name = function_name_from_arn(function_arn).to_string();
4045 let payload = payload.as_bytes().to_vec();
4046
4047 tokio::spawn(async move {
4048 let func = {
4049 let accounts = lambda_state.read();
4050 let state = accounts.default_ref();
4051 state.functions.get(&func_name).cloned()
4052 };
4053 let func = match func {
4054 Some(f) => f,
4055 None => {
4056 tracing::warn!(
4057 function = %func_name,
4058 "EventBridge Lambda target not found, skipping invocation"
4059 );
4060 return;
4061 }
4062 };
4063 match runtime.invoke(&func, &payload).await {
4064 Ok(_) => {
4065 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
4066 }
4067 Err(e) => {
4068 tracing::warn!(
4069 function = %func_name,
4070 error = %e,
4071 "EventBridge Lambda invocation failed"
4072 );
4073 }
4074 }
4075 });
4076}
4077
4078pub fn deliver_to_logs(
4081 logs_state: &SharedLogsState,
4082 log_group_arn: &str,
4083 payload: &str,
4084 timestamp: chrono::DateTime<chrono::Utc>,
4085) {
4086 let group_name = if log_group_arn.contains(":log-group:") {
4089 log_group_arn
4090 .split(":log-group:")
4091 .nth(1)
4092 .unwrap_or(log_group_arn)
4093 .trim_end_matches(":*")
4094 } else {
4095 log_group_arn
4096 };
4097
4098 let stream_name = "events".to_string();
4099 let ts_millis = timestamp.timestamp_millis();
4100
4101 let mut accounts = logs_state.write();
4102 let state = accounts.default_mut();
4103 let region = state.region.clone();
4104 let account_id = state.account_id.clone();
4105
4106 let group = state
4108 .log_groups
4109 .entry(group_name.to_string())
4110 .or_insert_with(|| fakecloud_logs::state::LogGroup {
4111 name: group_name.to_string(),
4112 arn: Arn::new(
4113 "logs",
4114 ®ion,
4115 &account_id,
4116 &format!("log-group:{group_name}"),
4117 )
4118 .to_string(),
4119 creation_time: ts_millis,
4120 retention_in_days: None,
4121 kms_key_id: None,
4122 tags: HashMap::new(),
4123 log_streams: HashMap::new(),
4124 stored_bytes: 0,
4125 subscription_filters: Vec::new(),
4126 data_protection_policy: None,
4127 index_policies: Vec::new(),
4128 transformer: None,
4129 deletion_protection: false,
4130 log_group_class: Some("STANDARD".to_string()),
4131 });
4132
4133 let stream = group
4134 .log_streams
4135 .entry(stream_name.clone())
4136 .or_insert_with(|| fakecloud_logs::state::LogStream {
4137 name: stream_name,
4138 arn: format!("{}:log-stream:events", group.arn),
4139 creation_time: ts_millis,
4140 first_event_timestamp: None,
4141 last_event_timestamp: None,
4142 last_ingestion_time: None,
4143 upload_sequence_token: "1".to_string(),
4144 events: Vec::new(),
4145 });
4146
4147 stream.events.push(fakecloud_logs::state::LogEvent {
4148 timestamp: ts_millis,
4149 message: payload.to_string(),
4150 ingestion_time: ts_millis,
4151 });
4152 stream.last_event_timestamp = Some(ts_millis);
4153 stream.last_ingestion_time = Some(ts_millis);
4154 if stream.first_event_timestamp.is_none() {
4155 stream.first_event_timestamp = Some(ts_millis);
4156 }
4157}
4158
4159fn apply_connection_auth(
4161 mut builder: reqwest::RequestBuilder,
4162 conn: &Connection,
4163) -> reqwest::RequestBuilder {
4164 match conn.authorization_type.as_str() {
4165 "API_KEY" => {
4166 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
4167 if let (Some(name), Some(value)) = (
4168 params["ApiKeyName"].as_str(),
4169 params["ApiKeyValue"].as_str(),
4170 ) {
4171 builder = builder.header(name, value);
4172 }
4173 }
4174 }
4175 "BASIC" => {
4176 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
4177 if let (Some(user), Some(pass)) =
4178 (params["Username"].as_str(), params["Password"].as_str())
4179 {
4180 builder = builder.basic_auth(user, Some(pass));
4181 }
4182 }
4183 }
4184 "OAUTH_CLIENT_CREDENTIALS" => {
4185 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4188 if let (Some(client_id), Some(client_secret)) = (
4189 params["ClientParameters"]["ClientID"].as_str(),
4190 params["ClientParameters"]["ClientSecret"].as_str(),
4191 ) {
4192 builder = builder.basic_auth(client_id, Some(client_secret));
4193 }
4194 }
4195 }
4196 _ => {}
4197 }
4198 builder
4199}
4200
4201#[cfg(test)]
4202mod tests {
4203 use super::*;
4204
4205 fn test_matches(
4207 pattern_json: Option<&str>,
4208 source: &str,
4209 detail_type: &str,
4210 detail: &str,
4211 ) -> bool {
4212 matches_pattern(
4213 pattern_json,
4214 source,
4215 detail_type,
4216 detail,
4217 "123456789012",
4218 "us-east-1",
4219 &[],
4220 )
4221 }
4222
4223 #[test]
4224 fn pattern_matches_source() {
4225 assert!(test_matches(
4226 Some(r#"{"source": ["my.app"]}"#),
4227 "my.app",
4228 "OrderPlaced",
4229 "{}"
4230 ));
4231 assert!(!test_matches(
4232 Some(r#"{"source": ["other.app"]}"#),
4233 "my.app",
4234 "OrderPlaced",
4235 "{}"
4236 ));
4237 }
4238
4239 #[test]
4240 fn pattern_matches_detail_type() {
4241 assert!(test_matches(
4242 Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4243 "my.app",
4244 "OrderPlaced",
4245 "{}"
4246 ));
4247 assert!(!test_matches(
4248 Some(r#"{"detail-type": ["OrderShipped"]}"#),
4249 "my.app",
4250 "OrderPlaced",
4251 "{}"
4252 ));
4253 }
4254
4255 #[test]
4256 fn pattern_matches_detail_field() {
4257 assert!(test_matches(
4258 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4259 "my.app",
4260 "StatusChange",
4261 r#"{"status": "ACTIVE"}"#
4262 ));
4263 assert!(!test_matches(
4264 Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4265 "my.app",
4266 "StatusChange",
4267 r#"{"status": "INACTIVE"}"#
4268 ));
4269 }
4270
4271 #[test]
4272 fn no_pattern_matches_everything() {
4273 assert!(test_matches(None, "any", "any", "{}"));
4274 }
4275
4276 #[test]
4277 fn combined_pattern() {
4278 let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4279 assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4280 assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4281 assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4282 }
4283
4284 #[test]
4285 fn nested_detail_pattern() {
4286 let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4287 assert!(test_matches(
4288 Some(pattern),
4289 "my.app",
4290 "OrderEvent",
4291 r#"{"order": {"status": "PLACED", "id": "123"}}"#
4292 ));
4293 assert!(!test_matches(
4294 Some(pattern),
4295 "my.app",
4296 "OrderEvent",
4297 r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4298 ));
4299 assert!(!test_matches(
4300 Some(pattern),
4301 "my.app",
4302 "OrderEvent",
4303 r#"{"order": {"id": "123"}}"#
4304 ));
4305 }
4306
4307 #[test]
4308 fn deeply_nested_detail_pattern() {
4309 let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4310 assert!(test_matches(
4311 Some(pattern),
4312 "src",
4313 "type",
4314 r#"{"a": {"b": {"c": "deep"}}}"#
4315 ));
4316 assert!(!test_matches(
4317 Some(pattern),
4318 "src",
4319 "type",
4320 r#"{"a": {"b": {"c": "shallow"}}}"#
4321 ));
4322 }
4323
4324 #[test]
4325 fn prefix_matcher() {
4326 let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4327 assert!(test_matches(
4328 Some(pattern),
4329 "com.myapp.orders",
4330 "OrderPlaced",
4331 "{}"
4332 ));
4333 assert!(test_matches(
4334 Some(pattern),
4335 "com.myapp",
4336 "OrderPlaced",
4337 "{}"
4338 ));
4339 assert!(!test_matches(
4340 Some(pattern),
4341 "com.other",
4342 "OrderPlaced",
4343 "{}"
4344 ));
4345 }
4346
4347 #[test]
4348 fn prefix_matcher_in_detail() {
4349 let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4350 assert!(test_matches(
4351 Some(pattern),
4352 "src",
4353 "type",
4354 r#"{"region": "us-east-1"}"#
4355 ));
4356 assert!(!test_matches(
4357 Some(pattern),
4358 "src",
4359 "type",
4360 r#"{"region": "eu-west-1"}"#
4361 ));
4362 }
4363
4364 #[test]
4365 fn exists_matcher() {
4366 let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4367 assert!(test_matches(
4368 Some(pattern),
4369 "src",
4370 "type",
4371 r#"{"error": "something broke"}"#
4372 ));
4373 assert!(!test_matches(
4374 Some(pattern),
4375 "src",
4376 "type",
4377 r#"{"status": "ok"}"#
4378 ));
4379
4380 let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4381 assert!(test_matches(
4382 Some(pattern),
4383 "src",
4384 "type",
4385 r#"{"status": "ok"}"#
4386 ));
4387 assert!(!test_matches(
4388 Some(pattern),
4389 "src",
4390 "type",
4391 r#"{"error": "something broke"}"#
4392 ));
4393 }
4394
4395 #[test]
4396 fn anything_but_matcher() {
4397 let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4398 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4399 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4400
4401 let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4402 assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4403 assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4404 assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4405 }
4406
4407 #[test]
4408 fn anything_but_in_detail() {
4409 let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4410 assert!(test_matches(
4411 Some(pattern),
4412 "src",
4413 "type",
4414 r#"{"env": "staging"}"#
4415 ));
4416 assert!(!test_matches(
4417 Some(pattern),
4418 "src",
4419 "type",
4420 r#"{"env": "prod"}"#
4421 ));
4422 }
4423
4424 #[test]
4425 fn numeric_greater_than() {
4426 let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4427 assert!(test_matches(
4428 Some(pattern),
4429 "src",
4430 "type",
4431 r#"{"count": 150}"#
4432 ));
4433 assert!(!test_matches(
4434 Some(pattern),
4435 "src",
4436 "type",
4437 r#"{"count": 100}"#
4438 ));
4439 assert!(!test_matches(
4440 Some(pattern),
4441 "src",
4442 "type",
4443 r#"{"count": 50}"#
4444 ));
4445 }
4446
4447 #[test]
4448 fn numeric_less_than() {
4449 let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4450 assert!(test_matches(
4451 Some(pattern),
4452 "src",
4453 "type",
4454 r#"{"count": 5}"#
4455 ));
4456 assert!(!test_matches(
4457 Some(pattern),
4458 "src",
4459 "type",
4460 r#"{"count": 10}"#
4461 ));
4462 assert!(!test_matches(
4463 Some(pattern),
4464 "src",
4465 "type",
4466 r#"{"count": 15}"#
4467 ));
4468 }
4469
4470 #[test]
4471 fn numeric_range() {
4472 let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4473 assert!(test_matches(
4474 Some(pattern),
4475 "src",
4476 "type",
4477 r#"{"count": 50}"#
4478 ));
4479 assert!(test_matches(
4480 Some(pattern),
4481 "src",
4482 "type",
4483 r#"{"count": 100}"#
4484 ));
4485 assert!(!test_matches(
4486 Some(pattern),
4487 "src",
4488 "type",
4489 r#"{"count": 200}"#
4490 ));
4491 assert!(!test_matches(
4492 Some(pattern),
4493 "src",
4494 "type",
4495 r#"{"count": 49}"#
4496 ));
4497 }
4498
4499 #[test]
4500 fn mixed_matchers_and_literals() {
4501 let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4502 assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4503 assert!(test_matches(
4504 Some(pattern),
4505 "com.myapp.orders",
4506 "Event",
4507 "{}"
4508 ));
4509 assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4510 }
4511
4512 use fakecloud_core::delivery::DeliveryBus;
4515 use parking_lot::RwLock;
4516
4517 fn make_service() -> EventBridgeService {
4518 let state = Arc::new(RwLock::new(
4519 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
4520 ));
4521 let delivery = Arc::new(DeliveryBus::new());
4522 EventBridgeService::new(state, delivery)
4523 }
4524
4525 fn make_request(action: &str, body: Value) -> AwsRequest {
4526 AwsRequest {
4527 service: "events".to_string(),
4528 action: action.to_string(),
4529 region: "us-east-1".to_string(),
4530 account_id: "123456789012".to_string(),
4531 request_id: "test-id".to_string(),
4532 headers: http::HeaderMap::new(),
4533 query_params: HashMap::new(),
4534 body: serde_json::to_vec(&body).unwrap().into(),
4535 body_stream: parking_lot::Mutex::new(None),
4536 path_segments: vec![],
4537 raw_path: "/".to_string(),
4538 raw_query: String::new(),
4539 method: http::Method::POST,
4540 is_query_protocol: false,
4541 access_key_id: None,
4542 principal: None,
4543 }
4544 }
4545
4546 fn create_connection(svc: &EventBridgeService, name: &str) {
4547 let req = make_request(
4548 "CreateConnection",
4549 json!({
4550 "Name": name,
4551 "AuthorizationType": "API_KEY",
4552 "AuthParameters": {
4553 "ApiKeyAuthParameters": {
4554 "ApiKeyName": "x-api-key",
4555 "ApiKeyValue": "secret"
4556 }
4557 }
4558 }),
4559 );
4560 svc.create_connection(&req).unwrap();
4561 }
4562
4563 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4564 let conn_arn_field = {
4565 let _mas = svc.state.read();
4566 let state = _mas.default_ref();
4567 state.connections.get(conn_name).unwrap().arn.clone()
4568 };
4569 let req = make_request(
4570 "CreateApiDestination",
4571 json!({
4572 "Name": name,
4573 "ConnectionArn": conn_arn_field,
4574 "InvocationEndpoint": "https://example.com",
4575 "HttpMethod": "POST"
4576 }),
4577 );
4578 svc.create_api_destination(&req).unwrap();
4579 }
4580
4581 #[test]
4584 fn list_connections_returns_all_by_default() {
4585 let svc = make_service();
4586 create_connection(&svc, "conn-alpha");
4587 create_connection(&svc, "conn-beta");
4588 create_connection(&svc, "conn-gamma");
4589
4590 let req = make_request("ListConnections", json!({}));
4591 let resp = svc.list_connections(&req).unwrap();
4592 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4593 assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4594 assert!(body["NextToken"].is_null());
4595 }
4596
4597 #[test]
4598 fn list_connections_name_prefix_filter() {
4599 let svc = make_service();
4600 create_connection(&svc, "prod-conn-1");
4601 create_connection(&svc, "prod-conn-2");
4602 create_connection(&svc, "dev-conn-1");
4603
4604 let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4605 let resp = svc.list_connections(&req).unwrap();
4606 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4607 let names: Vec<&str> = body["Connections"]
4608 .as_array()
4609 .unwrap()
4610 .iter()
4611 .map(|c| c["Name"].as_str().unwrap())
4612 .collect();
4613 assert_eq!(names.len(), 2);
4614 assert!(names.iter().all(|n| n.starts_with("prod-")));
4615 }
4616
4617 #[test]
4618 fn list_connections_state_filter() {
4619 let svc = make_service();
4620 create_connection(&svc, "conn-a");
4621 create_connection(&svc, "conn-b");
4622
4623 {
4625 let mut _mas = svc.state.write();
4626 let state = _mas.default_mut();
4627 state
4628 .connections
4629 .get_mut("conn-b")
4630 .unwrap()
4631 .connection_state = "DEAUTHORIZED".to_string();
4632 }
4633
4634 let req = make_request(
4635 "ListConnections",
4636 json!({ "ConnectionState": "AUTHORIZED" }),
4637 );
4638 let resp = svc.list_connections(&req).unwrap();
4639 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4640 let conns = body["Connections"].as_array().unwrap();
4641 assert_eq!(conns.len(), 1);
4642 assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4643 }
4644
4645 #[test]
4646 fn list_connections_pagination() {
4647 let svc = make_service();
4648 for i in 0..5 {
4649 create_connection(&svc, &format!("conn-{i:02}"));
4650 }
4651
4652 let req = make_request("ListConnections", json!({ "Limit": 2 }));
4654 let resp = svc.list_connections(&req).unwrap();
4655 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4656 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4657 let token = body["NextToken"].as_str().unwrap();
4658 assert_eq!(token, "2");
4659
4660 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4662 let resp = svc.list_connections(&req).unwrap();
4663 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4664 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4665 let token = body["NextToken"].as_str().unwrap();
4666 assert_eq!(token, "4");
4667
4668 let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4670 let resp = svc.list_connections(&req).unwrap();
4671 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4672 assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4673 assert!(body["NextToken"].is_null());
4674 }
4675
4676 #[test]
4677 fn list_connections_pagination_with_filter() {
4678 let svc = make_service();
4679 for i in 0..4 {
4680 create_connection(&svc, &format!("prod-{i:02}"));
4681 }
4682 create_connection(&svc, "dev-00");
4683
4684 let req = make_request(
4685 "ListConnections",
4686 json!({ "NamePrefix": "prod-", "Limit": 2 }),
4687 );
4688 let resp = svc.list_connections(&req).unwrap();
4689 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4690 assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4691 assert!(body["NextToken"].as_str().is_some());
4692 }
4693
4694 #[test]
4697 fn list_api_destinations_returns_all_by_default() {
4698 let svc = make_service();
4699 create_connection(&svc, "my-conn");
4700 create_api_destination(&svc, "dest-alpha", "my-conn");
4701 create_api_destination(&svc, "dest-beta", "my-conn");
4702
4703 let req = make_request("ListApiDestinations", json!({}));
4704 let resp = svc.list_api_destinations(&req).unwrap();
4705 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4706 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4707 assert!(body["NextToken"].is_null());
4708 }
4709
4710 #[test]
4711 fn list_api_destinations_name_prefix_filter() {
4712 let svc = make_service();
4713 create_connection(&svc, "my-conn");
4714 create_api_destination(&svc, "prod-dest-1", "my-conn");
4715 create_api_destination(&svc, "prod-dest-2", "my-conn");
4716 create_api_destination(&svc, "dev-dest-1", "my-conn");
4717
4718 let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4719 let resp = svc.list_api_destinations(&req).unwrap();
4720 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4721 let names: Vec<&str> = body["ApiDestinations"]
4722 .as_array()
4723 .unwrap()
4724 .iter()
4725 .map(|d| d["Name"].as_str().unwrap())
4726 .collect();
4727 assert_eq!(names.len(), 2);
4728 assert!(names.iter().all(|n| n.starts_with("prod-")));
4729 }
4730
4731 #[test]
4732 fn list_api_destinations_connection_arn_filter() {
4733 let svc = make_service();
4734 create_connection(&svc, "conn-a");
4735 create_connection(&svc, "conn-b");
4736 create_api_destination(&svc, "dest-1", "conn-a");
4737 create_api_destination(&svc, "dest-2", "conn-b");
4738 create_api_destination(&svc, "dest-3", "conn-a");
4739
4740 let conn_a_arn = {
4741 let _mas = svc.state.read();
4742 let state = _mas.default_ref();
4743 state.connections.get("conn-a").unwrap().arn.clone()
4744 };
4745
4746 let req = make_request(
4747 "ListApiDestinations",
4748 json!({ "ConnectionArn": conn_a_arn }),
4749 );
4750 let resp = svc.list_api_destinations(&req).unwrap();
4751 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4752 let names: Vec<&str> = body["ApiDestinations"]
4753 .as_array()
4754 .unwrap()
4755 .iter()
4756 .map(|d| d["Name"].as_str().unwrap())
4757 .collect();
4758 assert_eq!(names.len(), 2);
4759 assert!(names.contains(&"dest-1"));
4760 assert!(names.contains(&"dest-3"));
4761 }
4762
4763 #[test]
4764 fn list_api_destinations_pagination() {
4765 let svc = make_service();
4766 create_connection(&svc, "my-conn");
4767 for i in 0..5 {
4768 create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4769 }
4770
4771 let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4773 let resp = svc.list_api_destinations(&req).unwrap();
4774 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4775 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4776 let token = body["NextToken"].as_str().unwrap();
4777 assert_eq!(token, "2");
4778
4779 let req = make_request(
4781 "ListApiDestinations",
4782 json!({ "Limit": 2, "NextToken": token }),
4783 );
4784 let resp = svc.list_api_destinations(&req).unwrap();
4785 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4786 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4787 let token = body["NextToken"].as_str().unwrap();
4788 assert_eq!(token, "4");
4789
4790 let req = make_request(
4792 "ListApiDestinations",
4793 json!({ "Limit": 2, "NextToken": token }),
4794 );
4795 let resp = svc.list_api_destinations(&req).unwrap();
4796 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4797 assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4798 assert!(body["NextToken"].is_null());
4799 }
4800
4801 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4804 let req = make_request("CreateEventBus", json!({ "Name": name }));
4805 svc.create_event_bus(&req).unwrap();
4806 }
4807
4808 #[test]
4809 fn list_event_buses_pagination() {
4810 let svc = make_service();
4811 for i in 0..4 {
4813 create_event_bus(&svc, &format!("bus-{i:02}"));
4814 }
4815
4816 let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4818 let resp = svc.list_event_buses(&req).unwrap();
4819 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4820 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4821 let token = body["NextToken"].as_str().unwrap();
4822 assert_eq!(token, "2");
4823
4824 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4826 let resp = svc.list_event_buses(&req).unwrap();
4827 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4828 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4829 let token = body["NextToken"].as_str().unwrap();
4830 assert_eq!(token, "4");
4831
4832 let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4834 let resp = svc.list_event_buses(&req).unwrap();
4835 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4836 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4837 assert!(body["NextToken"].is_null());
4838 }
4839
4840 #[test]
4841 fn list_event_buses_no_pagination_returns_all() {
4842 let svc = make_service();
4843 create_event_bus(&svc, "bus-alpha");
4844 create_event_bus(&svc, "bus-beta");
4845
4846 let req = make_request("ListEventBuses", json!({}));
4847 let resp = svc.list_event_buses(&req).unwrap();
4848 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4849 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4851 assert!(body["NextToken"].is_null());
4852 }
4853
4854 #[test]
4857 fn put_events_never_includes_endpoint_id_in_response() {
4858 let svc = make_service();
4859 let req = make_request(
4861 "PutEvents",
4862 json!({
4863 "EndpointId": "my-endpoint.abc123",
4864 "Entries": [{
4865 "Source": "my.source",
4866 "DetailType": "MyType",
4867 "Detail": "{}",
4868 "EventBusName": "default"
4869 }]
4870 }),
4871 );
4872 let resp = svc.put_events(&req).unwrap();
4873 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4874 assert!(
4875 !body.as_object().unwrap().contains_key("EndpointId"),
4876 "EndpointId should never be in the PutEvents response"
4877 );
4878 assert_eq!(body["FailedEntryCount"], 0);
4879 }
4880
4881 fn create_archive(svc: &EventBridgeService, name: &str) {
4884 let req = make_request(
4885 "CreateArchive",
4886 json!({
4887 "ArchiveName": name,
4888 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4889 }),
4890 );
4891 svc.create_archive(&req).unwrap();
4892 }
4893
4894 #[test]
4895 fn list_archives_pagination() {
4896 let svc = make_service();
4897 for i in 0..5 {
4898 create_archive(&svc, &format!("archive-{i:02}"));
4899 }
4900
4901 let req = make_request("ListArchives", json!({ "Limit": 2 }));
4903 let resp = svc.list_archives(&req).unwrap();
4904 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4905 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4906 let token = body["NextToken"].as_str().unwrap();
4907 assert_eq!(token, "2");
4908
4909 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4911 let resp = svc.list_archives(&req).unwrap();
4912 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4913 assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4914 let token = body["NextToken"].as_str().unwrap();
4915 assert_eq!(token, "4");
4916
4917 let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4919 let resp = svc.list_archives(&req).unwrap();
4920 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4921 assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4922 assert!(body["NextToken"].is_null());
4923 }
4924
4925 fn create_replay(svc: &EventBridgeService, name: &str) {
4928 let archive_arn = {
4930 let guard = svc.state.read();
4931 let st = guard.default_ref();
4932 if st.archives.contains_key("replay-archive") {
4933 st.archives["replay-archive"].arn.clone()
4934 } else {
4935 drop(guard);
4936 create_archive(svc, "replay-archive");
4937 svc.state.read().default_ref().archives["replay-archive"]
4938 .arn
4939 .clone()
4940 }
4941 };
4942 let req = make_request(
4943 "StartReplay",
4944 json!({
4945 "ReplayName": name,
4946 "EventSourceArn": archive_arn,
4947 "EventStartTime": 1000000.0,
4948 "EventEndTime": 2000000.0,
4949 "Destination": {
4950 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4951 }
4952 }),
4953 );
4954 svc.start_replay(&req).unwrap();
4955 }
4956
4957 #[test]
4958 fn list_replays_pagination() {
4959 let svc = make_service();
4960 for i in 0..5 {
4961 create_replay(&svc, &format!("replay-{i:02}"));
4962 }
4963
4964 let req = make_request("ListReplays", json!({ "Limit": 2 }));
4966 let resp = svc.list_replays(&req).unwrap();
4967 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4968 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4969 let token = body["NextToken"].as_str().unwrap();
4970 assert_eq!(token, "2");
4971
4972 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4974 let resp = svc.list_replays(&req).unwrap();
4975 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4976 assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4977 let token = body["NextToken"].as_str().unwrap();
4978 assert_eq!(token, "4");
4979
4980 let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4982 let resp = svc.list_replays(&req).unwrap();
4983 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4984 assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4985 assert!(body["NextToken"].is_null());
4986 }
4987
4988 #[test]
4989 fn list_event_buses_invalid_next_token_returns_error() {
4990 let svc = make_service();
4991
4992 let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4993 let result = svc.list_event_buses(&req);
4994 assert!(
4995 result.is_err(),
4996 "non-numeric NextToken should return an error"
4997 );
4998 }
4999
5000 #[test]
5003 fn test_event_pattern_match() {
5004 let svc = make_service();
5005 let req = make_request(
5006 "TestEventPattern",
5007 json!({
5008 "EventPattern": r#"{"source": ["my.app"]}"#,
5009 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5010 }),
5011 );
5012 let resp = svc.test_event_pattern(&req).unwrap();
5013 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5014 assert_eq!(body["Result"], true);
5015 }
5016
5017 #[test]
5018 fn test_event_pattern_no_match() {
5019 let svc = make_service();
5020 let req = make_request(
5021 "TestEventPattern",
5022 json!({
5023 "EventPattern": r#"{"source": ["other.app"]}"#,
5024 "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5025 }),
5026 );
5027 let resp = svc.test_event_pattern(&req).unwrap();
5028 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5029 assert_eq!(body["Result"], false);
5030 }
5031
5032 #[test]
5033 fn test_event_pattern_detail_match() {
5034 let svc = make_service();
5035 let req = make_request(
5036 "TestEventPattern",
5037 json!({
5038 "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
5039 "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
5040 }),
5041 );
5042 let resp = svc.test_event_pattern(&req).unwrap();
5043 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5044 assert_eq!(body["Result"], true);
5045 }
5046
5047 #[test]
5050 fn update_event_bus_description() {
5051 let svc = make_service();
5052 create_event_bus(&svc, "my-bus");
5053
5054 let req = make_request(
5055 "UpdateEventBus",
5056 json!({ "Name": "my-bus", "Description": "Updated desc" }),
5057 );
5058 let resp = svc.update_event_bus(&req).unwrap();
5059 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5060 assert_eq!(body["Name"], "my-bus");
5061
5062 let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
5064 let resp = svc.describe_event_bus(&req).unwrap();
5065 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5066 assert_eq!(body["Description"], "Updated desc");
5067 }
5068
5069 #[test]
5070 fn update_event_bus_not_found() {
5071 let svc = make_service();
5072 let req = make_request(
5073 "UpdateEventBus",
5074 json!({ "Name": "ghost-bus", "Description": "nope" }),
5075 );
5076 assert!(svc.update_event_bus(&req).is_err());
5077 }
5078
5079 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
5082 let req = make_request(
5083 "CreateEndpoint",
5084 json!({
5085 "Name": name,
5086 "RoutingConfig": {
5087 "FailoverConfig": {
5088 "Primary": { "HealthCheck": "" },
5089 "Secondary": { "Route": "us-west-2" }
5090 }
5091 },
5092 "EventBuses": [
5093 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
5094 ]
5095 }),
5096 );
5097 svc.create_endpoint(&req).unwrap();
5098 }
5099
5100 #[test]
5101 fn endpoint_create_describe_delete() {
5102 let svc = make_service();
5103 create_endpoint_helper(&svc, "my-endpoint");
5104
5105 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5107 let resp = svc.describe_endpoint(&req).unwrap();
5108 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5109 assert_eq!(body["Name"], "my-endpoint");
5110 assert_eq!(body["State"], "ACTIVE");
5111 assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
5112
5113 let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
5115 svc.delete_endpoint(&req).unwrap();
5116
5117 let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5119 assert!(svc.describe_endpoint(&req).is_err());
5120 }
5121
5122 #[test]
5123 fn endpoint_list_and_update() {
5124 let svc = make_service();
5125 create_endpoint_helper(&svc, "ep-alpha");
5126 create_endpoint_helper(&svc, "ep-beta");
5127
5128 let req = make_request("ListEndpoints", json!({}));
5130 let resp = svc.list_endpoints(&req).unwrap();
5131 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5132 assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
5133
5134 let req = make_request(
5136 "UpdateEndpoint",
5137 json!({ "Name": "ep-alpha", "Description": "updated" }),
5138 );
5139 let resp = svc.update_endpoint(&req).unwrap();
5140 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5141 assert_eq!(body["Name"], "ep-alpha");
5142
5143 let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
5145 let resp = svc.describe_endpoint(&req).unwrap();
5146 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5147 assert_eq!(body["Description"], "updated");
5148 }
5149
5150 #[test]
5151 fn endpoint_duplicate_fails() {
5152 let svc = make_service();
5153 create_endpoint_helper(&svc, "dup-ep");
5154 let req = make_request(
5155 "CreateEndpoint",
5156 json!({
5157 "Name": "dup-ep",
5158 "RoutingConfig": {},
5159 "EventBuses": []
5160 }),
5161 );
5162 assert!(svc.create_endpoint(&req).is_err());
5163 }
5164
5165 #[test]
5168 fn deauthorize_connection_sets_state() {
5169 let svc = make_service();
5170 create_connection(&svc, "deauth-conn");
5171
5172 let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
5173 let resp = svc.deauthorize_connection(&req).unwrap();
5174 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5175 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5176 assert!(body["ConnectionArn"]
5177 .as_str()
5178 .unwrap()
5179 .contains("deauth-conn"));
5180
5181 let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
5183 let resp = svc.describe_connection(&req).unwrap();
5184 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5185 assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5186 }
5187
5188 #[test]
5189 fn deauthorize_connection_not_found() {
5190 let svc = make_service();
5191 let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5192 assert!(svc.deauthorize_connection(&req).is_err());
5193 }
5194
5195 #[test]
5198 fn partner_event_source_crud() {
5199 let svc = make_service();
5200
5201 let req = make_request(
5203 "CreatePartnerEventSource",
5204 json!({ "Name": "partner/test", "Account": "123456789012" }),
5205 );
5206 svc.create_partner_event_source(&req).unwrap();
5207
5208 let req = make_request(
5210 "DescribePartnerEventSource",
5211 json!({ "Name": "partner/test" }),
5212 );
5213 let resp = svc.describe_partner_event_source(&req).unwrap();
5214 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5215 assert_eq!(body["Name"], "partner/test");
5216
5217 let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5219 let resp = svc.list_partner_event_sources(&req).unwrap();
5220 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5221 assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5222
5223 let req = make_request(
5225 "ListPartnerEventSourceAccounts",
5226 json!({ "EventSourceName": "partner/test" }),
5227 );
5228 let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5229 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5230 assert_eq!(
5231 body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5232 1
5233 );
5234
5235 let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5237 let resp = svc.describe_event_source(&req).unwrap();
5238 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5239 assert_eq!(body["Name"], "partner/test");
5240 assert_eq!(body["State"], "ACTIVE");
5241
5242 let req = make_request("ListEventSources", json!({}));
5244 let resp = svc.list_event_sources(&req).unwrap();
5245 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5246 assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5247
5248 let req = make_request(
5250 "DeletePartnerEventSource",
5251 json!({ "Name": "partner/test", "Account": "123456789012" }),
5252 );
5253 svc.delete_partner_event_source(&req).unwrap();
5254
5255 let req = make_request(
5257 "DescribePartnerEventSource",
5258 json!({ "Name": "partner/test" }),
5259 );
5260 assert!(svc.describe_partner_event_source(&req).is_err());
5261 }
5262
5263 #[test]
5264 fn activate_deactivate_event_source() {
5265 let svc = make_service();
5266
5267 let req = make_request(
5269 "CreatePartnerEventSource",
5270 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5271 );
5272 svc.create_partner_event_source(&req).unwrap();
5273
5274 let req = make_request(
5276 "DeactivateEventSource",
5277 json!({ "Name": "aws.partner/test" }),
5278 );
5279 svc.deactivate_event_source(&req).unwrap();
5280 {
5281 let _mas = svc.state.read();
5282 let state = _mas.default_ref();
5283 assert_eq!(
5284 state.partner_event_sources["aws.partner/test"].state,
5285 "INACTIVE"
5286 );
5287 }
5288
5289 let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5291 svc.activate_event_source(&req).unwrap();
5292 {
5293 let _mas = svc.state.read();
5294 let state = _mas.default_ref();
5295 assert_eq!(
5296 state.partner_event_sources["aws.partner/test"].state,
5297 "ACTIVE"
5298 );
5299 }
5300
5301 let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5303 assert!(svc.activate_event_source(&req).is_err());
5304
5305 let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5306 assert!(svc.deactivate_event_source(&req).is_err());
5307 }
5308
5309 #[test]
5310 fn delete_partner_event_source_verifies_account() {
5311 let svc = make_service();
5312
5313 let req = make_request(
5315 "CreatePartnerEventSource",
5316 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5317 );
5318 svc.create_partner_event_source(&req).unwrap();
5319
5320 let req = make_request(
5322 "DeletePartnerEventSource",
5323 json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5324 );
5325 assert!(svc.delete_partner_event_source(&req).is_err());
5326 assert!(svc
5328 .state
5329 .read()
5330 .default_ref()
5331 .partner_event_sources
5332 .contains_key("aws.partner/test"));
5333
5334 let req = make_request(
5336 "DeletePartnerEventSource",
5337 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5338 );
5339 svc.delete_partner_event_source(&req).unwrap();
5340 assert!(!svc
5341 .state
5342 .read()
5343 .default_ref()
5344 .partner_event_sources
5345 .contains_key("aws.partner/test"));
5346
5347 let req = make_request(
5349 "DeletePartnerEventSource",
5350 json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5351 );
5352 assert!(svc.delete_partner_event_source(&req).is_err());
5353 }
5354
5355 #[test]
5356 fn put_partner_events() {
5357 let svc = make_service();
5358 let req = make_request(
5359 "PutPartnerEvents",
5360 json!({
5361 "Entries": [
5362 { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5363 ]
5364 }),
5365 );
5366 let resp = svc.put_partner_events(&req).unwrap();
5367 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5368 assert_eq!(body["FailedEntryCount"], 0);
5369 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5370 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5371 }
5372
5373 #[allow(clippy::type_complexity)]
5377 fn make_service_with_sqs_recorder() -> (
5378 EventBridgeService,
5379 Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5380 ) {
5381 use fakecloud_core::delivery::SqsDelivery;
5382
5383 struct RecordingSqsDelivery {
5384 messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5385 }
5386
5387 impl SqsDelivery for RecordingSqsDelivery {
5388 fn deliver_to_queue(
5389 &self,
5390 queue_arn: &str,
5391 message_body: &str,
5392 _attributes: &HashMap<String, String>,
5393 ) {
5394 self.messages
5395 .lock()
5396 .push((queue_arn.to_string(), message_body.to_string()));
5397 }
5398 }
5399
5400 let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5401 Arc::new(parking_lot::Mutex::new(Vec::new()));
5402 let state = Arc::new(RwLock::new(
5403 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5404 ));
5405 let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5406 messages: messages.clone(),
5407 })));
5408 let svc = EventBridgeService::new(state, delivery);
5409 (svc, messages)
5410 }
5411
5412 #[test]
5413 fn start_replay_delivers_archived_events_to_sqs_target() {
5414 let (svc, messages) = make_service_with_sqs_recorder();
5415 let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5416
5417 let req = make_request(
5419 "PutRule",
5420 json!({
5421 "Name": "replay-test-rule",
5422 "EventPattern": r#"{"source": ["my.app"]}"#,
5423 "State": "ENABLED"
5424 }),
5425 );
5426 svc.put_rule(&req).unwrap();
5427
5428 let req = make_request(
5429 "PutTargets",
5430 json!({
5431 "Rule": "replay-test-rule",
5432 "Targets": [{
5433 "Id": "sqs-target",
5434 "Arn": queue_arn
5435 }]
5436 }),
5437 );
5438 svc.put_targets(&req).unwrap();
5439
5440 let req = make_request(
5442 "CreateArchive",
5443 json!({
5444 "ArchiveName": "test-archive",
5445 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5446 }),
5447 );
5448 svc.create_archive(&req).unwrap();
5449
5450 let req = make_request(
5452 "PutEvents",
5453 json!({
5454 "Entries": [
5455 {
5456 "Source": "my.app",
5457 "DetailType": "OrderCreated",
5458 "Detail": "{\"orderId\": \"1\"}",
5459 "EventBusName": "default"
5460 },
5461 {
5462 "Source": "my.app",
5463 "DetailType": "OrderShipped",
5464 "Detail": "{\"orderId\": \"2\"}",
5465 "EventBusName": "default"
5466 }
5467 ]
5468 }),
5469 );
5470 let resp = svc.put_events(&req).unwrap();
5471 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5472 assert_eq!(body["FailedEntryCount"], 0);
5473
5474 {
5476 let _mas = svc.state.read();
5477 let state = _mas.default_ref();
5478 let archive = state.archives.get("test-archive").unwrap();
5479 assert_eq!(archive.events.len(), 2);
5480 assert_eq!(archive.event_count, 2);
5481 }
5482
5483 messages.lock().clear();
5485
5486 let archive_arn = {
5488 let _mas = svc.state.read();
5489 let state = _mas.default_ref();
5490 state.archives.get("test-archive").unwrap().arn.clone()
5491 };
5492
5493 let start_ts = 0.0_f64;
5495 let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5496
5497 let req = make_request(
5498 "StartReplay",
5499 json!({
5500 "ReplayName": "my-replay",
5501 "EventSourceArn": archive_arn,
5502 "Destination": {
5503 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5504 },
5505 "EventStartTime": start_ts,
5506 "EventEndTime": end_ts
5507 }),
5508 );
5509 let resp = svc.start_replay(&req).unwrap();
5510 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5511 assert_eq!(body["State"], "STARTING");
5512
5513 let delivered = messages.lock();
5515 assert_eq!(
5516 delivered.len(),
5517 2,
5518 "expected 2 replayed events delivered to SQS"
5519 );
5520 for (arn, msg) in delivered.iter() {
5521 assert_eq!(arn, queue_arn);
5522 let event: Value = serde_json::from_str(msg).unwrap();
5523 assert_eq!(event["source"], "my.app");
5524 assert!(event["replay-name"].as_str().is_some());
5526 }
5527
5528 let _mas = svc.state.read();
5530 let state = _mas.default_ref();
5531 let replay = state.replays.get("my-replay").unwrap();
5532 assert_eq!(replay.state, "COMPLETED");
5533 }
5534
5535 #[test]
5536 fn apply_connection_auth_api_key() {
5537 let conn = Connection {
5538 name: "test-conn".to_string(),
5539 arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5540 description: None,
5541 authorization_type: "API_KEY".to_string(),
5542 auth_parameters: json!({
5543 "ApiKeyAuthParameters": {
5544 "ApiKeyName": "x-api-key",
5545 "ApiKeyValue": "my-secret"
5546 }
5547 }),
5548 connection_state: "AUTHORIZED".to_string(),
5549 secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5550 creation_time: Utc::now(),
5551 last_modified_time: Utc::now(),
5552 last_authorized_time: Utc::now(),
5553 };
5554
5555 let client = reqwest::Client::new();
5556 let builder = client
5557 .post("http://localhost:12345/test")
5558 .header("Content-Type", "application/json");
5559 let builder = apply_connection_auth(builder, &conn);
5560
5561 let request = builder.body("{}").build().unwrap();
5563 assert_eq!(
5564 request
5565 .headers()
5566 .get("x-api-key")
5567 .unwrap()
5568 .to_str()
5569 .unwrap(),
5570 "my-secret"
5571 );
5572 }
5573
5574 #[test]
5575 fn apply_connection_auth_basic() {
5576 let conn = Connection {
5577 name: "basic-conn".to_string(),
5578 arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5579 description: None,
5580 authorization_type: "BASIC".to_string(),
5581 auth_parameters: json!({
5582 "BasicAuthParameters": {
5583 "Username": "user",
5584 "Password": "pass"
5585 }
5586 }),
5587 connection_state: "AUTHORIZED".to_string(),
5588 secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5589 creation_time: Utc::now(),
5590 last_modified_time: Utc::now(),
5591 last_authorized_time: Utc::now(),
5592 };
5593
5594 let client = reqwest::Client::new();
5595 let builder = client.post("http://localhost:12345/test");
5596 let builder = apply_connection_auth(builder, &conn);
5597
5598 let request = builder.body("{}").build().unwrap();
5599 let auth_header = request
5600 .headers()
5601 .get("authorization")
5602 .unwrap()
5603 .to_str()
5604 .unwrap();
5605 assert!(
5606 auth_header.starts_with("Basic "),
5607 "Expected Basic auth header, got: {auth_header}"
5608 );
5609 }
5610
5611 #[tokio::test]
5612 async fn put_events_with_api_destination_target_resolves_destination() {
5613 let state = Arc::new(RwLock::new(
5617 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5618 ));
5619 let delivery = Arc::new(DeliveryBus::new());
5620 let svc = EventBridgeService::new(state, delivery);
5621
5622 create_connection(&svc, "my-conn");
5624 let conn_arn = {
5625 let _mas = svc.state.read();
5626 let state = _mas.default_ref();
5627 state.connections.get("my-conn").unwrap().arn.clone()
5628 };
5629 let req = make_request(
5630 "CreateApiDestination",
5631 json!({
5632 "Name": "my-dest",
5633 "ConnectionArn": conn_arn,
5634 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5635 "HttpMethod": "POST"
5636 }),
5637 );
5638 svc.create_api_destination(&req).unwrap();
5639
5640 let dest_arn = {
5641 let _mas = svc.state.read();
5642 let state = _mas.default_ref();
5643 state.api_destinations.get("my-dest").unwrap().arn.clone()
5644 };
5645
5646 let req = make_request(
5648 "PutRule",
5649 json!({
5650 "Name": "api-dest-rule",
5651 "EventPattern": r#"{"source":["test.app"]}"#,
5652 "State": "ENABLED"
5653 }),
5654 );
5655 svc.put_rule(&req).unwrap();
5656
5657 let req = make_request(
5658 "PutTargets",
5659 json!({
5660 "Rule": "api-dest-rule",
5661 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5662 }),
5663 );
5664 svc.put_targets(&req).unwrap();
5665
5666 let req = make_request(
5668 "PutEvents",
5669 json!({
5670 "Entries": [{
5671 "Source": "test.app",
5672 "DetailType": "TestEvent",
5673 "Detail": r#"{"key":"value"}"#
5674 }]
5675 }),
5676 );
5677 let resp = svc.put_events(&req).unwrap();
5678 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5679 assert_eq!(body["FailedEntryCount"], 0);
5680 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5681 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5682 }
5683
5684 #[test]
5685 fn test_function_name_from_arn() {
5686 assert_eq!(
5688 super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5689 "my-func"
5690 );
5691 assert_eq!(
5693 super::function_name_from_arn(
5694 "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5695 ),
5696 "my-func"
5697 );
5698 assert_eq!(
5700 super::function_name_from_arn(
5701 "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5702 ),
5703 "my-func"
5704 );
5705 assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5707 }
5708
5709 fn put_rule_simple(svc: &EventBridgeService, name: &str) {
5712 let req = make_request(
5713 "PutRule",
5714 json!({ "Name": name, "EventPattern": r#"{"source":["a"]}"# }),
5715 );
5716 svc.put_rule(&req).unwrap();
5717 }
5718
5719 #[test]
5720 fn put_rule_persists_event_pattern_and_state() {
5721 let svc = make_service();
5722 put_rule_simple(&svc, "r1");
5723 let _mas = svc.state.read();
5724 let state = _mas.default_ref();
5725 let rule = state
5726 .rules
5727 .get(&("default".to_string(), "r1".to_string()))
5728 .unwrap();
5729 assert_eq!(rule.state, "ENABLED");
5730 assert!(rule.event_pattern.is_some());
5731 assert!(rule.arn.contains("rule/r1"));
5732 }
5733
5734 #[test]
5735 fn put_rule_rejects_schedule_on_non_default_bus() {
5736 let svc = make_service();
5737 let bus_req = make_request("CreateEventBus", json!({ "Name": "custom" }));
5739 svc.create_event_bus(&bus_req).unwrap();
5740
5741 let req = make_request(
5742 "PutRule",
5743 json!({
5744 "Name": "r1",
5745 "EventBusName": "custom",
5746 "ScheduleExpression": "rate(5 minutes)"
5747 }),
5748 );
5749 let err = svc.put_rule(&req).err().expect("expected error");
5750 assert_eq!(err.code(), "ValidationException");
5751 }
5752
5753 #[test]
5754 fn put_rule_rejects_unknown_event_bus() {
5755 let svc = make_service();
5756 let req = make_request(
5757 "PutRule",
5758 json!({ "Name": "r1", "EventBusName": "ghost", "EventPattern": r#"{"source":["a"]}"# }),
5759 );
5760 let err = svc.put_rule(&req).err().expect("expected error");
5761 assert_eq!(err.code(), "ResourceNotFoundException");
5762 }
5763
5764 #[test]
5765 fn put_rule_overlay_preserves_existing_targets() {
5766 let svc = make_service();
5767 put_rule_simple(&svc, "r1");
5768 {
5770 let mut _mas = svc.state.write();
5771 let state = _mas.default_mut();
5772 let rule = state
5773 .rules
5774 .get_mut(&("default".to_string(), "r1".to_string()))
5775 .unwrap();
5776 rule.targets.push(crate::state::EventTarget {
5777 id: "t1".to_string(),
5778 arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
5779 input: None,
5780 input_path: None,
5781 input_transformer: None,
5782 sqs_parameters: None,
5783 });
5784 }
5785
5786 let req = make_request(
5788 "PutRule",
5789 json!({ "Name": "r1", "Description": "updated", "EventPattern": r#"{"source":["a"]}"# }),
5790 );
5791 svc.put_rule(&req).unwrap();
5792 let _mas = svc.state.read();
5793 let state = _mas.default_ref();
5794 let rule = state
5795 .rules
5796 .get(&("default".to_string(), "r1".to_string()))
5797 .unwrap();
5798 assert_eq!(rule.description.as_deref(), Some("updated"));
5799 assert_eq!(rule.targets.len(), 1);
5800 }
5801
5802 #[test]
5803 fn delete_rule_with_targets_errors() {
5804 let svc = make_service();
5805 put_rule_simple(&svc, "r1");
5806 let put_targets_req = make_request(
5807 "PutTargets",
5808 json!({
5809 "Rule": "r1",
5810 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5811 }),
5812 );
5813 svc.put_targets(&put_targets_req).unwrap();
5814
5815 let req = make_request("DeleteRule", json!({ "Name": "r1" }));
5816 let err = svc.delete_rule(&req).err().expect("expected error");
5817 assert_eq!(err.code(), "ValidationException");
5818 }
5819
5820 #[test]
5821 fn delete_rule_after_remove_targets_succeeds() {
5822 let svc = make_service();
5823 put_rule_simple(&svc, "r1");
5824 let put_t = make_request(
5825 "PutTargets",
5826 json!({
5827 "Rule": "r1",
5828 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5829 }),
5830 );
5831 svc.put_targets(&put_t).unwrap();
5832 let rm_t = make_request("RemoveTargets", json!({ "Rule": "r1", "Ids": ["t1"] }));
5833 svc.remove_targets(&rm_t).unwrap();
5834 let del = make_request("DeleteRule", json!({ "Name": "r1" }));
5835 svc.delete_rule(&del).unwrap();
5836 assert!(!svc
5837 .state
5838 .read()
5839 .default_ref()
5840 .rules
5841 .contains_key(&("default".to_string(), "r1".to_string())));
5842 }
5843
5844 #[test]
5845 fn enable_disable_rule_toggles_state() {
5846 let svc = make_service();
5847 put_rule_simple(&svc, "r1");
5848 let dis = make_request("DisableRule", json!({ "Name": "r1" }));
5849 svc.disable_rule(&dis).unwrap();
5850 assert_eq!(
5851 svc.state
5852 .read()
5853 .default_ref()
5854 .rules
5855 .get(&("default".to_string(), "r1".to_string()))
5856 .unwrap()
5857 .state,
5858 "DISABLED"
5859 );
5860 let en = make_request("EnableRule", json!({ "Name": "r1" }));
5861 svc.enable_rule(&en).unwrap();
5862 assert_eq!(
5863 svc.state
5864 .read()
5865 .default_ref()
5866 .rules
5867 .get(&("default".to_string(), "r1".to_string()))
5868 .unwrap()
5869 .state,
5870 "ENABLED"
5871 );
5872 }
5873
5874 #[test]
5875 fn enable_rule_unknown_errors() {
5876 let svc = make_service();
5877 let req = make_request("EnableRule", json!({ "Name": "ghost" }));
5878 let err = svc.enable_rule(&req).err().expect("expected error");
5879 assert_eq!(err.code(), "ResourceNotFoundException");
5880 }
5881
5882 #[test]
5883 fn list_rules_with_name_prefix_filter() {
5884 let svc = make_service();
5885 put_rule_simple(&svc, "prod-orders");
5886 put_rule_simple(&svc, "prod-shipping");
5887 put_rule_simple(&svc, "dev-orders");
5888
5889 let req = make_request("ListRules", json!({ "NamePrefix": "prod-" }));
5890 let resp = svc.list_rules(&req).unwrap();
5891 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5892 let names: Vec<&str> = body["Rules"]
5893 .as_array()
5894 .unwrap()
5895 .iter()
5896 .map(|r| r["Name"].as_str().unwrap())
5897 .collect();
5898 assert_eq!(names.len(), 2);
5899 assert!(names.iter().all(|n| n.starts_with("prod-")));
5900 }
5901
5902 #[test]
5903 fn list_rules_pagination_emits_next_token() {
5904 let svc = make_service();
5905 for i in 0..5 {
5906 put_rule_simple(&svc, &format!("r{i}"));
5907 }
5908 let req = make_request("ListRules", json!({ "Limit": 2 }));
5909 let resp = svc.list_rules(&req).unwrap();
5910 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5911 assert_eq!(body["Rules"].as_array().unwrap().len(), 2);
5912 assert!(body["NextToken"].is_string());
5913 }
5914
5915 #[test]
5916 fn describe_rule_returns_persisted_fields() {
5917 let svc = make_service();
5918 let put = make_request(
5919 "PutRule",
5920 json!({
5921 "Name": "r1",
5922 "EventPattern": r#"{"source":["a"]}"#,
5923 "Description": "hi",
5924 "State": "DISABLED"
5925 }),
5926 );
5927 svc.put_rule(&put).unwrap();
5928 let desc = make_request("DescribeRule", json!({ "Name": "r1" }));
5929 let resp = svc.describe_rule(&desc).unwrap();
5930 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5931 assert_eq!(body["Name"], json!("r1"));
5932 assert_eq!(body["State"], json!("DISABLED"));
5933 assert_eq!(body["Description"], json!("hi"));
5934 }
5935
5936 #[test]
5937 fn describe_rule_unknown_errors() {
5938 let svc = make_service();
5939 let req = make_request("DescribeRule", json!({ "Name": "ghost" }));
5940 let err = svc.describe_rule(&req).err().expect("expected error");
5941 assert_eq!(err.code(), "ResourceNotFoundException");
5942 }
5943
5944 #[test]
5945 fn put_targets_rejects_fifo_without_sqs_parameters() {
5946 let svc = make_service();
5947 put_rule_simple(&svc, "r1");
5948 let req = make_request(
5949 "PutTargets",
5950 json!({
5951 "Rule": "r1",
5952 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q.fifo" }]
5953 }),
5954 );
5955 let err = svc.put_targets(&req).err().expect("expected error");
5956 assert_eq!(err.code(), "ValidationException");
5957 }
5958
5959 #[test]
5960 fn put_targets_rejects_invalid_arn() {
5961 let svc = make_service();
5962 put_rule_simple(&svc, "r1");
5963 let req = make_request(
5964 "PutTargets",
5965 json!({
5966 "Rule": "r1",
5967 "Targets": [{ "Id": "t1", "Arn": "not-an-arn" }]
5968 }),
5969 );
5970 let err = svc.put_targets(&req).err().expect("expected error");
5971 assert_eq!(err.code(), "ValidationException");
5972 }
5973
5974 #[test]
5975 fn put_targets_unknown_rule_errors() {
5976 let svc = make_service();
5977 let req = make_request(
5978 "PutTargets",
5979 json!({
5980 "Rule": "ghost",
5981 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5982 }),
5983 );
5984 let err = svc.put_targets(&req).err().expect("expected error");
5985 assert_eq!(err.code(), "ResourceNotFoundException");
5986 }
5987
5988 #[test]
5989 fn put_targets_replaces_existing_with_same_id() {
5990 let svc = make_service();
5991 put_rule_simple(&svc, "r1");
5992 let first = make_request(
5993 "PutTargets",
5994 json!({
5995 "Rule": "r1",
5996 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1" }]
5997 }),
5998 );
5999 svc.put_targets(&first).unwrap();
6000 let second = make_request(
6001 "PutTargets",
6002 json!({
6003 "Rule": "r1",
6004 "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q2" }]
6005 }),
6006 );
6007 svc.put_targets(&second).unwrap();
6008
6009 let _mas = svc.state.read();
6010 let state = _mas.default_ref();
6011 let rule = state
6012 .rules
6013 .get(&("default".to_string(), "r1".to_string()))
6014 .unwrap();
6015 assert_eq!(rule.targets.len(), 1);
6016 assert!(rule.targets[0].arn.ends_with("q2"));
6017 }
6018
6019 #[test]
6020 fn list_targets_by_rule_returns_pagination_token() {
6021 let svc = make_service();
6022 put_rule_simple(&svc, "r1");
6023 for i in 0..4 {
6024 let req = make_request(
6025 "PutTargets",
6026 json!({
6027 "Rule": "r1",
6028 "Targets": [{
6029 "Id": format!("t{i}"),
6030 "Arn": format!("arn:aws:sqs:us-east-1:123456789012:q{i}")
6031 }]
6032 }),
6033 );
6034 svc.put_targets(&req).unwrap();
6035 }
6036 let req = make_request("ListTargetsByRule", json!({ "Rule": "r1", "Limit": 2 }));
6037 let resp = svc.list_targets_by_rule(&req).unwrap();
6038 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6039 assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6040 assert!(body["NextToken"].is_string());
6041 }
6042
6043 #[test]
6044 fn list_rule_names_by_target_groups_by_arn() {
6045 let svc = make_service();
6046 put_rule_simple(&svc, "r1");
6047 put_rule_simple(&svc, "r2");
6048 for rule in ["r1", "r2"] {
6049 let req = make_request(
6050 "PutTargets",
6051 json!({
6052 "Rule": rule,
6053 "Targets": [{
6054 "Id": "t1",
6055 "Arn": "arn:aws:sqs:us-east-1:123456789012:shared"
6056 }]
6057 }),
6058 );
6059 svc.put_targets(&req).unwrap();
6060 }
6061 let req = make_request(
6062 "ListRuleNamesByTarget",
6063 json!({ "TargetArn": "arn:aws:sqs:us-east-1:123456789012:shared" }),
6064 );
6065 let resp = svc.list_rule_names_by_target(&req).unwrap();
6066 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6067 let names: Vec<&str> = body["RuleNames"]
6068 .as_array()
6069 .unwrap()
6070 .iter()
6071 .map(|v| v.as_str().unwrap())
6072 .collect();
6073 assert_eq!(names, vec!["r1", "r2"]);
6074 }
6075
6076 #[test]
6079 fn tag_then_list_tags_for_rule() {
6080 let svc = make_service();
6081 put_rule_simple(&svc, "r1");
6082 let arn = svc
6083 .state
6084 .read()
6085 .default_ref()
6086 .rules
6087 .get(&("default".to_string(), "r1".to_string()))
6088 .unwrap()
6089 .arn
6090 .clone();
6091
6092 let tag_req = make_request(
6093 "TagResource",
6094 json!({
6095 "ResourceARN": arn,
6096 "Tags": [{ "Key": "env", "Value": "prod" }]
6097 }),
6098 );
6099 svc.tag_resource(&tag_req).unwrap();
6100
6101 let list_req = make_request("ListTagsForResource", json!({ "ResourceARN": arn }));
6102 let resp = svc.list_tags_for_resource(&list_req).unwrap();
6103 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6104 let tags = body["Tags"].as_array().unwrap();
6105 assert_eq!(tags.len(), 1);
6106 assert_eq!(tags[0]["Key"], json!("env"));
6107 assert_eq!(tags[0]["Value"], json!("prod"));
6108 }
6109
6110 #[test]
6111 fn untag_resource_removes_listed_keys() {
6112 let svc = make_service();
6113 put_rule_simple(&svc, "r1");
6114 let arn = svc
6115 .state
6116 .read()
6117 .default_ref()
6118 .rules
6119 .get(&("default".to_string(), "r1".to_string()))
6120 .unwrap()
6121 .arn
6122 .clone();
6123 let tag_req = make_request(
6124 "TagResource",
6125 json!({
6126 "ResourceARN": &arn,
6127 "Tags": [{ "Key": "env", "Value": "prod" }, { "Key": "team", "Value": "core" }]
6128 }),
6129 );
6130 svc.tag_resource(&tag_req).unwrap();
6131
6132 let untag = make_request(
6133 "UntagResource",
6134 json!({ "ResourceARN": &arn, "TagKeys": ["env"] }),
6135 );
6136 svc.untag_resource(&untag).unwrap();
6137
6138 let _mas = svc.state.read();
6139 let state = _mas.default_ref();
6140 let rule = state
6141 .rules
6142 .get(&("default".to_string(), "r1".to_string()))
6143 .unwrap();
6144 assert!(!rule.tags.contains_key("env"));
6145 assert_eq!(rule.tags.get("team").map(String::as_str), Some("core"));
6146 }
6147
6148 #[test]
6151 fn test_event_pattern_returns_result_field() {
6152 let svc = make_service();
6153 let req = make_request(
6154 "TestEventPattern",
6155 json!({
6156 "EventPattern": r#"{"source":["my.app"]}"#,
6157 "Event": r#"{"source":"my.app","detail-type":"x","detail":{}}"#
6158 }),
6159 );
6160 let resp = svc.test_event_pattern(&req).unwrap();
6161 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6162 assert_eq!(body["Result"], json!(true));
6163 }
6164
6165 #[test]
6168 fn describe_event_bus_default_returns_arn() {
6169 let svc = make_service();
6170 let req = make_request("DescribeEventBus", json!({}));
6171 let resp = svc.describe_event_bus(&req).unwrap();
6172 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6173 assert_eq!(body["Name"], json!("default"));
6174 assert!(body["Arn"].as_str().unwrap().contains("event-bus/default"));
6175 }
6176
6177 #[test]
6178 fn delete_event_bus_default_fails() {
6179 let svc = make_service();
6180 let req = make_request("DeleteEventBus", json!({ "Name": "default" }));
6181 let err = svc.delete_event_bus(&req).err().expect("expected error");
6182 assert_eq!(err.code(), "ValidationException");
6183 }
6184
6185 #[test]
6188 fn describe_rule_not_found() {
6189 let svc = make_service();
6190 let req = make_request("DescribeRule", json!({"Name": "nonexistent"}));
6191 let err = svc.describe_rule(&req).err().expect("expected error");
6192 assert_eq!(err.code(), "ResourceNotFoundException");
6193 }
6194
6195 #[test]
6196 fn delete_rule_nonexistent_is_noop() {
6197 let svc = make_service();
6198 let req = make_request("DeleteRule", json!({"Name": "nope"}));
6199 svc.delete_rule(&req).unwrap();
6201 }
6202
6203 #[test]
6204 fn put_targets_rule_not_found() {
6205 let svc = make_service();
6206 let req = make_request(
6207 "PutTargets",
6208 json!({"Rule": "ghost", "Targets": [{"Id": "t1", "Arn": "arn:a"}]}),
6209 );
6210 let err = svc.put_targets(&req).err().expect("expected error");
6211 assert_eq!(err.code(), "ResourceNotFoundException");
6212 }
6213
6214 #[test]
6215 fn remove_targets_rule_not_found() {
6216 let svc = make_service();
6217 let req = make_request("RemoveTargets", json!({"Rule": "ghost", "Ids": ["t1"]}));
6218 let err = svc.remove_targets(&req).err().expect("expected error");
6219 assert_eq!(err.code(), "ResourceNotFoundException");
6220 }
6221
6222 #[test]
6223 fn list_targets_by_rule_not_found() {
6224 let svc = make_service();
6225 let req = make_request("ListTargetsByRule", json!({"Rule": "ghost"}));
6226 let err = svc
6227 .list_targets_by_rule(&req)
6228 .err()
6229 .expect("expected error");
6230 assert_eq!(err.code(), "ResourceNotFoundException");
6231 }
6232
6233 #[test]
6234 fn enable_rule_not_found() {
6235 let svc = make_service();
6236 let req = make_request("EnableRule", json!({"Name": "ghost"}));
6237 let err = svc.enable_rule(&req).err().expect("expected error");
6238 assert_eq!(err.code(), "ResourceNotFoundException");
6239 }
6240
6241 #[test]
6242 fn disable_rule_not_found() {
6243 let svc = make_service();
6244 let req = make_request("DisableRule", json!({"Name": "ghost"}));
6245 let err = svc.disable_rule(&req).err().expect("expected error");
6246 assert_eq!(err.code(), "ResourceNotFoundException");
6247 }
6248
6249 #[test]
6250 fn describe_event_bus_not_found() {
6251 let svc = make_service();
6252 let req = make_request("DescribeEventBus", json!({"Name": "nonexistent-bus"}));
6253 let err = svc.describe_event_bus(&req).err().expect("expected error");
6254 assert_eq!(err.code(), "ResourceNotFoundException");
6255 }
6256
6257 #[test]
6258 fn tag_resource_not_found() {
6259 let svc = make_service();
6260 let req = make_request(
6261 "TagResource",
6262 json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "Tags": [{"Key": "k", "Value": "v"}]}),
6263 );
6264 let err = svc.tag_resource(&req).err().expect("expected error");
6265 assert_eq!(err.code(), "ResourceNotFoundException");
6266 }
6267
6268 #[test]
6269 fn untag_resource_not_found() {
6270 let svc = make_service();
6271 let req = make_request(
6272 "UntagResource",
6273 json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "TagKeys": ["k"]}),
6274 );
6275 let err = svc.untag_resource(&req).err().expect("expected error");
6276 assert_eq!(err.code(), "ResourceNotFoundException");
6277 }
6278
6279 #[test]
6280 fn describe_archive_not_found() {
6281 let svc = make_service();
6282 let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
6283 let err = svc.describe_archive(&req).err().expect("expected error");
6284 assert_eq!(err.code(), "ResourceNotFoundException");
6285 }
6286
6287 #[test]
6288 fn delete_archive_not_found() {
6289 let svc = make_service();
6290 let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6291 let err = svc.delete_archive(&req).err().expect("expected error");
6292 assert_eq!(err.code(), "ResourceNotFoundException");
6293 }
6294
6295 #[test]
6296 fn describe_connection_not_found() {
6297 let svc = make_service();
6298 let req = make_request("DescribeConnection", json!({"Name": "ghost"}));
6299 let err = svc.describe_connection(&req).err().expect("expected error");
6300 assert_eq!(err.code(), "ResourceNotFoundException");
6301 }
6302
6303 #[test]
6304 fn describe_api_destination_not_found() {
6305 let svc = make_service();
6306 let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
6307 let err = svc
6308 .describe_api_destination(&req)
6309 .err()
6310 .expect("expected error");
6311 assert_eq!(err.code(), "ResourceNotFoundException");
6312 }
6313
6314 #[test]
6315 fn describe_replay_not_found() {
6316 let svc = make_service();
6317 let req = make_request("DescribeReplay", json!({"ReplayName": "ghost"}));
6318 let err = svc.describe_replay(&req).err().expect("expected error");
6319 assert_eq!(err.code(), "ResourceNotFoundException");
6320 }
6321
6322 #[test]
6323 fn create_event_bus_duplicate() {
6324 let svc = make_service();
6325 let req = make_request("CreateEventBus", json!({"Name": "dup-bus"}));
6326 svc.create_event_bus(&req).unwrap();
6327 let err = svc.create_event_bus(&req).err().expect("expected error");
6328 assert_eq!(err.code(), "ResourceAlreadyExistsException");
6329 }
6330
6331 #[test]
6334 fn rule_put_describe_enable_disable_delete() {
6335 let svc = make_service();
6336 svc.put_rule(&make_request(
6337 "PutRule",
6338 json!({"Name": "my-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}", "State": "ENABLED"}),
6339 ))
6340 .unwrap();
6341
6342 let resp = svc
6343 .describe_rule(&make_request("DescribeRule", json!({"Name": "my-rule"})))
6344 .unwrap();
6345 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6346 assert_eq!(body["State"], "ENABLED");
6347
6348 svc.disable_rule(&make_request("DisableRule", json!({"Name": "my-rule"})))
6349 .unwrap();
6350 svc.enable_rule(&make_request("EnableRule", json!({"Name": "my-rule"})))
6351 .unwrap();
6352 svc.delete_rule(&make_request("DeleteRule", json!({"Name": "my-rule"})))
6353 .unwrap();
6354 }
6355
6356 #[test]
6357 fn list_rules_returns_created() {
6358 let svc = make_service();
6359 for name in &["r1", "r2", "r3"] {
6360 svc.put_rule(&make_request(
6361 "PutRule",
6362 json!({"Name": name, "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6363 ))
6364 .unwrap();
6365 }
6366 let resp = svc
6367 .list_rules(&make_request("ListRules", json!({})))
6368 .unwrap();
6369 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6370 assert_eq!(body["Rules"].as_array().unwrap().len(), 3);
6371 }
6372
6373 #[test]
6376 fn put_list_remove_targets() {
6377 let svc = make_service();
6378 svc.put_rule(&make_request(
6379 "PutRule",
6380 json!({"Name": "tr", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6381 ))
6382 .unwrap();
6383
6384 svc.put_targets(&make_request(
6385 "PutTargets",
6386 json!({
6387 "Rule": "tr",
6388 "Targets": [
6389 {"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"},
6390 {"Id": "t2", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:fn1"},
6391 ]
6392 }),
6393 ))
6394 .unwrap();
6395
6396 let resp = svc
6397 .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6398 .unwrap();
6399 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6400 assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6401
6402 svc.remove_targets(&make_request(
6403 "RemoveTargets",
6404 json!({"Rule": "tr", "Ids": ["t1"]}),
6405 ))
6406 .unwrap();
6407
6408 let resp = svc
6409 .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6410 .unwrap();
6411 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6412 assert_eq!(body["Targets"].as_array().unwrap().len(), 1);
6413 }
6414
6415 #[test]
6418 fn put_events_basic() {
6419 let svc = make_service();
6420 let resp = svc
6421 .put_events(&make_request(
6422 "PutEvents",
6423 json!({
6424 "Entries": [
6425 {"Source": "aws.s3", "DetailType": "Object Created", "Detail": "{}"},
6426 ]
6427 }),
6428 ))
6429 .unwrap();
6430 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6431 assert_eq!(body["FailedEntryCount"], 0);
6432 }
6433
6434 #[test]
6437 fn archive_create_describe_list_delete() {
6438 let svc = make_service();
6439
6440 svc.create_archive(&make_request(
6441 "CreateArchive",
6442 json!({
6443 "ArchiveName": "my-archive",
6444 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default",
6445 }),
6446 ))
6447 .unwrap();
6448
6449 let resp = svc
6450 .describe_archive(&make_request(
6451 "DescribeArchive",
6452 json!({"ArchiveName": "my-archive"}),
6453 ))
6454 .unwrap();
6455 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6456 assert_eq!(body["ArchiveName"], "my-archive");
6457
6458 let resp = svc
6459 .list_archives(&make_request("ListArchives", json!({})))
6460 .unwrap();
6461 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6462 assert!(!body["Archives"].as_array().unwrap().is_empty());
6463
6464 svc.delete_archive(&make_request(
6465 "DeleteArchive",
6466 json!({"ArchiveName": "my-archive"}),
6467 ))
6468 .unwrap();
6469 }
6470
6471 #[test]
6474 fn connection_create_list_describe_deauthorize() {
6475 let svc = make_service();
6476
6477 svc.create_connection(&make_request(
6478 "CreateConnection",
6479 json!({
6480 "Name": "my-conn",
6481 "AuthorizationType": "API_KEY",
6482 "AuthParameters": {
6483 "ApiKeyAuthParameters": {"ApiKeyName": "x-key", "ApiKeyValue": "secret"}
6484 }
6485 }),
6486 ))
6487 .unwrap();
6488
6489 let resp = svc
6490 .list_connections(&make_request("ListConnections", json!({})))
6491 .unwrap();
6492 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6493 assert!(!body["Connections"].as_array().unwrap().is_empty());
6494
6495 svc.describe_connection(&make_request(
6496 "DescribeConnection",
6497 json!({"Name": "my-conn"}),
6498 ))
6499 .unwrap();
6500 svc.deauthorize_connection(&make_request(
6501 "DeauthorizeConnection",
6502 json!({"Name": "my-conn"}),
6503 ))
6504 .unwrap();
6505 }
6506
6507 #[test]
6510 fn list_event_buses_returns_default_and_custom() {
6511 let svc = make_service();
6512 svc.create_event_bus(&make_request(
6513 "CreateEventBus",
6514 json!({"Name": "custom-bus"}),
6515 ))
6516 .unwrap();
6517
6518 let resp = svc
6519 .list_event_buses(&make_request("ListEventBuses", json!({})))
6520 .unwrap();
6521 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6522 let names: Vec<&str> = body["EventBuses"]
6523 .as_array()
6524 .unwrap()
6525 .iter()
6526 .map(|v| v["Name"].as_str().unwrap())
6527 .collect();
6528 assert!(names.contains(&"default"));
6529 assert!(names.contains(&"custom-bus"));
6530 }
6531
6532 #[test]
6535 fn tag_list_untag_rule_resource() {
6536 let svc = make_service();
6537 svc.put_rule(&make_request(
6538 "PutRule",
6539 json!({"Name": "tagged-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6540 ))
6541 .unwrap();
6542
6543 let arn = "arn:aws:events:us-east-1:123456789012:rule/tagged-rule";
6544
6545 svc.tag_resource(&make_request(
6546 "TagResource",
6547 json!({"ResourceARN": arn, "Tags": [{"Key": "env", "Value": "prod"}]}),
6548 ))
6549 .unwrap();
6550
6551 let resp = svc
6552 .list_tags_for_resource(&make_request(
6553 "ListTagsForResource",
6554 json!({"ResourceARN": arn}),
6555 ))
6556 .unwrap();
6557 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6558 assert_eq!(body["Tags"].as_array().unwrap().len(), 1);
6559
6560 svc.untag_resource(&make_request(
6561 "UntagResource",
6562 json!({"ResourceARN": arn, "TagKeys": ["env"]}),
6563 ))
6564 .unwrap();
6565 }
6566
6567 #[test]
6570 fn put_permission_with_policy_json() {
6571 let svc = make_service();
6572 let policy = r#"{"Version":"2012-10-17","Statement":[]}"#;
6573 let req = make_request("PutPermission", json!({"Policy": policy}));
6574 svc.put_permission(&req).unwrap();
6575 }
6576
6577 #[test]
6578 fn put_permission_invalid_action_errors() {
6579 let svc = make_service();
6580 let req = make_request(
6581 "PutPermission",
6582 json!({
6583 "Action": "events:NotARealAction",
6584 "Principal": "123456789012",
6585 "StatementId": "s1"
6586 }),
6587 );
6588 assert!(svc.put_permission(&req).is_err());
6589 }
6590
6591 #[test]
6592 fn put_permission_unknown_bus_errors() {
6593 let svc = make_service();
6594 let req = make_request(
6595 "PutPermission",
6596 json!({
6597 "EventBusName": "missing",
6598 "Action": "events:PutEvents",
6599 "Principal": "123456789012",
6600 "StatementId": "s1"
6601 }),
6602 );
6603 assert!(svc.put_permission(&req).is_err());
6604 }
6605
6606 #[test]
6607 fn put_permission_add_and_remove_statement() {
6608 let svc = make_service();
6609 let req = make_request(
6610 "PutPermission",
6611 json!({
6612 "Action": "events:PutEvents",
6613 "Principal": "123456789012",
6614 "StatementId": "s1"
6615 }),
6616 );
6617 svc.put_permission(&req).unwrap();
6618
6619 let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6620 svc.remove_permission(&req).unwrap();
6621 }
6622
6623 #[test]
6624 fn remove_permission_remove_all_flag() {
6625 let svc = make_service();
6626 let req = make_request(
6627 "PutPermission",
6628 json!({
6629 "Action": "events:PutEvents",
6630 "Principal": "123456789012",
6631 "StatementId": "s1"
6632 }),
6633 );
6634 svc.put_permission(&req).unwrap();
6635
6636 let req = make_request("RemovePermission", json!({"RemoveAllPermissions": true}));
6637 svc.remove_permission(&req).unwrap();
6638 }
6639
6640 #[test]
6641 fn remove_permission_unknown_bus_errors() {
6642 let svc = make_service();
6643 let req = make_request(
6644 "RemovePermission",
6645 json!({"EventBusName": "missing", "StatementId": "s1"}),
6646 );
6647 assert!(svc.remove_permission(&req).is_err());
6648 }
6649
6650 #[test]
6651 fn remove_permission_no_policy_errors() {
6652 let svc = make_service();
6653 let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6654 assert!(svc.remove_permission(&req).is_err());
6655 }
6656
6657 #[test]
6658 fn remove_permission_unknown_statement_errors() {
6659 let svc = make_service();
6660 svc.put_permission(&make_request(
6661 "PutPermission",
6662 json!({
6663 "Action": "events:PutEvents",
6664 "Principal": "123456789012",
6665 "StatementId": "s1"
6666 }),
6667 ))
6668 .unwrap();
6669
6670 let req = make_request("RemovePermission", json!({"StatementId": "ghost"}));
6671 assert!(svc.remove_permission(&req).is_err());
6672 }
6673
6674 #[test]
6677 fn put_rule_missing_name_errors() {
6678 let svc = make_service();
6679 let req = make_request("PutRule", json!({}));
6680 assert!(svc.put_rule(&req).is_err());
6681 }
6682
6683 #[test]
6684 fn put_rule_name_too_long_errors() {
6685 let svc = make_service();
6686 let name = "x".repeat(65);
6687 let req = make_request("PutRule", json!({"Name": name}));
6688 assert!(svc.put_rule(&req).is_err());
6689 }
6690
6691 #[test]
6692 fn put_rule_invalid_state_errors() {
6693 let svc = make_service();
6694 let req = make_request("PutRule", json!({"Name": "r1", "State": "BOGUS"}));
6695 assert!(svc.put_rule(&req).is_err());
6696 }
6697
6698 #[test]
6701 fn create_connection_api_key_auth() {
6702 let svc = make_service();
6703 let req = make_request(
6704 "CreateConnection",
6705 json!({
6706 "Name": "conn-apikey",
6707 "AuthorizationType": "API_KEY",
6708 "AuthParameters": {
6709 "ApiKeyAuthParameters": {
6710 "ApiKeyName": "X-Api-Key",
6711 "ApiKeyValue": "secret"
6712 }
6713 }
6714 }),
6715 );
6716 svc.create_connection(&req).unwrap();
6717 }
6718
6719 #[test]
6720 fn create_connection_basic_auth() {
6721 let svc = make_service();
6722 let req = make_request(
6723 "CreateConnection",
6724 json!({
6725 "Name": "conn-basic",
6726 "AuthorizationType": "BASIC",
6727 "AuthParameters": {
6728 "BasicAuthParameters": {
6729 "Username": "u",
6730 "Password": "p"
6731 }
6732 }
6733 }),
6734 );
6735 svc.create_connection(&req).unwrap();
6736 }
6737
6738 #[test]
6739 fn create_connection_missing_name_errors() {
6740 let svc = make_service();
6741 let req = make_request("CreateConnection", json!({"AuthorizationType": "API_KEY"}));
6742 assert!(svc.create_connection(&req).is_err());
6743 }
6744
6745 #[test]
6746 fn create_connection_missing_auth_type_errors() {
6747 let svc = make_service();
6748 let req = make_request("CreateConnection", json!({"Name": "c-noauth"}));
6749 assert!(svc.create_connection(&req).is_err());
6750 }
6751
6752 #[test]
6753 fn delete_connection_not_found() {
6754 let svc = make_service();
6755 let req = make_request("DeleteConnection", json!({"Name": "ghost"}));
6756 assert!(svc.delete_connection(&req).is_err());
6757 }
6758
6759 #[test]
6762 fn create_api_destination_missing_name_errors() {
6763 let svc = make_service();
6764 let req = make_request(
6765 "CreateApiDestination",
6766 json!({
6767 "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/c",
6768 "InvocationEndpoint": "https://example.com",
6769 "HttpMethod": "POST"
6770 }),
6771 );
6772 assert!(svc.create_api_destination(&req).is_err());
6773 }
6774
6775 #[test]
6776 fn create_api_destination_invalid_method_errors() {
6777 let svc = make_service();
6778 create_connection(&svc, "conn-m");
6779 let guard = svc.state.read();
6780 let st = guard.default_ref();
6781 let conn_arn = st
6782 .connections
6783 .get("conn-m")
6784 .map(|c| c.arn.clone())
6785 .unwrap_or_default();
6786 drop(guard);
6787
6788 let req = make_request(
6789 "CreateApiDestination",
6790 json!({
6791 "Name": "d1",
6792 "ConnectionArn": conn_arn,
6793 "InvocationEndpoint": "https://example.com",
6794 "HttpMethod": "FLY"
6795 }),
6796 );
6797 assert!(svc.create_api_destination(&req).is_err());
6798 }
6799
6800 #[test]
6801 fn delete_api_destination_not_found() {
6802 let svc = make_service();
6803 let req = make_request("DeleteApiDestination", json!({"Name": "ghost"}));
6804 assert!(svc.delete_api_destination(&req).is_err());
6805 }
6806
6807 #[test]
6810 fn create_archive_missing_name_errors() {
6811 let svc = make_service();
6812 let req = make_request(
6813 "CreateArchive",
6814 json!({"EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"}),
6815 );
6816 assert!(svc.create_archive(&req).is_err());
6817 }
6818
6819 #[test]
6820 fn create_archive_missing_source_arn_errors() {
6821 let svc = make_service();
6822 let req = make_request("CreateArchive", json!({"ArchiveName": "arc1"}));
6823 assert!(svc.create_archive(&req).is_err());
6824 }
6825
6826 #[test]
6827 fn delete_archive_missing_errors() {
6828 let svc = make_service();
6829 let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6830 assert!(svc.delete_archive(&req).is_err());
6831 }
6832
6833 #[test]
6836 fn cancel_replay_not_found() {
6837 let svc = make_service();
6838 let req = make_request("CancelReplay", json!({"ReplayName": "ghost"}));
6839 assert!(svc.cancel_replay(&req).is_err());
6840 }
6841
6842 #[test]
6845 fn put_events_empty_entries_errors() {
6846 let svc = make_service();
6847 let req = make_request("PutEvents", json!({"Entries": []}));
6848 assert!(svc.put_events(&req).is_err());
6849 }
6850
6851 #[test]
6852 fn put_events_success_count() {
6853 let svc = make_service();
6854 let req = make_request(
6855 "PutEvents",
6856 json!({
6857 "Entries": [
6858 {"Source": "my.app", "DetailType": "Test", "Detail": "{}"}
6859 ]
6860 }),
6861 );
6862 let resp = svc.put_events(&req).unwrap();
6863 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6864 assert_eq!(body["FailedEntryCount"], 0);
6865 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
6866 }
6867
6868 #[test]
6871 fn list_tags_for_resource_unknown_errors() {
6872 let svc = make_service();
6873 let req = make_request(
6874 "ListTagsForResource",
6875 json!({
6876 "ResourceARN": "arn:aws:events:us-east-1:123456789012:rule/ghost"
6877 }),
6878 );
6879 assert!(svc.list_tags_for_resource(&req).is_err());
6880 }
6881
6882 #[test]
6885 fn describe_rule_custom_bus() {
6886 let svc = make_service();
6887 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "cb"})))
6888 .unwrap();
6889
6890 svc.put_rule(&make_request(
6891 "PutRule",
6892 json!({
6893 "Name": "r-cb",
6894 "EventPattern": "{\"source\":[\"aws.s3\"]}",
6895 "EventBusName": "cb"
6896 }),
6897 ))
6898 .unwrap();
6899
6900 let resp = svc
6901 .describe_rule(&make_request(
6902 "DescribeRule",
6903 json!({"Name": "r-cb", "EventBusName": "cb"}),
6904 ))
6905 .unwrap();
6906 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6907 assert_eq!(body["Name"], "r-cb");
6908 }
6909
6910 #[test]
6913 fn disable_rule_on_custom_bus() {
6914 let svc = make_service();
6915 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "dcb"})))
6916 .unwrap();
6917 svc.put_rule(&make_request(
6918 "PutRule",
6919 json!({
6920 "Name": "r-d",
6921 "EventPattern": "{\"source\":[\"s\"]}",
6922 "EventBusName": "dcb"
6923 }),
6924 ))
6925 .unwrap();
6926 svc.disable_rule(&make_request(
6927 "DisableRule",
6928 json!({"Name": "r-d", "EventBusName": "dcb"}),
6929 ))
6930 .unwrap();
6931 }
6932
6933 #[test]
6936 fn describe_event_bus_custom() {
6937 let svc = make_service();
6938 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "deb"})))
6939 .unwrap();
6940 let resp = svc
6941 .describe_event_bus(&make_request("DescribeEventBus", json!({"Name": "deb"})))
6942 .unwrap();
6943 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6944 assert_eq!(body["Name"], "deb");
6945 }
6946
6947 #[test]
6948 fn list_event_buses_with_name_prefix() {
6949 let svc = make_service();
6950 for name in &["dev-x", "dev-y", "prod-z"] {
6951 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": name})))
6952 .unwrap();
6953 }
6954 let resp = svc
6955 .list_event_buses(&make_request(
6956 "ListEventBuses",
6957 json!({"NamePrefix": "dev-"}),
6958 ))
6959 .unwrap();
6960 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6961 assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
6962 }
6963
6964 #[test]
6965 fn list_rules_on_custom_bus() {
6966 let svc = make_service();
6967 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "lrcb"})))
6968 .unwrap();
6969 svc.put_rule(&make_request(
6970 "PutRule",
6971 json!({
6972 "Name": "r1",
6973 "EventPattern": "{\"source\":[\"s\"]}",
6974 "EventBusName": "lrcb"
6975 }),
6976 ))
6977 .unwrap();
6978
6979 let resp = svc
6980 .list_rules(&make_request("ListRules", json!({"EventBusName": "lrcb"})))
6981 .unwrap();
6982 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6983 assert_eq!(body["Rules"].as_array().unwrap().len(), 1);
6984 }
6985
6986 #[test]
6989 fn put_targets_on_custom_bus() {
6990 let svc = make_service();
6991 svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "ptcb"})))
6992 .unwrap();
6993 svc.put_rule(&make_request(
6994 "PutRule",
6995 json!({
6996 "Name": "rt",
6997 "EventPattern": "{\"source\":[\"s\"]}",
6998 "EventBusName": "ptcb"
6999 }),
7000 ))
7001 .unwrap();
7002
7003 svc.put_targets(&make_request(
7004 "PutTargets",
7005 json!({
7006 "Rule": "rt",
7007 "EventBusName": "ptcb",
7008 "Targets": [{"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"}]
7009 }),
7010 ))
7011 .unwrap();
7012 }
7013
7014 #[test]
7017 fn remove_targets_unknown_ids_returns_failed() {
7018 let svc = make_service();
7019 svc.put_rule(&make_request(
7020 "PutRule",
7021 json!({"Name": "rmt", "EventPattern": "{\"source\":[\"s\"]}"}),
7022 ))
7023 .unwrap();
7024
7025 let resp = svc
7026 .remove_targets(&make_request(
7027 "RemoveTargets",
7028 json!({"Rule": "rmt", "Ids": ["ghost1", "ghost2"]}),
7029 ))
7030 .unwrap();
7031 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7032 assert!(body.is_object());
7034 }
7035
7036 #[test]
7037 fn describe_event_source_unknown_errors() {
7038 let svc = make_service();
7039 let req = make_request("DescribeEventSource", json!({"Name": "ghost"}));
7040 assert!(svc.describe_event_source(&req).is_err());
7041 }
7042
7043 #[test]
7044 fn describe_partner_event_source_unknown_errors() {
7045 let svc = make_service();
7046 let req = make_request("DescribePartnerEventSource", json!({"Name": "ghost"}));
7047 assert!(svc.describe_partner_event_source(&req).is_err());
7048 }
7049
7050 #[test]
7051 fn list_partner_event_sources_empty_ok() {
7052 let svc = make_service();
7053 let req = make_request(
7054 "ListPartnerEventSources",
7055 json!({"NamePrefix": "aws.partner"}),
7056 );
7057 let resp = svc.list_partner_event_sources(&req).unwrap();
7058 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7059 assert!(body["PartnerEventSources"].is_array());
7060 }
7061
7062 #[test]
7063 fn list_event_sources_empty_ok() {
7064 let svc = make_service();
7065 let req = make_request("ListEventSources", json!({}));
7066 let resp = svc.list_event_sources(&req).unwrap();
7067 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7068 assert!(body["EventSources"].is_array());
7069 }
7070
7071 #[test]
7072 fn update_connection_unknown_errors() {
7073 let svc = make_service();
7074 let req = make_request(
7075 "UpdateConnection",
7076 json!({"Name": "ghost", "AuthorizationType": "API_KEY"}),
7077 );
7078 assert!(svc.update_connection(&req).is_err());
7079 }
7080
7081 #[test]
7082 fn describe_api_destination_unknown_errors() {
7083 let svc = make_service();
7084 let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
7085 assert!(svc.describe_api_destination(&req).is_err());
7086 }
7087
7088 #[test]
7089 fn update_api_destination_unknown_errors() {
7090 let svc = make_service();
7091 let req = make_request("UpdateApiDestination", json!({"Name": "ghost"}));
7092 assert!(svc.update_api_destination(&req).is_err());
7093 }
7094
7095 #[test]
7096 fn update_archive_unknown_errors() {
7097 let svc = make_service();
7098 let req = make_request("UpdateArchive", json!({"ArchiveName": "ghost"}));
7099 assert!(svc.update_archive(&req).is_err());
7100 }
7101
7102 #[test]
7103 fn describe_archive_unknown_errors_b() {
7104 let svc = make_service();
7105 let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
7106 assert!(svc.describe_archive(&req).is_err());
7107 }
7108
7109 #[test]
7110 fn list_archives_empty_ok() {
7111 let svc = make_service();
7112 let req = make_request("ListArchives", json!({}));
7113 let resp = svc.list_archives(&req).unwrap();
7114 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7115 assert!(body["Archives"].is_array());
7116 }
7117
7118 #[test]
7119 fn list_replays_empty_ok() {
7120 let svc = make_service();
7121 let req = make_request("ListReplays", json!({}));
7122 let resp = svc.list_replays(&req).unwrap();
7123 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7124 assert!(body["Replays"].is_array());
7125 }
7126
7127 #[test]
7128 fn describe_endpoint_unknown_errors() {
7129 let svc = make_service();
7130 let req = make_request("DescribeEndpoint", json!({"Name": "ghost"}));
7131 assert!(svc.describe_endpoint(&req).is_err());
7132 }
7133
7134 #[test]
7135 fn delete_endpoint_unknown_errors() {
7136 let svc = make_service();
7137 let req = make_request("DeleteEndpoint", json!({"Name": "ghost"}));
7138 assert!(svc.delete_endpoint(&req).is_err());
7139 }
7140
7141 #[test]
7142 fn list_endpoints_empty_ok() {
7143 let svc = make_service();
7144 let req = make_request("ListEndpoints", json!({}));
7145 let resp = svc.list_endpoints(&req).unwrap();
7146 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7147 assert!(body["Endpoints"].is_array());
7148 }
7149}