1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::state::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
33 if source.is_empty() {
34 return Err(json!({
35 "ErrorCode": "InvalidArgument",
36 "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
37 }));
38 }
39 if detail_type.is_empty() {
40 return Err(json!({
41 "ErrorCode": "InvalidArgument",
42 "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
43 }));
44 }
45 if detail.is_empty() {
46 return Err(json!({
47 "ErrorCode": "InvalidArgument",
48 "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
49 }));
50 }
51 if serde_json::from_str::<Value>(detail).is_err() {
52 return Err(json!({
53 "ErrorCode": "MalformedDetail",
54 "ErrorMessage": "Detail is malformed.",
55 }));
56 }
57 Ok(())
58}
59
60fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
65 if let Some(s) = raw.as_str() {
66 return DateTime::parse_from_rfc3339(s)
67 .map(|dt| dt.with_timezone(&Utc))
68 .unwrap_or_else(|_| Utc::now());
69 }
70 if let Some(ts) = raw.as_f64() {
71 return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
72 .unwrap_or_else(Utc::now);
73 }
74 if let Some(ts) = raw.as_i64() {
75 return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
76 }
77 Utc::now()
78}
79
/// Returns `true` when `action` is one of the operations listed below.
///
/// `handle` uses this to decide whether a successful request should be
/// followed by a snapshot save. Matching is exact and case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];
    MUTATING_ACTIONS.contains(&action)
}
120
/// Request handler for the `events` (EventBridge) service.
pub struct EventBridgeService {
    /// Shared EventBridge state that the action handlers read and write.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction.
    delivery: Arc<DeliveryBus>,
    /// Lambda state; `None` until `with_lambda` is called.
    lambda_state: Option<SharedLambdaState>,
    /// Logs state; `None` until `with_logs` is called.
    logs_state: Option<SharedLogsState>,
    /// Container runtime; `None` until `with_runtime` is called.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Snapshot destination; when `None`, `save_snapshot` does nothing.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held by `save_snapshot` for the whole capture-and-write sequence.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
130
131impl EventBridgeService {
132 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
133 Self {
134 state,
135 delivery,
136 lambda_state: None,
137 logs_state: None,
138 container_runtime: None,
139 snapshot_store: None,
140 snapshot_lock: Arc::new(AsyncMutex::new(())),
141 }
142 }
143
144 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
145 self.lambda_state = Some(lambda_state);
146 self
147 }
148
149 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
150 self.logs_state = Some(logs_state);
151 self
152 }
153
154 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
155 self.container_runtime = Some(runtime);
156 self
157 }
158
159 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
160 self.snapshot_store = Some(store);
161 self
162 }
163
164 async fn save_snapshot(&self) {
168 let Some(store) = self.snapshot_store.clone() else {
169 return;
170 };
171 let _guard = self.snapshot_lock.lock().await;
172 let snapshot = EventBridgeSnapshot {
173 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
174 accounts: Some(self.state.read().clone()),
175 state: None,
176 };
177 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
178 let bytes = serde_json::to_vec(&snapshot)
179 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
180 store.save(&bytes)
181 })
182 .await;
183 match join {
184 Ok(Ok(())) => {}
185 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
186 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
187 }
188 }
189}
190
191#[async_trait]
192impl AwsService for EventBridgeService {
193 fn service_name(&self) -> &str {
194 "events"
195 }
196
197 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
198 let mutates = is_mutating_action(req.action.as_str());
199 let result = match req.action.as_str() {
200 "CreateEventBus" => self.create_event_bus(&req),
201 "DeleteEventBus" => self.delete_event_bus(&req),
202 "ListEventBuses" => self.list_event_buses(&req),
203 "DescribeEventBus" => self.describe_event_bus(&req),
204 "PutRule" => self.put_rule(&req),
205 "DeleteRule" => self.delete_rule(&req),
206 "ListRules" => self.list_rules(&req),
207 "DescribeRule" => self.describe_rule(&req),
208 "EnableRule" => self.enable_rule(&req),
209 "DisableRule" => self.disable_rule(&req),
210 "PutTargets" => self.put_targets(&req),
211 "RemoveTargets" => self.remove_targets(&req),
212 "ListTargetsByRule" => self.list_targets_by_rule(&req),
213 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
214 "PutEvents" => self.put_events(&req),
215 "PutPermission" => self.put_permission(&req),
216 "RemovePermission" => self.remove_permission(&req),
217 "TagResource" => self.tag_resource(&req),
218 "UntagResource" => self.untag_resource(&req),
219 "ListTagsForResource" => self.list_tags_for_resource(&req),
220 "CreateArchive" => self.create_archive(&req),
221 "DescribeArchive" => self.describe_archive(&req),
222 "ListArchives" => self.list_archives(&req),
223 "UpdateArchive" => self.update_archive(&req),
224 "DeleteArchive" => self.delete_archive(&req),
225 "CreateConnection" => self.create_connection(&req),
226 "DescribeConnection" => self.describe_connection(&req),
227 "ListConnections" => self.list_connections(&req),
228 "UpdateConnection" => self.update_connection(&req),
229 "DeleteConnection" => self.delete_connection(&req),
230 "CreateApiDestination" => self.create_api_destination(&req),
231 "DescribeApiDestination" => self.describe_api_destination(&req),
232 "ListApiDestinations" => self.list_api_destinations(&req),
233 "UpdateApiDestination" => self.update_api_destination(&req),
234 "DeleteApiDestination" => self.delete_api_destination(&req),
235 "StartReplay" => self.start_replay(&req),
236 "DescribeReplay" => self.describe_replay(&req),
237 "ListReplays" => self.list_replays(&req),
238 "CancelReplay" => self.cancel_replay(&req),
239 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
240 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
241 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
242 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
243 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
244 "ActivateEventSource" => self.activate_event_source(&req),
245 "DeactivateEventSource" => self.deactivate_event_source(&req),
246 "DescribeEventSource" => self.describe_event_source(&req),
247 "ListEventSources" => self.list_event_sources(&req),
248 "PutPartnerEvents" => self.put_partner_events(&req),
249 "TestEventPattern" => self.test_event_pattern(&req),
250 "UpdateEventBus" => self.update_event_bus(&req),
251 "CreateEndpoint" => self.create_endpoint(&req),
252 "DeleteEndpoint" => self.delete_endpoint(&req),
253 "DescribeEndpoint" => self.describe_endpoint(&req),
254 "ListEndpoints" => self.list_endpoints(&req),
255 "UpdateEndpoint" => self.update_endpoint(&req),
256 "DeauthorizeConnection" => self.deauthorize_connection(&req),
257 _ => Err(AwsServiceError::action_not_implemented(
258 "events",
259 &req.action,
260 )),
261 };
262 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
263 self.save_snapshot().await;
264 }
265 result
266 }
267
268 fn supported_actions(&self) -> &[&str] {
269 &[
270 "CreateEventBus",
271 "DeleteEventBus",
272 "ListEventBuses",
273 "DescribeEventBus",
274 "PutRule",
275 "DeleteRule",
276 "ListRules",
277 "DescribeRule",
278 "EnableRule",
279 "DisableRule",
280 "PutTargets",
281 "RemoveTargets",
282 "ListTargetsByRule",
283 "ListRuleNamesByTarget",
284 "PutEvents",
285 "PutPermission",
286 "RemovePermission",
287 "TagResource",
288 "UntagResource",
289 "ListTagsForResource",
290 "CreateArchive",
291 "DescribeArchive",
292 "ListArchives",
293 "UpdateArchive",
294 "DeleteArchive",
295 "CreateConnection",
296 "DescribeConnection",
297 "ListConnections",
298 "UpdateConnection",
299 "DeleteConnection",
300 "CreateApiDestination",
301 "DescribeApiDestination",
302 "ListApiDestinations",
303 "UpdateApiDestination",
304 "DeleteApiDestination",
305 "StartReplay",
306 "DescribeReplay",
307 "ListReplays",
308 "CancelReplay",
309 "CreatePartnerEventSource",
310 "DeletePartnerEventSource",
311 "DescribePartnerEventSource",
312 "ListPartnerEventSources",
313 "ListPartnerEventSourceAccounts",
314 "ActivateEventSource",
315 "DeactivateEventSource",
316 "DescribeEventSource",
317 "ListEventSources",
318 "PutPartnerEvents",
319 "TestEventPattern",
320 "UpdateEventBus",
321 "CreateEndpoint",
322 "DeleteEndpoint",
323 "DescribeEndpoint",
324 "ListEndpoints",
325 "UpdateEndpoint",
326 "DeauthorizeConnection",
327 ]
328 }
329}
330
331fn parse_tags(body: &Value) -> HashMap<String, String> {
332 let mut tags = HashMap::new();
333 if let Some(arr) = body["Tags"].as_array() {
334 for tag in arr {
335 if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
336 tags.insert(key.to_string(), val.to_string());
337 }
338 }
339 }
340 tags
341}
342
343fn parse_target(target: &Value) -> EventTarget {
344 EventTarget {
345 id: target["Id"].as_str().unwrap_or("").to_string(),
346 arn: target["Arn"].as_str().unwrap_or("").to_string(),
347 input: target["Input"].as_str().map(|s| s.to_string()),
348 input_path: target["InputPath"].as_str().map(|s| s.to_string()),
349 input_transformer: target.get("InputTransformer").cloned(),
350 sqs_parameters: target.get("SqsParameters").cloned(),
351 }
352}
353
354fn target_to_json(t: &EventTarget) -> Value {
355 let mut obj = json!({ "Id": t.id, "Arn": t.arn });
356 if let Some(ref input) = t.input {
357 obj["Input"] = json!(input);
358 }
359 if let Some(ref input_path) = t.input_path {
360 obj["InputPath"] = json!(input_path);
361 }
362 if let Some(ref it) = t.input_transformer {
363 obj["InputTransformer"] = it.clone();
364 }
365 if let Some(ref sp) = t.sqs_parameters {
366 obj["SqsParameters"] = sp.clone();
367 }
368 obj
369}
370
371impl EventBridgeService {
373 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
374 let body = req.json_body();
375 validate_required("Name", &body["Name"])?;
376 let name = body["Name"]
377 .as_str()
378 .ok_or_else(|| missing("Name"))?
379 .to_string();
380 validate_string_length("name", &name, 1, 256)?;
381 validate_optional_string_length(
382 "eventSourceName",
383 body["EventSourceName"].as_str(),
384 1,
385 256,
386 )?;
387 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
388 validate_optional_string_length(
389 "kmsKeyIdentifier",
390 body["KmsKeyIdentifier"].as_str(),
391 0,
392 2048,
393 )?;
394
395 if name.contains('/') && !name.starts_with("aws.partner/") {
397 return Err(AwsServiceError::aws_error(
398 StatusCode::BAD_REQUEST,
399 "ValidationException",
400 "Event bus name must not contain '/'.",
401 ));
402 }
403
404 if name.starts_with("aws.partner/") {
406 let event_source = body["EventSourceName"].as_str().unwrap_or("");
407 let accounts_r = self.state.read();
408 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
409 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
410 let has_source = state_r.partner_event_sources.contains_key(event_source);
411 drop(accounts_r);
412 if !has_source {
413 return Err(AwsServiceError::aws_error(
414 StatusCode::BAD_REQUEST,
415 "ResourceNotFoundException",
416 format!("Event source {event_source} does not exist."),
417 ));
418 }
419 }
420
421 let mut accounts = self.state.write();
422 let state = accounts.get_or_create(&req.account_id);
423
424 if state.buses.contains_key(&name) {
425 return Err(AwsServiceError::aws_error(
426 StatusCode::BAD_REQUEST,
427 "ResourceAlreadyExistsException",
428 format!("Event bus {name} already exists."),
429 ));
430 }
431
432 let arn = format!(
433 "arn:aws:events:{}:{}:event-bus/{}",
434 req.region, state.account_id, name
435 );
436 let now = Utc::now();
437 let description = body["Description"].as_str().map(|s| s.to_string());
438 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
439 let dead_letter_config = body.get("DeadLetterConfig").cloned();
440
441 let tags = parse_tags(&body);
442
443 let bus = EventBus {
444 name: name.clone(),
445 arn: arn.clone(),
446 tags,
447 policy: None,
448 description,
449 kms_key_identifier,
450 dead_letter_config,
451 creation_time: now,
452 last_modified_time: now,
453 };
454 state.buses.insert(name, bus);
455
456 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
457 }
458
459 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
460 let body = req.json_body();
461 validate_required("Name", &body["Name"])?;
462 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
463 validate_string_length("name", name, 1, 256)?;
464
465 if name == "default" {
466 return Err(AwsServiceError::aws_error(
467 StatusCode::BAD_REQUEST,
468 "ValidationException",
469 format!("Cannot delete event bus {name}."),
470 ));
471 }
472
473 let mut accounts = self.state.write();
474 let state = accounts.get_or_create(&req.account_id);
475 state.buses.remove(name);
476 state.rules.retain(|k, _| k.0 != name);
477
478 Ok(AwsResponse::ok_json(json!({})))
479 }
480
481 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482 let body = req.json_body();
483 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
484 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
485 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
486 let name_prefix = body["NamePrefix"].as_str();
487 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
488 if let Some(t) = body["NextToken"].as_str() {
489 t.parse::<usize>().map_err(|_| {
490 AwsServiceError::aws_error(
491 StatusCode::BAD_REQUEST,
492 "InvalidNextTokenException",
493 format!("Invalid NextToken value: '{t}'"),
494 )
495 })?;
496 }
497
498 let accounts = self.state.read();
499 let empty = EventBridgeState::new(&req.account_id, &req.region);
500 let state = accounts.get(&req.account_id).unwrap_or(&empty);
501 let filtered: Vec<&_> = state
502 .buses
503 .values()
504 .filter(|b| match name_prefix {
505 Some(prefix) => b.name.starts_with(prefix),
506 None => true,
507 })
508 .collect();
509
510 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
511 let buses: Vec<Value> = page
512 .iter()
513 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
514 .collect();
515 let mut resp = json!({ "EventBuses": buses });
516 if let Some(token) = next_token {
517 resp["NextToken"] = json!(token);
518 }
519
520 Ok(AwsResponse::ok_json(resp))
521 }
522
523 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
524 let body = req.json_body();
525 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
526 let name = body["Name"].as_str().unwrap_or("default");
527
528 let accounts = self.state.read();
529 let empty = EventBridgeState::new(&req.account_id, &req.region);
530 let state = accounts.get(&req.account_id).unwrap_or(&empty);
531 let bus = state.buses.get(name).ok_or_else(|| {
532 AwsServiceError::aws_error(
533 StatusCode::BAD_REQUEST,
534 "ResourceNotFoundException",
535 format!("Event bus {name} does not exist."),
536 )
537 })?;
538
539 let mut resp = json!({
540 "Name": bus.name,
541 "Arn": bus.arn,
542 "CreationTime": bus.creation_time.timestamp() as f64,
543 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
544 });
545
546 if let Some(ref policy) = bus.policy {
547 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
548 }
549 if let Some(ref desc) = bus.description {
550 resp["Description"] = json!(desc);
551 }
552 if let Some(ref kms) = bus.kms_key_identifier {
553 resp["KmsKeyIdentifier"] = json!(kms);
554 }
555 if let Some(ref dlc) = bus.dead_letter_config {
556 resp["DeadLetterConfig"] = dlc.clone();
557 }
558
559 Ok(AwsResponse::ok_json(resp))
560 }
561
562 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
565 let body = req.json_body();
566 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
567 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
568 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
569 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
570 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
571
572 let mut accounts = self.state.write();
573 let state = accounts.get_or_create(&req.account_id);
574
575 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
576 AwsServiceError::aws_error(
577 StatusCode::BAD_REQUEST,
578 "ResourceNotFoundException",
579 format!("Event bus {event_bus_name} does not exist."),
580 )
581 })?;
582
583 if let Some(policy_str) = body["Policy"].as_str() {
585 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
586 bus.policy = Some(policy);
587 return Ok(AwsResponse::ok_json(json!({})));
588 }
589 }
590
591 let action = body["Action"].as_str().unwrap_or("");
593 let principal = body["Principal"].as_str().unwrap_or("");
594 let statement_id = body["StatementId"].as_str().unwrap_or("");
595
596 if action != "events:PutEvents" {
598 return Err(AwsServiceError::aws_error(
599 StatusCode::BAD_REQUEST,
600 "ValidationException",
601 "Provided value in parameter 'action' is not supported.",
602 ));
603 }
604
605 let statement = json!({
606 "Sid": statement_id,
607 "Effect": "Allow",
608 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
609 "Action": action,
610 "Resource": bus.arn,
611 });
612
613 let policy = bus.policy.get_or_insert_with(|| {
614 json!({
615 "Version": "2012-10-17",
616 "Statement": [],
617 })
618 });
619
620 if let Some(stmts) = policy["Statement"].as_array_mut() {
621 stmts.push(statement);
622 }
623
624 Ok(AwsResponse::ok_json(json!({})))
625 }
626
627 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
628 let body = req.json_body();
629 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
630 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
631 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
632 let statement_id = body["StatementId"].as_str().unwrap_or("");
633 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
634
635 let mut accounts = self.state.write();
636 let state = accounts.get_or_create(&req.account_id);
637
638 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
639 AwsServiceError::aws_error(
640 StatusCode::BAD_REQUEST,
641 "ResourceNotFoundException",
642 format!("Event bus {event_bus_name} does not exist."),
643 )
644 })?;
645
646 if remove_all {
647 bus.policy = None;
648 return Ok(AwsResponse::ok_json(json!({})));
649 }
650
651 let policy = bus.policy.as_mut().ok_or_else(|| {
652 AwsServiceError::aws_error(
653 StatusCode::BAD_REQUEST,
654 "ResourceNotFoundException",
655 "EventBus does not have a policy.",
656 )
657 })?;
658
659 if let Some(stmts) = policy["Statement"].as_array_mut() {
660 let before = stmts.len();
661 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
662 if stmts.len() == before {
663 return Err(AwsServiceError::aws_error(
664 StatusCode::BAD_REQUEST,
665 "ResourceNotFoundException",
666 "Statement with the provided id does not exist.",
667 ));
668 }
669 if stmts.is_empty() {
670 bus.policy = None;
671 }
672 }
673
674 Ok(AwsResponse::ok_json(json!({})))
675 }
676
677 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
680 let body = req.json_body();
681 validate_required("Name", &body["Name"])?;
682 let name = body["Name"]
683 .as_str()
684 .ok_or_else(|| missing("Name"))?
685 .to_string();
686 validate_string_length("name", &name, 1, 64)?;
687 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
688 validate_optional_string_length(
689 "scheduleExpression",
690 body["ScheduleExpression"].as_str(),
691 0,
692 256,
693 )?;
694 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
695 validate_optional_enum(
696 "state",
697 body["State"].as_str(),
698 &[
699 "ENABLED",
700 "DISABLED",
701 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
702 ],
703 )?;
704 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
705 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
706
707 let raw_bus = body["EventBusName"]
708 .as_str()
709 .unwrap_or("default")
710 .to_string();
711
712 let mut accounts = self.state.write();
713 let state = accounts.get_or_create(&req.account_id);
714 let event_bus_name = state.resolve_bus_name(&raw_bus);
715
716 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
717 if s.is_empty() {
718 None
719 } else {
720 Some(s.to_string())
721 }
722 });
723 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
724 if s.is_empty() {
725 None
726 } else {
727 Some(s.to_string())
728 }
729 });
730 let description = body["Description"].as_str().map(|s| s.to_string());
731 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
732 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
733
734 if schedule_expression.is_some() && event_bus_name != "default" {
736 return Err(AwsServiceError::aws_error(
737 StatusCode::BAD_REQUEST,
738 "ValidationException",
739 "ScheduleExpression is supported only on the default event bus.",
740 ));
741 }
742
743 if !state.buses.contains_key(&event_bus_name) {
744 return Err(AwsServiceError::aws_error(
745 StatusCode::BAD_REQUEST,
746 "ResourceNotFoundException",
747 format!("Event bus {event_bus_name} does not exist."),
748 ));
749 }
750
751 let arn = if event_bus_name == "default" {
752 format!(
753 "arn:aws:events:{}:{}:rule/{}",
754 req.region, state.account_id, name
755 )
756 } else {
757 format!(
758 "arn:aws:events:{}:{}:rule/{}/{}",
759 req.region, state.account_id, event_bus_name, name
760 )
761 };
762
763 let key = (event_bus_name.clone(), name.clone());
764 let targets = state
765 .rules
766 .get(&key)
767 .map(|r| r.targets.clone())
768 .unwrap_or_default();
769
770 let tags = parse_tags(&body);
771
772 let rule = EventRule {
773 name: name.clone(),
774 arn: arn.clone(),
775 event_bus_name,
776 event_pattern,
777 schedule_expression,
778 state: rule_state,
779 description,
780 role_arn,
781 managed_by: None,
782 created_by: None,
783 targets,
784 tags,
785 last_fired: None,
786 };
787
788 state.rules.insert(key, rule);
789 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
790 }
791
792 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
793 let body = req.json_body();
794 validate_required("Name", &body["Name"])?;
795 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
796 validate_string_length("name", name, 1, 64)?;
797 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
798 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
799
800 let mut accounts = self.state.write();
801 let state = accounts.get_or_create(&req.account_id);
802 let bus_name = state.resolve_bus_name(event_bus_name);
803 let key = (bus_name, name.to_string());
804
805 if let Some(rule) = state.rules.get(&key) {
807 if !rule.targets.is_empty() {
808 return Err(AwsServiceError::aws_error(
809 StatusCode::BAD_REQUEST,
810 "ValidationException",
811 "Rule can't be deleted since it has targets.",
812 ));
813 }
814 }
815
816 state.rules.remove(&key);
817 Ok(AwsResponse::ok_json(json!({})))
818 }
819
820 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
821 let body = req.json_body();
822 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
823 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
824 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
825 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
826 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
827 let name_prefix = body["NamePrefix"].as_str();
828 let limit = body["Limit"].as_u64().map(|n| n as usize);
829 let next_token = body["NextToken"].as_str();
830
831 let accounts = self.state.read();
832 let empty = EventBridgeState::new(&req.account_id, &req.region);
833 let state = accounts.get(&req.account_id).unwrap_or(&empty);
834 let bus_name = state.resolve_bus_name(event_bus_name);
835
836 let mut rules: Vec<&EventRule> = state
837 .rules
838 .values()
839 .filter(|r| r.event_bus_name == bus_name)
840 .filter(|r| match name_prefix {
841 Some(prefix) => r.name.starts_with(prefix),
842 None => true,
843 })
844 .collect();
845 rules.sort_by(|a, b| a.name.cmp(&b.name));
846
847 let start = next_token
849 .and_then(|t| t.parse::<usize>().ok())
850 .unwrap_or(0)
851 .min(rules.len());
852 let rules_slice = &rules[start..];
853
854 let (page, new_next_token) = if let Some(lim) = limit {
855 if rules_slice.len() > lim {
856 (&rules_slice[..lim], Some((start + lim).to_string()))
857 } else {
858 (rules_slice, None)
859 }
860 } else {
861 (rules_slice, None)
862 };
863
864 let rules_json: Vec<Value> = page
865 .iter()
866 .map(|r| {
867 let mut obj = json!({
868 "Name": r.name,
869 "Arn": r.arn,
870 "EventBusName": r.event_bus_name,
871 "State": r.state,
872 });
873 if let Some(ref desc) = r.description {
874 obj["Description"] = json!(desc);
875 }
876 if let Some(ref ep) = r.event_pattern {
877 obj["EventPattern"] = json!(ep);
878 }
879 if let Some(ref se) = r.schedule_expression {
880 obj["ScheduleExpression"] = json!(se);
881 }
882 if let Some(ref mb) = r.managed_by {
883 obj["ManagedBy"] = json!(mb);
884 }
885 obj
886 })
887 .collect();
888
889 let mut resp = json!({ "Rules": rules_json });
890 if let Some(token) = new_next_token {
891 resp["NextToken"] = json!(token);
892 }
893
894 Ok(AwsResponse::ok_json(resp))
895 }
896
897 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898 let body = req.json_body();
899 validate_required("Name", &body["Name"])?;
900 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901 validate_string_length("name", name, 1, 64)?;
902 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905 let accounts = self.state.read();
906 let empty = EventBridgeState::new(&req.account_id, &req.region);
907 let state = accounts.get(&req.account_id).unwrap_or(&empty);
908 let bus_name = state.resolve_bus_name(event_bus_name);
909 let key = (bus_name.clone(), name.to_string());
910
911 let rule = state.rules.get(&key).ok_or_else(|| {
912 AwsServiceError::aws_error(
913 StatusCode::BAD_REQUEST,
914 "ResourceNotFoundException",
915 format!("Rule {name} does not exist."),
916 )
917 })?;
918
919 let mut resp = json!({
920 "Name": rule.name,
921 "Arn": rule.arn,
922 "EventBusName": rule.event_bus_name,
923 "State": rule.state,
924 });
925
926 if let Some(ref desc) = rule.description {
927 resp["Description"] = json!(desc);
928 }
929 if let Some(ref ep) = rule.event_pattern {
930 resp["EventPattern"] = json!(ep);
931 }
932 if let Some(ref se) = rule.schedule_expression {
933 resp["ScheduleExpression"] = json!(se);
934 }
935 if let Some(ref role) = rule.role_arn {
936 resp["RoleArn"] = json!(role);
937 }
938 if let Some(ref mb) = rule.managed_by {
939 resp["ManagedBy"] = json!(mb);
940 }
941 if let Some(ref cb) = rule.created_by {
942 resp["CreatedBy"] = json!(cb);
943 }
944 if rule.event_bus_name != "default" && rule.created_by.is_none() {
946 resp["CreatedBy"] = json!(state.account_id);
947 }
948
949 Ok(AwsResponse::ok_json(resp))
950 }
951
952 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
953 let body = req.json_body();
954 validate_required("Name", &body["Name"])?;
955 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
956 validate_string_length("name", name, 1, 64)?;
957 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
958 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
959
960 let mut accounts = self.state.write();
961 let state = accounts.get_or_create(&req.account_id);
962 let bus_name = state.resolve_bus_name(event_bus_name);
963 let key = (bus_name, name.to_string());
964
965 let rule = state.rules.get_mut(&key).ok_or_else(|| {
966 AwsServiceError::aws_error(
967 StatusCode::BAD_REQUEST,
968 "ResourceNotFoundException",
969 format!("Rule {name} does not exist."),
970 )
971 })?;
972
973 rule.state = "ENABLED".to_string();
974 Ok(AwsResponse::ok_json(json!({})))
975 }
976
977 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
978 let body = req.json_body();
979 validate_required("Name", &body["Name"])?;
980 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
981 validate_string_length("name", name, 1, 64)?;
982 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
983 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984
985 let mut accounts = self.state.write();
986 let state = accounts.get_or_create(&req.account_id);
987 let bus_name = state.resolve_bus_name(event_bus_name);
988 let key = (bus_name, name.to_string());
989
990 let rule = state.rules.get_mut(&key).ok_or_else(|| {
991 AwsServiceError::aws_error(
992 StatusCode::BAD_REQUEST,
993 "ResourceNotFoundException",
994 format!("Rule {name} does not exist."),
995 )
996 })?;
997
998 rule.state = "DISABLED".to_string();
999 Ok(AwsResponse::ok_json(json!({})))
1000 }
1001
1002 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1005 let body = req.json_body();
1006 validate_required("Rule", &body["Rule"])?;
1007 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1008 validate_string_length("rule", rule_name, 1, 64)?;
1009 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1010 validate_required("Targets", &body["Targets"])?;
1011 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1012 let targets = body["Targets"]
1013 .as_array()
1014 .ok_or_else(|| missing("Targets"))?;
1015
1016 for target in targets {
1018 let target_id = target["Id"].as_str().unwrap_or("");
1019 let target_arn = target["Arn"].as_str().unwrap_or("");
1020
1021 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1022 return Err(AwsServiceError::aws_error(
1023 StatusCode::BAD_REQUEST,
1024 "ValidationException",
1025 format!(
1026 "Parameter(s) SqsParameters must be specified for target: {target_id}."
1027 ),
1028 ));
1029 }
1030
1031 if !target_arn.starts_with("arn:") {
1033 return Err(AwsServiceError::aws_error(
1034 StatusCode::BAD_REQUEST,
1035 "ValidationException",
1036 format!(
1037 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1038 ),
1039 ));
1040 }
1041 }
1042
1043 let mut accounts = self.state.write();
1044 let state = accounts.get_or_create(&req.account_id);
1045 let bus_name = state.resolve_bus_name(event_bus_name);
1046 let key = (bus_name.clone(), rule_name.to_string());
1047
1048 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049 AwsServiceError::aws_error(
1050 StatusCode::BAD_REQUEST,
1051 "ResourceNotFoundException",
1052 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053 )
1054 })?;
1055
1056 for target in targets {
1057 let et = parse_target(target);
1058 rule.targets.retain(|t| t.id != et.id);
1060 rule.targets.push(et);
1061 }
1062
1063 Ok(AwsResponse::ok_json(json!({
1064 "FailedEntryCount": 0,
1065 "FailedEntries": [],
1066 })))
1067 }
1068
1069 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1070 let body = req.json_body();
1071 validate_required("Rule", &body["Rule"])?;
1072 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1073 validate_string_length("rule", rule_name, 1, 64)?;
1074 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1075 validate_required("Ids", &body["Ids"])?;
1076 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1077 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1078
1079 let target_ids: Vec<String> = ids
1080 .iter()
1081 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1082 .collect();
1083
1084 let mut accounts = self.state.write();
1085 let state = accounts.get_or_create(&req.account_id);
1086 let bus_name = state.resolve_bus_name(event_bus_name);
1087 let key = (bus_name.clone(), rule_name.to_string());
1088
1089 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1090 AwsServiceError::aws_error(
1091 StatusCode::BAD_REQUEST,
1092 "ResourceNotFoundException",
1093 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1094 )
1095 })?;
1096
1097 rule.targets.retain(|t| !target_ids.contains(&t.id));
1098
1099 Ok(AwsResponse::ok_json(json!({
1100 "FailedEntryCount": 0,
1101 "FailedEntries": [],
1102 })))
1103 }
1104
1105 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1106 let body = req.json_body();
1107 validate_required("Rule", &body["Rule"])?;
1108 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1109 validate_string_length("rule", rule_name, 1, 64)?;
1110 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1111 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1112 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1113 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1114 let limit = body["Limit"].as_u64().map(|n| n as usize);
1115 let next_token = body["NextToken"].as_str();
1116
1117 let accounts = self.state.read();
1118 let empty = EventBridgeState::new(&req.account_id, &req.region);
1119 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1120 let bus_name = state.resolve_bus_name(event_bus_name);
1121 let key = (bus_name, rule_name.to_string());
1122
1123 let rule = state.rules.get(&key).ok_or_else(|| {
1124 AwsServiceError::aws_error(
1125 StatusCode::BAD_REQUEST,
1126 "ResourceNotFoundException",
1127 format!("Rule {rule_name} does not exist."),
1128 )
1129 })?;
1130
1131 let all_targets = &rule.targets;
1132 let start = next_token
1133 .and_then(|t| t.parse::<usize>().ok())
1134 .unwrap_or(0)
1135 .min(all_targets.len());
1136 let slice = &all_targets[start..];
1137
1138 let (page, new_next_token) = if let Some(lim) = limit {
1139 if slice.len() > lim {
1140 (&slice[..lim], Some((start + lim).to_string()))
1141 } else {
1142 (slice, None)
1143 }
1144 } else {
1145 (slice, None)
1146 };
1147
1148 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1149
1150 let mut resp = json!({ "Targets": targets });
1151 if let Some(token) = new_next_token {
1152 resp["NextToken"] = json!(token);
1153 }
1154
1155 Ok(AwsResponse::ok_json(resp))
1156 }
1157
1158 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159 let body = req.json_body();
1160 validate_required("TargetArn", &body["TargetArn"])?;
1161 let target_arn = body["TargetArn"]
1162 .as_str()
1163 .ok_or_else(|| missing("TargetArn"))?;
1164 validate_string_length("targetArn", target_arn, 1, 1600)?;
1165 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1166 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1167 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1168 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1169 let limit = body["Limit"].as_u64().map(|n| n as usize);
1170 let next_token = body["NextToken"].as_str();
1171
1172 let accounts = self.state.read();
1173 let empty = EventBridgeState::new(&req.account_id, &req.region);
1174 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1175 let bus_name = state.resolve_bus_name(event_bus_name);
1176
1177 let mut rule_names: Vec<String> = Vec::new();
1179 for rule in state.rules.values() {
1180 if rule.event_bus_name == bus_name
1181 && rule.targets.iter().any(|t| t.arn == target_arn)
1182 && !rule_names.contains(&rule.name)
1183 {
1184 rule_names.push(rule.name.clone());
1185 }
1186 }
1187 rule_names.sort();
1188
1189 let start = next_token
1190 .and_then(|t| t.parse::<usize>().ok())
1191 .unwrap_or(0)
1192 .min(rule_names.len());
1193 let slice = &rule_names[start..];
1194
1195 let (page, new_next_token) = if let Some(lim) = limit {
1196 if slice.len() > lim {
1197 (&slice[..lim], Some((start + lim).to_string()))
1198 } else {
1199 (slice, None)
1200 }
1201 } else {
1202 (slice, None)
1203 };
1204
1205 let mut resp = json!({ "RuleNames": page });
1206 if let Some(token) = new_next_token {
1207 resp["NextToken"] = json!(token);
1208 }
1209
1210 Ok(AwsResponse::ok_json(resp))
1211 }
1212
1213 fn create_partner_event_source(
1216 &self,
1217 req: &AwsRequest,
1218 ) -> Result<AwsResponse, AwsServiceError> {
1219 let body = req.json_body();
1220 validate_required("Name", &body["Name"])?;
1221 let name = body["Name"]
1222 .as_str()
1223 .ok_or_else(|| missing("Name"))?
1224 .to_string();
1225 validate_string_length("name", &name, 1, 256)?;
1226 validate_required("Account", &body["Account"])?;
1227 let account = body["Account"]
1228 .as_str()
1229 .ok_or_else(|| missing("Account"))?
1230 .to_string();
1231 validate_string_length("account", &account, 12, 12)?;
1232
1233 let mut accounts = self.state.write();
1234 let state = accounts.get_or_create(&req.account_id);
1235 if state.partner_event_sources.contains_key(&name) {
1236 return Err(AwsServiceError::aws_error(
1237 StatusCode::CONFLICT,
1238 "ResourceAlreadyExistsException",
1239 format!("Partner event source {name} already exists."),
1240 ));
1241 }
1242 let arn = format!(
1243 "arn:aws:events:{}::event-source/aws.partner/{}",
1244 state.region, name
1245 );
1246 let now = Utc::now();
1247 let ps = PartnerEventSource {
1248 name: name.clone(),
1249 arn: arn.clone(),
1250 account,
1251 creation_time: now,
1252 expiration_time: None,
1253 state: "ACTIVE".to_string(),
1254 };
1255 state.partner_event_sources.insert(name.clone(), ps);
1256
1257 Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1258 }
1259
1260 fn delete_partner_event_source(
1261 &self,
1262 req: &AwsRequest,
1263 ) -> Result<AwsResponse, AwsServiceError> {
1264 let body = req.json_body();
1265 validate_required("Name", &body["Name"])?;
1266 let name = body["Name"]
1267 .as_str()
1268 .ok_or_else(|| missing("Name"))?
1269 .to_string();
1270 validate_required("Account", &body["Account"])?;
1271 let account = body["Account"]
1272 .as_str()
1273 .ok_or_else(|| missing("Account"))?
1274 .to_string();
1275
1276 let mut accounts = self.state.write();
1277 let state = accounts.get_or_create(&req.account_id);
1278 match state.partner_event_sources.get(&name) {
1279 Some(ps) if ps.account == account => {
1280 state.partner_event_sources.remove(&name);
1281 }
1282 Some(_) => {
1283 return Err(AwsServiceError::aws_error(
1284 StatusCode::NOT_FOUND,
1285 "ResourceNotFoundException",
1286 format!("Partner event source {name} does not exist for account {account}."),
1287 ));
1288 }
1289 None => {
1290 return Err(AwsServiceError::aws_error(
1291 StatusCode::NOT_FOUND,
1292 "ResourceNotFoundException",
1293 format!("Partner event source {name} does not exist."),
1294 ));
1295 }
1296 }
1297
1298 Ok(AwsResponse::ok_json(json!({})))
1299 }
1300
1301 fn describe_partner_event_source(
1302 &self,
1303 req: &AwsRequest,
1304 ) -> Result<AwsResponse, AwsServiceError> {
1305 let body = req.json_body();
1306 validate_required("Name", &body["Name"])?;
1307 let name = body["Name"]
1308 .as_str()
1309 .ok_or_else(|| missing("Name"))?
1310 .to_string();
1311 validate_string_length("name", &name, 1, 256)?;
1312
1313 let accounts = self.state.read();
1314 let empty = EventBridgeState::new(&req.account_id, &req.region);
1315 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1316 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1317 AwsServiceError::aws_error(
1318 StatusCode::NOT_FOUND,
1319 "ResourceNotFoundException",
1320 format!("Partner event source {name} does not exist."),
1321 )
1322 })?;
1323
1324 Ok(AwsResponse::ok_json(json!({
1325 "Arn": ps.arn,
1326 "Name": ps.name,
1327 })))
1328 }
1329
1330 fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331 let body = req.json_body();
1332 validate_required("namePrefix", &body["NamePrefix"])?;
1333 let name_prefix = body["NamePrefix"]
1334 .as_str()
1335 .ok_or_else(|| missing("NamePrefix"))?;
1336 validate_string_length("namePrefix", name_prefix, 1, 256)?;
1337 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1338 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1339 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1340
1341 let accounts = self.state.read();
1342 let empty = EventBridgeState::new(&req.account_id, &req.region);
1343 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1344 let all: Vec<Value> = state
1345 .partner_event_sources
1346 .values()
1347 .filter(|ps| ps.name.starts_with(name_prefix))
1348 .map(|ps| {
1349 json!({
1350 "Arn": ps.arn,
1351 "Name": ps.name,
1352 })
1353 })
1354 .collect();
1355
1356 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1357 let mut resp = json!({ "PartnerEventSources": sources });
1358 if let Some(token) = next_token {
1359 resp["NextToken"] = json!(token);
1360 }
1361
1362 Ok(AwsResponse::ok_json(resp))
1363 }
1364
1365 fn list_partner_event_source_accounts(
1366 &self,
1367 req: &AwsRequest,
1368 ) -> Result<AwsResponse, AwsServiceError> {
1369 let body = req.json_body();
1370 validate_required("EventSourceName", &body["EventSourceName"])?;
1371 let event_source_name = body["EventSourceName"]
1372 .as_str()
1373 .ok_or_else(|| missing("EventSourceName"))?;
1374 validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1375 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1376 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1377
1378 let accounts = self.state.read();
1379 let empty = EventBridgeState::new(&req.account_id, &req.region);
1380 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1381 let accounts: Vec<Value> = state
1382 .partner_event_sources
1383 .values()
1384 .filter(|ps| ps.name == event_source_name)
1385 .map(|ps| json!({ "Account": ps.account }))
1386 .collect();
1387
1388 Ok(AwsResponse::ok_json(json!({
1389 "PartnerEventSourceAccounts": accounts
1390 })))
1391 }
1392
1393 fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1394 let body = req.json_body();
1395 validate_required("Name", &body["Name"])?;
1396 let name = body["Name"]
1397 .as_str()
1398 .ok_or_else(|| missing("Name"))?
1399 .to_string();
1400
1401 let mut accounts = self.state.write();
1402 let state = accounts.get_or_create(&req.account_id);
1403 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1404 AwsServiceError::aws_error(
1405 StatusCode::NOT_FOUND,
1406 "ResourceNotFoundException",
1407 format!("Event source {name} does not exist."),
1408 )
1409 })?;
1410 ps.state = "ACTIVE".to_string();
1411
1412 Ok(AwsResponse::ok_json(json!({})))
1413 }
1414
1415 fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1416 let body = req.json_body();
1417 validate_required("Name", &body["Name"])?;
1418 let name = body["Name"]
1419 .as_str()
1420 .ok_or_else(|| missing("Name"))?
1421 .to_string();
1422
1423 let mut accounts = self.state.write();
1424 let state = accounts.get_or_create(&req.account_id);
1425 let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1426 AwsServiceError::aws_error(
1427 StatusCode::NOT_FOUND,
1428 "ResourceNotFoundException",
1429 format!("Event source {name} does not exist."),
1430 )
1431 })?;
1432 ps.state = "INACTIVE".to_string();
1433
1434 Ok(AwsResponse::ok_json(json!({})))
1435 }
1436
1437 fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438 let body = req.json_body();
1439 validate_required("Name", &body["Name"])?;
1440 let name = body["Name"]
1441 .as_str()
1442 .ok_or_else(|| missing("Name"))?
1443 .to_string();
1444
1445 let accounts = self.state.read();
1446 let empty = EventBridgeState::new(&req.account_id, &req.region);
1447 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1448 let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1449 AwsServiceError::aws_error(
1450 StatusCode::NOT_FOUND,
1451 "ResourceNotFoundException",
1452 format!("Event source {name} does not exist."),
1453 )
1454 })?;
1455
1456 Ok(AwsResponse::ok_json(json!({
1457 "Arn": ps.arn,
1458 "Name": ps.name,
1459 "CreatedBy": ps.account,
1460 "CreationTime": ps.creation_time.timestamp() as f64,
1461 "State": ps.state,
1462 })))
1463 }
1464
1465 fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1466 let body = req.json_body();
1467 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1468 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1469 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1470 let name_prefix = body["NamePrefix"].as_str();
1471 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1472
1473 let accounts = self.state.read();
1474 let empty = EventBridgeState::new(&req.account_id, &req.region);
1475 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1476 let all: Vec<Value> = state
1477 .partner_event_sources
1478 .values()
1479 .filter(|ps| match name_prefix {
1480 Some(prefix) => ps.name.starts_with(prefix),
1481 None => true,
1482 })
1483 .map(|ps| {
1484 json!({
1485 "Arn": ps.arn,
1486 "Name": ps.name,
1487 "CreatedBy": ps.account,
1488 "CreationTime": ps.creation_time.timestamp() as f64,
1489 "State": ps.state,
1490 })
1491 })
1492 .collect();
1493
1494 let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1495 let mut resp = json!({ "EventSources": sources });
1496 if let Some(token) = next_token {
1497 resp["NextToken"] = json!(token);
1498 }
1499
1500 Ok(AwsResponse::ok_json(resp))
1501 }
1502
1503 fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504 let body = req.json_body();
1505 validate_required("Entries", &body["Entries"])?;
1506 let entries = body["Entries"]
1507 .as_array()
1508 .ok_or_else(|| missing("Entries"))?;
1509
1510 let mut result_entries = Vec::new();
1511 for _entry in entries {
1512 let event_id = uuid::Uuid::new_v4().to_string();
1513 result_entries.push(json!({ "EventId": event_id }));
1514 }
1515
1516 Ok(AwsResponse::ok_json(json!({
1517 "FailedEntryCount": 0,
1518 "Entries": result_entries,
1519 })))
1520 }
1521
1522 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1525 let body = req.json_body();
1526 validate_required("EventPattern", &body["EventPattern"])?;
1527 validate_required("Event", &body["Event"])?;
1528 let event_pattern = body["EventPattern"]
1529 .as_str()
1530 .ok_or_else(|| missing("EventPattern"))?;
1531 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1532
1533 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1535 AwsServiceError::aws_error(
1536 StatusCode::BAD_REQUEST,
1537 "InvalidEventPatternException",
1538 "Event is not valid JSON.",
1539 )
1540 })?;
1541
1542 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1544 AwsServiceError::aws_error(
1545 StatusCode::BAD_REQUEST,
1546 "InvalidEventPatternException",
1547 "Event pattern is not valid JSON.",
1548 )
1549 })?;
1550
1551 let source = event["source"].as_str().unwrap_or("");
1552 let detail_type = event["detail-type"].as_str().unwrap_or("");
1553 let detail = event
1554 .get("detail")
1555 .map(|v| serde_json::to_string(v).unwrap_or_default())
1556 .unwrap_or_else(|| "{}".to_string());
1557 let account = event["account"].as_str().unwrap_or("");
1558 let region = event["region"].as_str().unwrap_or("");
1559 let resources: Vec<String> = event["resources"]
1560 .as_array()
1561 .map(|arr| {
1562 arr.iter()
1563 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564 .collect()
1565 })
1566 .unwrap_or_default();
1567
1568 let result = matches_pattern(
1569 Some(event_pattern),
1570 source,
1571 detail_type,
1572 &detail,
1573 account,
1574 region,
1575 &resources,
1576 );
1577
1578 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1579 }
1580
1581 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1584 let body = req.json_body();
1585 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1586 validate_optional_string_length(
1587 "kmsKeyIdentifier",
1588 body["KmsKeyIdentifier"].as_str(),
1589 0,
1590 2048,
1591 )?;
1592 let name = body["Name"].as_str().unwrap_or("default");
1593
1594 let mut accounts = self.state.write();
1595 let state = accounts.get_or_create(&req.account_id);
1596 let bus = state.buses.get_mut(name).ok_or_else(|| {
1597 AwsServiceError::aws_error(
1598 StatusCode::BAD_REQUEST,
1599 "ResourceNotFoundException",
1600 format!("Event bus {name} does not exist."),
1601 )
1602 })?;
1603
1604 if let Some(desc) = body["Description"].as_str() {
1605 bus.description = Some(desc.to_string());
1606 }
1607 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1608 bus.kms_key_identifier = Some(kms.to_string());
1609 }
1610 if let Some(dlc) = body.get("DeadLetterConfig") {
1611 bus.dead_letter_config = Some(dlc.clone());
1612 }
1613 bus.last_modified_time = Utc::now();
1614
1615 let arn = bus.arn.clone();
1616 let bus_name = bus.name.clone();
1617
1618 Ok(AwsResponse::ok_json(json!({
1619 "Arn": arn,
1620 "Name": bus_name,
1621 })))
1622 }
1623
1624 fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1627 let body = req.json_body();
1628 validate_required("Name", &body["Name"])?;
1629 let name = body["Name"]
1630 .as_str()
1631 .ok_or_else(|| missing("Name"))?
1632 .to_string();
1633 validate_string_length("name", &name, 1, 64)?;
1634 validate_required("RoutingConfig", &body["RoutingConfig"])?;
1635 validate_required("EventBuses", &body["EventBuses"])?;
1636
1637 let description = body["Description"].as_str().map(|s| s.to_string());
1638 let routing_config = body["RoutingConfig"].clone();
1639 let replication_config = body.get("ReplicationConfig").cloned();
1640 let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1641 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1642
1643 let mut accounts = self.state.write();
1644 let state = accounts.get_or_create(&req.account_id);
1645 if state.endpoints.contains_key(&name) {
1646 return Err(AwsServiceError::aws_error(
1647 StatusCode::CONFLICT,
1648 "ResourceAlreadyExistsException",
1649 format!("Endpoint {name} already exists."),
1650 ));
1651 }
1652
1653 let endpoint_id = format!("{}.abc123", name);
1654 let arn = format!(
1655 "arn:aws:events:{}:{}:endpoint/{}",
1656 req.region, state.account_id, name
1657 );
1658 let endpoint_url = format!(
1659 "https://{}.endpoint.events.{}.amazonaws.com",
1660 endpoint_id, req.region
1661 );
1662 let now = Utc::now();
1663
1664 let endpoint = Endpoint {
1665 name: name.clone(),
1666 arn: arn.clone(),
1667 endpoint_id: endpoint_id.clone(),
1668 endpoint_url: Some(endpoint_url),
1669 description,
1670 routing_config: routing_config.clone(),
1671 replication_config: replication_config.clone(),
1672 event_buses: event_buses.clone(),
1673 role_arn: role_arn.clone(),
1674 state: "ACTIVE".to_string(),
1675 creation_time: now,
1676 last_modified_time: now,
1677 };
1678 state.endpoints.insert(name.clone(), endpoint);
1679
1680 let mut resp = json!({
1681 "Name": name,
1682 "Arn": arn,
1683 "State": "ACTIVE",
1684 "RoutingConfig": routing_config,
1685 "EventBuses": event_buses,
1686 });
1687 if let Some(ref rc) = replication_config {
1688 resp["ReplicationConfig"] = rc.clone();
1689 }
1690 if let Some(ref ra) = role_arn {
1691 resp["RoleArn"] = json!(ra);
1692 }
1693
1694 Ok(AwsResponse::ok_json(resp))
1695 }
1696
1697 fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698 let body = req.json_body();
1699 validate_required("Name", &body["Name"])?;
1700 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1701
1702 let mut accounts = self.state.write();
1703 let state = accounts.get_or_create(&req.account_id);
1704 state.endpoints.remove(name).ok_or_else(|| {
1705 AwsServiceError::aws_error(
1706 StatusCode::BAD_REQUEST,
1707 "ResourceNotFoundException",
1708 format!("Endpoint '{name}' does not exist."),
1709 )
1710 })?;
1711
1712 Ok(AwsResponse::ok_json(json!({})))
1713 }
1714
1715 fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1716 let body = req.json_body();
1717 validate_required("Name", &body["Name"])?;
1718 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1719
1720 let accounts = self.state.read();
1721 let empty = EventBridgeState::new(&req.account_id, &req.region);
1722 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1723 let ep = state.endpoints.get(name).ok_or_else(|| {
1724 AwsServiceError::aws_error(
1725 StatusCode::BAD_REQUEST,
1726 "ResourceNotFoundException",
1727 format!("Endpoint '{name}' does not exist."),
1728 )
1729 })?;
1730
1731 let mut resp = json!({
1732 "Name": ep.name,
1733 "Arn": ep.arn,
1734 "EndpointId": ep.endpoint_id,
1735 "State": ep.state,
1736 "RoutingConfig": ep.routing_config,
1737 "EventBuses": ep.event_buses,
1738 "CreationTime": ep.creation_time.timestamp() as f64,
1739 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1740 });
1741 if let Some(ref url) = ep.endpoint_url {
1742 resp["EndpointUrl"] = json!(url);
1743 }
1744 if let Some(ref desc) = ep.description {
1745 resp["Description"] = json!(desc);
1746 }
1747 if let Some(ref rc) = ep.replication_config {
1748 resp["ReplicationConfig"] = rc.clone();
1749 }
1750 if let Some(ref ra) = ep.role_arn {
1751 resp["RoleArn"] = json!(ra);
1752 }
1753
1754 Ok(AwsResponse::ok_json(resp))
1755 }
1756
1757 fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1758 let body = req.json_body();
1759 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1760 validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1761 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1762 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1763 let name_prefix = body["NamePrefix"].as_str();
1764 let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1765
1766 let accounts = self.state.read();
1767 let empty = EventBridgeState::new(&req.account_id, &req.region);
1768 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1769 let all: Vec<Value> = state
1770 .endpoints
1771 .values()
1772 .filter(|ep| match name_prefix {
1773 Some(prefix) => ep.name.starts_with(prefix),
1774 None => true,
1775 })
1776 .map(|ep| {
1777 let mut obj = json!({
1778 "Name": ep.name,
1779 "Arn": ep.arn,
1780 "EndpointId": ep.endpoint_id,
1781 "State": ep.state,
1782 "RoutingConfig": ep.routing_config,
1783 "EventBuses": ep.event_buses,
1784 "CreationTime": ep.creation_time.timestamp() as f64,
1785 "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1786 });
1787 if let Some(ref url) = ep.endpoint_url {
1788 obj["EndpointUrl"] = json!(url);
1789 }
1790 obj
1791 })
1792 .collect();
1793
1794 let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1795 let mut resp = json!({ "Endpoints": endpoints });
1796 if let Some(token) = next_token {
1797 resp["NextToken"] = json!(token);
1798 }
1799
1800 Ok(AwsResponse::ok_json(resp))
1801 }
1802
1803 fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1804 let body = req.json_body();
1805 validate_required("Name", &body["Name"])?;
1806 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1807
1808 let mut accounts = self.state.write();
1809 let state = accounts.get_or_create(&req.account_id);
1810 let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1811 AwsServiceError::aws_error(
1812 StatusCode::BAD_REQUEST,
1813 "ResourceNotFoundException",
1814 format!("Endpoint '{name}' does not exist."),
1815 )
1816 })?;
1817
1818 if let Some(desc) = body["Description"].as_str() {
1819 ep.description = Some(desc.to_string());
1820 }
1821 if !body["RoutingConfig"].is_null() {
1822 ep.routing_config = body["RoutingConfig"].clone();
1823 }
1824 if let Some(rc) = body.get("ReplicationConfig") {
1825 ep.replication_config = Some(rc.clone());
1826 }
1827 if let Some(buses) = body["EventBuses"].as_array() {
1828 ep.event_buses = buses.clone();
1829 }
1830 if let Some(ra) = body["RoleArn"].as_str() {
1831 ep.role_arn = Some(ra.to_string());
1832 }
1833 ep.last_modified_time = Utc::now();
1834
1835 let resp = json!({
1836 "Name": ep.name,
1837 "Arn": ep.arn,
1838 "EndpointId": ep.endpoint_id,
1839 "State": ep.state,
1840 "RoutingConfig": ep.routing_config,
1841 "EventBuses": ep.event_buses,
1842 });
1843
1844 Ok(AwsResponse::ok_json(resp))
1845 }
1846
1847 fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850 let body = req.json_body();
1851 validate_required("Name", &body["Name"])?;
1852 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1853 validate_string_length("name", name, 1, 64)?;
1854
1855 let mut accounts = self.state.write();
1856 let state = accounts.get_or_create(&req.account_id);
1857 let conn = state.connections.get_mut(name).ok_or_else(|| {
1858 AwsServiceError::aws_error(
1859 StatusCode::BAD_REQUEST,
1860 "ResourceNotFoundException",
1861 format!("Connection '{name}' does not exist."),
1862 )
1863 })?;
1864
1865 conn.connection_state = "DEAUTHORIZING".to_string();
1866 conn.last_modified_time = Utc::now();
1867
1868 let resp = json!({
1869 "ConnectionArn": conn.arn,
1870 "ConnectionState": conn.connection_state,
1871 "CreationTime": conn.creation_time.timestamp() as f64,
1872 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1873 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1874 });
1875
1876 Ok(AwsResponse::ok_json(resp))
1877 }
1878
1879 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882 let body = req.json_body();
1883 validate_required("Entries", &body["Entries"])?;
1884 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1885 let entries = body["Entries"]
1886 .as_array()
1887 .ok_or_else(|| missing("Entries"))?;
1888
1889 if entries.is_empty() {
1891 return Err(AwsServiceError::aws_error(
1892 StatusCode::BAD_REQUEST,
1893 "ValidationException",
1894 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1895 ));
1896 }
1897 if entries.len() > 10 {
1898 return Err(AwsServiceError::aws_error(
1899 StatusCode::BAD_REQUEST,
1900 "ValidationException",
1901 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1902 ));
1903 }
1904
1905 let mut accounts = self.state.write();
1906 let state = accounts.get_or_create(&req.account_id);
1907 let mut result_entries = Vec::new();
1908 let mut events_to_deliver = Vec::new();
1909 let mut failed_count = 0;
1910
1911 for entry in entries {
1912 let source = entry["Source"].as_str().unwrap_or("").to_string();
1913 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1914 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1915
1916 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1917 failed_count += 1;
1918 result_entries.push(error);
1919 continue;
1920 }
1921
1922 let event_id = uuid::Uuid::new_v4().to_string();
1923 let raw_bus = entry["EventBusName"]
1924 .as_str()
1925 .unwrap_or("default")
1926 .to_string();
1927 let event_bus_name = state.resolve_bus_name(&raw_bus);
1928 let time = parse_put_events_time(&entry["Time"]);
1929 let resources: Vec<String> = entry["Resources"]
1930 .as_array()
1931 .map(|arr| {
1932 arr.iter()
1933 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1934 .collect()
1935 })
1936 .unwrap_or_default();
1937
1938 let event = PutEvent {
1939 event_id: event_id.clone(),
1940 source: source.clone(),
1941 detail_type: detail_type.clone(),
1942 detail: detail.clone(),
1943 event_bus_name: event_bus_name.clone(),
1944 time,
1945 resources: resources.clone(),
1946 };
1947
1948 archive_matching_event(
1949 state,
1950 &event,
1951 &event_bus_name,
1952 &source,
1953 &detail_type,
1954 &detail,
1955 &req.account_id,
1956 &req.region,
1957 &resources,
1958 );
1959
1960 state.events.push(event);
1961
1962 let matching_targets: Vec<EventTarget> = state
1964 .rules
1965 .values()
1966 .filter(|r| {
1967 r.event_bus_name == event_bus_name
1968 && r.state == "ENABLED"
1969 && matches_pattern(
1970 r.event_pattern.as_deref(),
1971 &source,
1972 &detail_type,
1973 &detail,
1974 &req.account_id,
1975 &req.region,
1976 &resources,
1977 )
1978 })
1979 .flat_map(|r| r.targets.clone())
1980 .collect();
1981
1982 if !matching_targets.is_empty() {
1983 events_to_deliver.push((
1984 event_id.clone(),
1985 source,
1986 detail_type,
1987 detail,
1988 time,
1989 resources,
1990 matching_targets,
1991 ));
1992 }
1993
1994 result_entries.push(json!({ "EventId": event_id }));
1995 }
1996
1997 drop(accounts);
1999
2000 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
2002 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
2003 let event_json = json!({
2004 "version": "0",
2005 "id": event_id,
2006 "source": source,
2007 "account": req.account_id,
2008 "detail-type": detail_type,
2009 "detail": detail_value,
2010 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
2011 "region": req.region,
2012 "resources": resources,
2013 });
2014 let event_str = event_json.to_string();
2015
2016 for target in targets {
2017 let arn = &target.arn;
2018 let body_str = if let Some(ref transformer) = target.input_transformer {
2020 apply_input_transformer(transformer, &event_json)
2021 } else if let Some(ref input) = target.input {
2022 input.clone()
2023 } else if let Some(ref input_path) = target.input_path {
2024 resolve_json_path(&event_json, input_path)
2025 .map(|v| v.to_string())
2026 .unwrap_or_else(|| event_str.clone())
2027 } else {
2028 event_str.clone()
2029 };
2030
2031 if arn.contains(":sqs:") {
2032 let group_id = target
2034 .sqs_parameters
2035 .as_ref()
2036 .and_then(|p| p["MessageGroupId"].as_str())
2037 .map(|s| s.to_string());
2038 if group_id.is_some() {
2039 self.delivery.send_to_sqs_with_attrs(
2043 arn,
2044 &body_str,
2045 &HashMap::new(),
2046 group_id.as_deref(),
2047 None,
2048 );
2049 } else {
2050 self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
2051 }
2052 } else if arn.contains(":sns:") {
2053 self.delivery
2054 .publish_to_sns(arn, &body_str, Some(&detail_type));
2055 } else if arn.contains(":lambda:") {
2056 tracing::info!(
2057 function_arn = %arn,
2058 payload = %body_str,
2059 "EventBridge delivering to Lambda function"
2060 );
2061 let now = Utc::now();
2062 let mut accounts = self.state.write();
2063 let state = accounts.get_or_create(&req.account_id);
2064 state
2065 .lambda_invocations
2066 .push(crate::state::LambdaInvocation {
2067 function_arn: arn.clone(),
2068 payload: body_str.clone(),
2069 timestamp: now,
2070 });
2071 drop(accounts);
2072 if let Some(ref ls) = self.lambda_state {
2074 ls.write().default_mut().invocations.push(LambdaInvocation {
2075 function_arn: arn.clone(),
2076 payload: body_str.clone(),
2077 timestamp: now,
2078 source: "aws:events".to_string(),
2079 });
2080 }
2081 invoke_lambda_async(
2083 &self.container_runtime,
2084 &self.lambda_state,
2085 arn,
2086 &body_str,
2087 );
2088 } else if arn.contains(":logs:") {
2089 tracing::info!(
2090 log_group_arn = %arn,
2091 payload = %body_str,
2092 "EventBridge delivering to CloudWatch Logs"
2093 );
2094 let now = Utc::now();
2095 let mut accounts = self.state.write();
2096 let state = accounts.get_or_create(&req.account_id);
2097 state.log_deliveries.push(crate::state::LogDelivery {
2098 log_group_arn: arn.clone(),
2099 payload: body_str.clone(),
2100 timestamp: now,
2101 });
2102 drop(accounts);
2103 if let Some(ref log_state) = self.logs_state {
2105 deliver_to_logs(log_state, arn, &body_str, now);
2106 }
2107 } else if arn.contains(":kinesis:") {
2108 tracing::info!(
2109 stream_arn = %arn,
2110 "EventBridge delivering to Kinesis stream"
2111 );
2112 self.delivery.send_to_kinesis(arn, &body_str, &event_id);
2114 } else if arn.contains(":states:") {
2115 tracing::info!(
2116 state_machine_arn = %arn,
2117 "EventBridge delivering to Step Functions"
2118 );
2119 self.delivery.start_stepfunctions_execution(arn, &body_str);
2120 let mut accounts = self.state.write();
2121 let state = accounts.get_or_create(&req.account_id);
2122 state
2123 .step_function_executions
2124 .push(crate::state::StepFunctionExecution {
2125 state_machine_arn: arn.clone(),
2126 payload: body_str.clone(),
2127 timestamp: Utc::now(),
2128 });
2129 } else if arn.contains(":api-destination/") {
2130 let accounts = self.state.read();
2132 let empty = EventBridgeState::new(&req.account_id, &req.region);
2133 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2134 let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2135 if let Some(dest) = dest {
2136 let url = dest.invocation_endpoint.clone();
2137 let method = dest.http_method.clone();
2138 let conn = state
2139 .connections
2140 .values()
2141 .find(|c| c.arn == dest.connection_arn)
2142 .cloned();
2143 drop(accounts);
2144
2145 let payload = body_str.clone();
2146 tokio::spawn(async move {
2147 let client = reqwest::Client::new();
2148 let mut req_builder = match method.as_str() {
2149 "GET" => client.get(&url),
2150 "PUT" => client.put(&url),
2151 "DELETE" => client.delete(&url),
2152 "PATCH" => client.patch(&url),
2153 "HEAD" => client.head(&url),
2154 _ => client.post(&url),
2155 };
2156 req_builder = req_builder.header("Content-Type", "application/json");
2157
2158 if let Some(conn) = conn {
2160 req_builder = apply_connection_auth(req_builder, &conn);
2161 }
2162
2163 let result = req_builder.body(payload).send().await;
2164 if let Err(e) = result {
2165 tracing::warn!(
2166 endpoint = %url,
2167 error = %e,
2168 "EventBridge ApiDestination delivery failed"
2169 );
2170 }
2171 });
2172 }
2173 } else if arn.starts_with("https://") || arn.starts_with("http://") {
2174 let url = arn.clone();
2176 let payload = body_str.clone();
2177 tokio::spawn(async move {
2178 let client = reqwest::Client::new();
2179 let result = client
2180 .post(&url)
2181 .header("Content-Type", "application/json")
2182 .body(payload)
2183 .send()
2184 .await;
2185 if let Err(e) = result {
2186 tracing::warn!(
2187 endpoint = %url,
2188 error = %e,
2189 "EventBridge HTTP target delivery failed"
2190 );
2191 }
2192 });
2193 }
2194 }
2195 }
2196
2197 let resp = json!({
2198 "FailedEntryCount": failed_count,
2199 "Entries": result_entries,
2200 });
2201
2202 Ok(AwsResponse::ok_json(resp))
2203 }
2204
2205 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2208 let body = req.json_body();
2209 validate_required("ResourceARN", &body["ResourceARN"])?;
2210 let arn = body["ResourceARN"]
2211 .as_str()
2212 .ok_or_else(|| missing("ResourceARN"))?;
2213 validate_string_length("resourceARN", arn, 1, 1600)?;
2214 validate_required("Tags", &body["Tags"])?;
2215
2216 let mut accounts = self.state.write();
2217 let state = accounts.get_or_create(&req.account_id);
2218 let tag_map = find_tags_mut(state, arn)?;
2219
2220 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2221 AwsServiceError::aws_error(
2222 StatusCode::BAD_REQUEST,
2223 "ValidationException",
2224 format!("{f} must be a list"),
2225 )
2226 })?;
2227
2228 Ok(AwsResponse::ok_json(json!({})))
2229 }
2230
2231 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2232 let body = req.json_body();
2233 validate_required("ResourceARN", &body["ResourceARN"])?;
2234 let arn = body["ResourceARN"]
2235 .as_str()
2236 .ok_or_else(|| missing("ResourceARN"))?;
2237 validate_string_length("resourceARN", arn, 1, 1600)?;
2238 validate_required("TagKeys", &body["TagKeys"])?;
2239
2240 let mut accounts = self.state.write();
2241 let state = accounts.get_or_create(&req.account_id);
2242 let tag_map = find_tags_mut(state, arn)?;
2243
2244 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2245 AwsServiceError::aws_error(
2246 StatusCode::BAD_REQUEST,
2247 "ValidationException",
2248 format!("{f} must be a list"),
2249 )
2250 })?;
2251
2252 Ok(AwsResponse::ok_json(json!({})))
2253 }
2254
2255 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2256 let body = req.json_body();
2257 validate_required("ResourceARN", &body["ResourceARN"])?;
2258 let arn = body["ResourceARN"]
2259 .as_str()
2260 .ok_or_else(|| missing("ResourceARN"))?;
2261 validate_string_length("resourceARN", arn, 1, 1600)?;
2262
2263 let accounts = self.state.read();
2264 let empty = EventBridgeState::new(&req.account_id, &req.region);
2265 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2266 let tag_map = find_tags(state, arn)?;
2267
2268 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2269
2270 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2271 }
2272
2273 fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2276 let body = req.json_body();
2277 validate_required("ArchiveName", &body["ArchiveName"])?;
2278 let name = body["ArchiveName"]
2279 .as_str()
2280 .ok_or_else(|| missing("ArchiveName"))?
2281 .to_string();
2282 validate_string_length("archiveName", &name, 1, 48)?;
2283 validate_required("EventSourceArn", &body["EventSourceArn"])?;
2284 let event_source_arn = body["EventSourceArn"]
2285 .as_str()
2286 .ok_or_else(|| missing("EventSourceArn"))?
2287 .to_string();
2288 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2289 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2290 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2291 if let Some(rd) = body["RetentionDays"].as_i64() {
2292 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2293 }
2294 let description = body["Description"].as_str().map(|s| s.to_string());
2295 let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2296 let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2297
2298 if let Some(ref pattern) = event_pattern {
2300 validate_event_pattern(pattern)?;
2301 }
2302
2303 let mut accounts = self.state.write();
2304 let state = accounts.get_or_create(&req.account_id);
2305
2306 let bus_name = state.resolve_bus_name(&event_source_arn);
2308 if !state.buses.contains_key(&bus_name) {
2309 return Err(AwsServiceError::aws_error(
2310 StatusCode::BAD_REQUEST,
2311 "ResourceNotFoundException",
2312 format!("Event bus {bus_name} does not exist."),
2313 ));
2314 }
2315
2316 if state.archives.contains_key(&name) {
2318 return Err(AwsServiceError::aws_error(
2319 StatusCode::BAD_REQUEST,
2320 "ResourceAlreadyExistsException",
2321 format!("Archive {name} already exists."),
2322 ));
2323 }
2324
2325 let now = Utc::now();
2326 let arn = format!(
2327 "arn:aws:events:{}:{}:archive/{}",
2328 req.region, state.account_id, name
2329 );
2330
2331 let archive = Archive {
2332 name: name.clone(),
2333 arn: arn.clone(),
2334 event_source_arn: event_source_arn.clone(),
2335 description,
2336 event_pattern: event_pattern.clone(),
2337 retention_days,
2338 state: "ENABLED".to_string(),
2339 creation_time: now,
2340 event_count: 0,
2341 size_bytes: 0,
2342 events: Vec::new(),
2343 };
2344 state.archives.insert(name.clone(), archive);
2345
2346 let rule_name = format!("Events-Archive-{name}");
2348 let rule_arn = format!(
2349 "arn:aws:events:{}:{}:rule/{}",
2350 req.region, state.account_id, rule_name
2351 );
2352 let rule_event_pattern = {
2354 let mut merged = if let Some(ref ep) = event_pattern {
2355 serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2356 } else {
2357 json!({})
2358 };
2359 if let Some(obj) = merged.as_object_mut() {
2360 obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2361 }
2362 serde_json::to_string(&merged).unwrap_or_default()
2363 };
2364
2365 let archive_target = EventTarget {
2367 id: name.clone(),
2368 arn: format!("arn:aws:events:{}:::", req.region),
2369 input: None,
2370 input_path: None,
2371 input_transformer: Some(json!({
2372 "InputPathsMap": {},
2373 "InputTemplate": format!(
2374 "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2375 arn
2376 )
2377 })),
2378 sqs_parameters: None,
2379 };
2380
2381 let archive_rule = EventRule {
2382 name: rule_name.clone(),
2383 arn: rule_arn,
2384 event_bus_name: bus_name.clone(),
2385 event_pattern: Some(rule_event_pattern),
2386 schedule_expression: None,
2387 state: "ENABLED".to_string(),
2388 description: None,
2389 role_arn: None,
2390 managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2391 created_by: Some(state.account_id.clone()),
2392 targets: vec![archive_target],
2393 tags: HashMap::new(),
2394 last_fired: None,
2395 };
2396 let key = (bus_name, rule_name);
2397 state.rules.insert(key, archive_rule);
2398
2399 Ok(AwsResponse::ok_json(json!({
2400 "ArchiveArn": arn,
2401 "CreationTime": now.timestamp() as f64,
2402 "State": "ENABLED",
2403 })))
2404 }
2405
2406 fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2407 let body = req.json_body();
2408 validate_required("ArchiveName", &body["ArchiveName"])?;
2409 let name = body["ArchiveName"]
2410 .as_str()
2411 .ok_or_else(|| missing("ArchiveName"))?;
2412 validate_string_length("archiveName", name, 1, 48)?;
2413
2414 let accounts = self.state.read();
2415 let empty = EventBridgeState::new(&req.account_id, &req.region);
2416 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2417 let archive = state.archives.get(name).ok_or_else(|| {
2418 AwsServiceError::aws_error(
2419 StatusCode::BAD_REQUEST,
2420 "ResourceNotFoundException",
2421 format!("Archive {name} does not exist."),
2422 )
2423 })?;
2424
2425 let mut resp = json!({
2426 "ArchiveArn": archive.arn,
2427 "ArchiveName": archive.name,
2428 "CreationTime": archive.creation_time.timestamp() as f64,
2429 "EventCount": archive.event_count,
2430 "EventSourceArn": archive.event_source_arn,
2431 "RetentionDays": archive.retention_days,
2432 "SizeBytes": archive.size_bytes,
2433 "State": archive.state,
2434 });
2435 if let Some(ref desc) = archive.description {
2436 resp["Description"] = json!(desc);
2437 }
2438 if let Some(ref ep) = archive.event_pattern {
2439 resp["EventPattern"] = json!(ep);
2440 }
2441
2442 Ok(AwsResponse::ok_json(resp))
2443 }
2444
2445 fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2446 let body = req.json_body();
2447 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2448 validate_optional_string_length(
2449 "eventSourceArn",
2450 body["EventSourceArn"].as_str(),
2451 1,
2452 1600,
2453 )?;
2454 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2455 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2456 let name_prefix = body["NamePrefix"].as_str();
2457 let source_arn = body["EventSourceArn"].as_str();
2458 let archive_state = body["State"].as_str();
2459
2460 let filter_count = [
2462 name_prefix.is_some(),
2463 source_arn.is_some(),
2464 archive_state.is_some(),
2465 ]
2466 .iter()
2467 .filter(|&&x| x)
2468 .count();
2469 if filter_count > 1 {
2470 return Err(AwsServiceError::aws_error(
2471 StatusCode::BAD_REQUEST,
2472 "ValidationException",
2473 "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2474 ));
2475 }
2476
2477 if let Some(s) = archive_state {
2479 let valid = [
2480 "ENABLED",
2481 "DISABLED",
2482 "CREATING",
2483 "UPDATING",
2484 "CREATE_FAILED",
2485 "UPDATE_FAILED",
2486 ];
2487 if !valid.contains(&s) {
2488 return Err(AwsServiceError::aws_error(
2489 StatusCode::BAD_REQUEST,
2490 "ValidationException",
2491 format!(
2492 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2493 s
2494 ),
2495 ));
2496 }
2497 }
2498
2499 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2500
2501 let accounts = self.state.read();
2502 let empty = EventBridgeState::new(&req.account_id, &req.region);
2503 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2504 let all: Vec<Value> = state
2505 .archives
2506 .values()
2507 .filter(|a| {
2508 if let Some(prefix) = name_prefix {
2509 a.name.starts_with(prefix)
2510 } else if let Some(arn) = source_arn {
2511 a.event_source_arn == arn
2512 } else if let Some(s) = archive_state {
2513 a.state == s
2514 } else {
2515 true
2516 }
2517 })
2518 .map(|a| {
2519 json!({
2520 "ArchiveName": a.name,
2521 "CreationTime": a.creation_time.timestamp() as f64,
2522 "EventCount": a.event_count,
2523 "EventSourceArn": a.event_source_arn,
2524 "RetentionDays": a.retention_days,
2525 "SizeBytes": a.size_bytes,
2526 "State": a.state,
2527 })
2528 })
2529 .collect();
2530
2531 let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2532 let mut resp = json!({ "Archives": archives });
2533 if let Some(token) = next_token {
2534 resp["NextToken"] = json!(token);
2535 }
2536
2537 Ok(AwsResponse::ok_json(resp))
2538 }
2539
2540 fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2541 let body = req.json_body();
2542 validate_required("ArchiveName", &body["ArchiveName"])?;
2543 let name = body["ArchiveName"]
2544 .as_str()
2545 .ok_or_else(|| missing("ArchiveName"))?;
2546 validate_string_length("archiveName", name, 1, 48)?;
2547 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2548 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2549 if let Some(rd) = body["RetentionDays"].as_i64() {
2550 validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2551 }
2552
2553 if let Some(pattern) = body["EventPattern"].as_str() {
2555 validate_event_pattern(pattern)?;
2556 }
2557
2558 let mut accounts = self.state.write();
2559 let state = accounts.get_or_create(&req.account_id);
2560 let archive = state.archives.get_mut(name).ok_or_else(|| {
2561 AwsServiceError::aws_error(
2562 StatusCode::BAD_REQUEST,
2563 "ResourceNotFoundException",
2564 format!("Archive {name} does not exist."),
2565 )
2566 })?;
2567
2568 if let Some(desc) = body["Description"].as_str() {
2569 archive.description = Some(desc.to_string());
2570 }
2571 if let Some(pattern) = body["EventPattern"].as_str() {
2572 archive.event_pattern = Some(pattern.to_string());
2573 }
2574 if let Some(days) = body["RetentionDays"].as_i64() {
2575 archive.retention_days = days;
2576 }
2577
2578 Ok(AwsResponse::ok_json(json!({
2579 "ArchiveArn": archive.arn,
2580 "CreationTime": archive.creation_time.timestamp() as f64,
2581 "State": archive.state,
2582 })))
2583 }
2584
2585 fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2586 let body = req.json_body();
2587 validate_required("ArchiveName", &body["ArchiveName"])?;
2588 let name = body["ArchiveName"]
2589 .as_str()
2590 .ok_or_else(|| missing("ArchiveName"))?;
2591 validate_string_length("archiveName", name, 1, 48)?;
2592
2593 let mut accounts = self.state.write();
2594 let state = accounts.get_or_create(&req.account_id);
2595 if !state.archives.contains_key(name) {
2596 return Err(AwsServiceError::aws_error(
2597 StatusCode::BAD_REQUEST,
2598 "ResourceNotFoundException",
2599 format!("Archive {name} does not exist."),
2600 ));
2601 }
2602
2603 state.archives.remove(name);
2604
2605 let rule_name = format!("Events-Archive-{name}");
2607 state.rules.retain(|k, _| k.1 != rule_name);
2608
2609 Ok(AwsResponse::ok_json(json!({})))
2610 }
2611
2612 fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615 let body = req.json_body();
2616 validate_required("Name", &body["Name"])?;
2617 let name = body["Name"]
2618 .as_str()
2619 .ok_or_else(|| missing("Name"))?
2620 .to_string();
2621 validate_string_length("name", &name, 1, 64)?;
2622 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2623 validate_required("AuthorizationType", &body["AuthorizationType"])?;
2624 let description = body["Description"].as_str().map(|s| s.to_string());
2625 let auth_type = body["AuthorizationType"]
2626 .as_str()
2627 .ok_or_else(|| missing("AuthorizationType"))?
2628 .to_string();
2629 validate_enum(
2630 "authorizationType",
2631 &auth_type,
2632 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2633 )?;
2634 validate_optional_string_length(
2635 "kmsKeyIdentifier",
2636 body["KmsKeyIdentifier"].as_str(),
2637 0,
2638 2048,
2639 )?;
2640 validate_required("AuthParameters", &body["AuthParameters"])?;
2641 let auth_params = body["AuthParameters"].clone();
2642
2643 let mut accounts = self.state.write();
2644 let state = accounts.get_or_create(&req.account_id);
2645 let now = Utc::now();
2646 let conn_uuid = uuid::Uuid::new_v4();
2647 let arn = format!(
2648 "arn:aws:events:{}:{}:connection/{}/{}",
2649 req.region, state.account_id, name, conn_uuid
2650 );
2651 let secret_arn = format!(
2652 "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2653 req.region, state.account_id, name, conn_uuid
2654 );
2655
2656 let conn = Connection {
2657 name: name.clone(),
2658 arn: arn.clone(),
2659 description,
2660 authorization_type: auth_type.clone(),
2661 auth_parameters: auth_params,
2662 connection_state: "AUTHORIZED".to_string(),
2663 secret_arn: secret_arn.clone(),
2664 creation_time: now,
2665 last_modified_time: now,
2666 last_authorized_time: now,
2667 };
2668 state.connections.insert(name, conn);
2669
2670 Ok(AwsResponse::ok_json(json!({
2671 "ConnectionArn": arn,
2672 "ConnectionState": "AUTHORIZED",
2673 "CreationTime": now.timestamp() as f64,
2674 "LastModifiedTime": now.timestamp() as f64,
2675 })))
2676 }
2677
2678 fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2679 let body = req.json_body();
2680 validate_required("Name", &body["Name"])?;
2681 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2682 validate_string_length("name", name, 1, 64)?;
2683
2684 let accounts = self.state.read();
2685 let empty = EventBridgeState::new(&req.account_id, &req.region);
2686 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2687 let conn = state.connections.get(name).ok_or_else(|| {
2688 AwsServiceError::aws_error(
2689 StatusCode::BAD_REQUEST,
2690 "ResourceNotFoundException",
2691 format!("Connection '{name}' does not exist."),
2692 )
2693 })?;
2694
2695 let auth_params_response =
2697 build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2698
2699 let mut resp = json!({
2700 "ConnectionArn": conn.arn,
2701 "Name": conn.name,
2702 "AuthorizationType": conn.authorization_type,
2703 "AuthParameters": auth_params_response,
2704 "ConnectionState": conn.connection_state,
2705 "SecretArn": conn.secret_arn,
2706 "CreationTime": conn.creation_time.timestamp() as f64,
2707 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2708 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2709 });
2710 if let Some(ref desc) = conn.description {
2711 resp["Description"] = json!(desc);
2712 }
2713
2714 Ok(AwsResponse::ok_json(resp))
2715 }
2716
2717 fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2718 let body = req.json_body();
2719 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2720 validate_optional_enum(
2721 "connectionState",
2722 body["ConnectionState"].as_str(),
2723 &[
2724 "CREATING",
2725 "UPDATING",
2726 "DELETING",
2727 "AUTHORIZED",
2728 "DEAUTHORIZED",
2729 "AUTHORIZING",
2730 "DEAUTHORIZING",
2731 "ACTIVE",
2732 "FAILED_CONNECTIVITY",
2733 ],
2734 )?;
2735 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2736 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2737
2738 let name_prefix = body["NamePrefix"].as_str();
2739 let connection_state = body["ConnectionState"].as_str();
2740 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2741
2742 let accounts = self.state.read();
2743 let empty = EventBridgeState::new(&req.account_id, &req.region);
2744 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2745 let all: Vec<Value> = state
2746 .connections
2747 .values()
2748 .filter(|c| {
2749 if let Some(prefix) = name_prefix {
2750 if !c.name.starts_with(prefix) {
2751 return false;
2752 }
2753 }
2754 if let Some(cs) = connection_state {
2755 if c.connection_state != cs {
2756 return false;
2757 }
2758 }
2759 true
2760 })
2761 .map(|c| {
2762 json!({
2763 "ConnectionArn": c.arn,
2764 "Name": c.name,
2765 "AuthorizationType": c.authorization_type,
2766 "ConnectionState": c.connection_state,
2767 "CreationTime": c.creation_time.timestamp() as f64,
2768 "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2769 "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2770 })
2771 })
2772 .collect();
2773
2774 let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2775 let mut resp = json!({ "Connections": conns });
2776 if let Some(token) = next_token {
2777 resp["NextToken"] = json!(token);
2778 }
2779
2780 Ok(AwsResponse::ok_json(resp))
2781 }
2782
2783 fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2784 let body = req.json_body();
2785 validate_required("Name", &body["Name"])?;
2786 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2787 validate_string_length("name", name, 1, 64)?;
2788 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2789 validate_optional_enum(
2790 "authorizationType",
2791 body["AuthorizationType"].as_str(),
2792 &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2793 )?;
2794
2795 let mut accounts = self.state.write();
2796 let state = accounts.get_or_create(&req.account_id);
2797 let conn = state.connections.get_mut(name).ok_or_else(|| {
2798 AwsServiceError::aws_error(
2799 StatusCode::BAD_REQUEST,
2800 "ResourceNotFoundException",
2801 format!("Connection '{name}' does not exist."),
2802 )
2803 })?;
2804
2805 if let Some(desc) = body["Description"].as_str() {
2806 conn.description = Some(desc.to_string());
2807 }
2808 if let Some(auth_type) = body["AuthorizationType"].as_str() {
2809 conn.authorization_type = auth_type.to_string();
2810 }
2811 if body.get("AuthParameters").is_some() {
2812 conn.auth_parameters = body["AuthParameters"].clone();
2813 }
2814 conn.last_modified_time = Utc::now();
2815
2816 Ok(AwsResponse::ok_json(json!({
2817 "ConnectionArn": conn.arn,
2818 "ConnectionState": conn.connection_state,
2819 "CreationTime": conn.creation_time.timestamp() as f64,
2820 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2821 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2822 })))
2823 }
2824
2825 fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2826 let body = req.json_body();
2827 validate_required("Name", &body["Name"])?;
2828 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2829 validate_string_length("name", name, 1, 64)?;
2830
2831 let mut accounts = self.state.write();
2832 let state = accounts.get_or_create(&req.account_id);
2833 let conn = state.connections.remove(name).ok_or_else(|| {
2834 AwsServiceError::aws_error(
2835 StatusCode::BAD_REQUEST,
2836 "ResourceNotFoundException",
2837 format!("Connection '{name}' does not exist."),
2838 )
2839 })?;
2840
2841 Ok(AwsResponse::ok_json(json!({
2842 "ConnectionArn": conn.arn,
2843 "ConnectionState": conn.connection_state,
2844 "CreationTime": conn.creation_time.timestamp() as f64,
2845 "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2846 "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2847 })))
2848 }
2849
2850 fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2853 let body = req.json_body();
2854 validate_required("Name", &body["Name"])?;
2855 let name = body["Name"]
2856 .as_str()
2857 .ok_or_else(|| missing("Name"))?
2858 .to_string();
2859 validate_string_length("name", &name, 1, 64)?;
2860 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861 validate_required("ConnectionArn", &body["ConnectionArn"])?;
2862 let description = body["Description"].as_str().map(|s| s.to_string());
2863 let connection_arn = body["ConnectionArn"]
2864 .as_str()
2865 .ok_or_else(|| missing("ConnectionArn"))?
2866 .to_string();
2867 validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2868 validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2869 let endpoint = body["InvocationEndpoint"]
2870 .as_str()
2871 .ok_or_else(|| missing("InvocationEndpoint"))?
2872 .to_string();
2873 validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2874 validate_required("HttpMethod", &body["HttpMethod"])?;
2875 let http_method = body["HttpMethod"]
2876 .as_str()
2877 .ok_or_else(|| missing("HttpMethod"))?
2878 .to_string();
2879 validate_enum(
2880 "httpMethod",
2881 &http_method,
2882 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2883 )?;
2884 let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2885 if let Some(r) = rate_limit {
2886 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2887 }
2888
2889 let mut accounts = self.state.write();
2890 let state = accounts.get_or_create(&req.account_id);
2891 let now = Utc::now();
2892 let dest_uuid = uuid::Uuid::new_v4();
2893 let arn = format!(
2894 "arn:aws:events:{}:{}:api-destination/{}/{}",
2895 req.region, state.account_id, name, dest_uuid
2896 );
2897
2898 let dest = ApiDestination {
2899 name: name.clone(),
2900 arn: arn.clone(),
2901 description,
2902 connection_arn,
2903 invocation_endpoint: endpoint,
2904 http_method,
2905 invocation_rate_limit_per_second: rate_limit,
2906 state: "ACTIVE".to_string(),
2907 creation_time: now,
2908 last_modified_time: now,
2909 };
2910 state.api_destinations.insert(name, dest);
2911
2912 Ok(AwsResponse::ok_json(json!({
2913 "ApiDestinationArn": arn,
2914 "ApiDestinationState": "ACTIVE",
2915 "CreationTime": now.timestamp() as f64,
2916 "LastModifiedTime": now.timestamp() as f64,
2917 })))
2918 }
2919
2920 fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2921 let body = req.json_body();
2922 validate_required("Name", &body["Name"])?;
2923 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2924 validate_string_length("name", name, 1, 64)?;
2925
2926 let accounts = self.state.read();
2927 let empty = EventBridgeState::new(&req.account_id, &req.region);
2928 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2929 let dest = state.api_destinations.get(name).ok_or_else(|| {
2930 AwsServiceError::aws_error(
2931 StatusCode::BAD_REQUEST,
2932 "ResourceNotFoundException",
2933 format!("An api-destination '{name}' does not exist."),
2934 )
2935 })?;
2936
2937 let mut resp = json!({
2938 "ApiDestinationArn": dest.arn,
2939 "Name": dest.name,
2940 "ConnectionArn": dest.connection_arn,
2941 "InvocationEndpoint": dest.invocation_endpoint,
2942 "HttpMethod": dest.http_method,
2943 "ApiDestinationState": dest.state,
2944 "CreationTime": dest.creation_time.timestamp() as f64,
2945 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2946 });
2947 if let Some(ref desc) = dest.description {
2948 resp["Description"] = json!(desc);
2949 }
2950 if let Some(rate) = dest.invocation_rate_limit_per_second {
2951 resp["InvocationRateLimitPerSecond"] = json!(rate);
2952 }
2953
2954 Ok(AwsResponse::ok_json(resp))
2955 }
2956
2957 fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2958 let body = req.json_body();
2959 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2960 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2961 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2962 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2963
2964 let name_prefix = body["NamePrefix"].as_str();
2965 let connection_arn = body["ConnectionArn"].as_str();
2966 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2967
2968 let accounts = self.state.read();
2969 let empty = EventBridgeState::new(&req.account_id, &req.region);
2970 let state = accounts.get(&req.account_id).unwrap_or(&empty);
2971 let all: Vec<Value> = state
2972 .api_destinations
2973 .values()
2974 .filter(|d| {
2975 if let Some(prefix) = name_prefix {
2976 if !d.name.starts_with(prefix) {
2977 return false;
2978 }
2979 }
2980 if let Some(arn) = connection_arn {
2981 if d.connection_arn != arn {
2982 return false;
2983 }
2984 }
2985 true
2986 })
2987 .map(|d| {
2988 let mut obj = json!({
2989 "ApiDestinationArn": d.arn,
2990 "Name": d.name,
2991 "ConnectionArn": d.connection_arn,
2992 "InvocationEndpoint": d.invocation_endpoint,
2993 "HttpMethod": d.http_method,
2994 "ApiDestinationState": d.state,
2995 "CreationTime": d.creation_time.timestamp() as f64,
2996 "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2997 });
2998 if let Some(rate) = d.invocation_rate_limit_per_second {
2999 obj["InvocationRateLimitPerSecond"] = json!(rate);
3000 }
3001 obj
3002 })
3003 .collect();
3004
3005 let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3006 let mut resp = json!({ "ApiDestinations": dests });
3007 if let Some(token) = next_token {
3008 resp["NextToken"] = json!(token);
3009 }
3010
3011 Ok(AwsResponse::ok_json(resp))
3012 }
3013
3014 fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3015 let body = req.json_body();
3016 validate_required("Name", &body["Name"])?;
3017 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3018 validate_string_length("name", name, 1, 64)?;
3019 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3020 validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
3021 validate_optional_string_length(
3022 "invocationEndpoint",
3023 body["InvocationEndpoint"].as_str(),
3024 1,
3025 2048,
3026 )?;
3027 validate_optional_enum(
3028 "httpMethod",
3029 body["HttpMethod"].as_str(),
3030 &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
3031 )?;
3032 if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
3033 validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
3034 }
3035
3036 let mut accounts = self.state.write();
3037 let state = accounts.get_or_create(&req.account_id);
3038 let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
3039 AwsServiceError::aws_error(
3040 StatusCode::BAD_REQUEST,
3041 "ResourceNotFoundException",
3042 format!("An api-destination '{name}' does not exist."),
3043 )
3044 })?;
3045
3046 if let Some(desc) = body["Description"].as_str() {
3047 dest.description = Some(desc.to_string());
3048 }
3049 if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
3050 dest.invocation_endpoint = endpoint.to_string();
3051 }
3052 if let Some(method) = body["HttpMethod"].as_str() {
3053 dest.http_method = method.to_string();
3054 }
3055 if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
3056 dest.invocation_rate_limit_per_second = Some(rate);
3057 }
3058 if let Some(conn) = body["ConnectionArn"].as_str() {
3059 dest.connection_arn = conn.to_string();
3060 }
3061 dest.last_modified_time = Utc::now();
3062
3063 Ok(AwsResponse::ok_json(json!({
3064 "ApiDestinationArn": dest.arn,
3065 "ApiDestinationState": dest.state,
3066 "CreationTime": dest.creation_time.timestamp() as f64,
3067 "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
3068 })))
3069 }
3070
3071 fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3072 let body = req.json_body();
3073 validate_required("Name", &body["Name"])?;
3074 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3075 validate_string_length("name", name, 1, 64)?;
3076
3077 let mut accounts = self.state.write();
3078 let state = accounts.get_or_create(&req.account_id);
3079 if !state.api_destinations.contains_key(name) {
3080 return Err(AwsServiceError::aws_error(
3081 StatusCode::BAD_REQUEST,
3082 "ResourceNotFoundException",
3083 format!("An api-destination '{name}' does not exist."),
3084 ));
3085 }
3086 state.api_destinations.remove(name);
3087
3088 Ok(AwsResponse::ok_json(json!({})))
3089 }
3090
3091 fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3094 let input = StartReplayInput::from_body(&req.json_body())?;
3095
3096 let mut accounts = self.state.write();
3097 let state = accounts.get_or_create(&req.account_id);
3098
3099 let bus_name = state.resolve_bus_name(&input.destination_arn);
3101 if !state.buses.contains_key(&bus_name) {
3102 return Err(AwsServiceError::aws_error(
3103 StatusCode::BAD_REQUEST,
3104 "ResourceNotFoundException",
3105 format!("Event bus {bus_name} does not exist."),
3106 ));
3107 }
3108
3109 let archive_name = input
3110 .event_source_arn
3111 .rsplit_once("archive/")
3112 .map(|(_, n)| n.to_string())
3113 .unwrap_or_default();
3114 let archive = state.archives.get(&archive_name).ok_or_else(|| {
3115 AwsServiceError::aws_error(
3116 StatusCode::BAD_REQUEST,
3117 "ValidationException",
3118 format!(
3119 "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3120 ),
3121 )
3122 })?;
3123 let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3124 if archive_bus != bus_name {
3125 return Err(AwsServiceError::aws_error(
3126 StatusCode::BAD_REQUEST,
3127 "ValidationException",
3128 "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3129 ));
3130 }
3131
3132 if input.event_end_time <= input.event_start_time {
3133 return Err(AwsServiceError::aws_error(
3134 StatusCode::BAD_REQUEST,
3135 "ValidationException",
3136 "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3137 ));
3138 }
3139
3140 if state.replays.contains_key(&input.name) {
3141 return Err(AwsServiceError::aws_error(
3142 StatusCode::BAD_REQUEST,
3143 "ResourceAlreadyExistsException",
3144 format!("Replay {} already exists.", input.name),
3145 ));
3146 }
3147
3148 let now = Utc::now();
3149 let arn = format!(
3150 "arn:aws:events:{}:{}:replay/{}",
3151 req.region, state.account_id, input.name
3152 );
3153
3154 let events_to_deliver = collect_replay_events_with_targets(
3155 state,
3156 &archive_name,
3157 &bus_name,
3158 input.event_start_time,
3159 input.event_end_time,
3160 &req.account_id,
3161 &req.region,
3162 );
3163
3164 let replay = Replay {
3165 name: input.name.clone(),
3166 arn: arn.clone(),
3167 description: input.description,
3168 event_source_arn: input.event_source_arn,
3169 destination: input.destination,
3170 event_start_time: input.event_start_time,
3171 event_end_time: input.event_end_time,
3172 state: "COMPLETED".to_string(),
3173 replay_start_time: now,
3174 replay_end_time: Some(now),
3175 };
3176 state.replays.insert(input.name, replay);
3177
3178 drop(accounts);
3179
3180 for (event, targets) in events_to_deliver {
3181 let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3182 let event_json = json!({
3183 "version": "0",
3184 "id": event.event_id,
3185 "source": event.source,
3186 "account": req.account_id,
3187 "detail-type": event.detail_type,
3188 "detail": detail_value,
3189 "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3190 "region": req.region,
3191 "resources": event.resources,
3192 "replay-name": arn,
3193 });
3194 let event_str = event_json.to_string();
3195
3196 for target in targets {
3197 self.deliver_replay_event_to_target(
3198 &target,
3199 &event,
3200 &event_json,
3201 &event_str,
3202 &req.account_id,
3203 );
3204 }
3205 }
3206
3207 Ok(AwsResponse::ok_json(json!({
3208 "ReplayArn": arn,
3209 "ReplayStartTime": now.timestamp() as f64,
3210 "State": "STARTING",
3211 })))
3212 }
3213
3214 fn deliver_replay_event_to_target(
3215 &self,
3216 target: &EventTarget,
3217 event: &PutEvent,
3218 event_json: &Value,
3219 event_str: &str,
3220 account_id: &str,
3221 ) {
3222 let target_arn = &target.arn;
3223 let body_str = if let Some(ref transformer) = target.input_transformer {
3224 apply_input_transformer(transformer, event_json)
3225 } else if let Some(ref input) = target.input {
3226 input.clone()
3227 } else if let Some(ref input_path) = target.input_path {
3228 resolve_json_path(event_json, input_path)
3229 .map(|v| v.to_string())
3230 .unwrap_or_else(|| event_str.to_string())
3231 } else {
3232 event_str.to_string()
3233 };
3234
3235 if target_arn.contains(":sqs:") {
3236 let group_id = target
3237 .sqs_parameters
3238 .as_ref()
3239 .and_then(|p| p["MessageGroupId"].as_str())
3240 .map(|s| s.to_string());
3241 if group_id.is_some() {
3242 self.delivery.send_to_sqs_with_attrs(
3243 target_arn,
3244 &body_str,
3245 &HashMap::new(),
3246 group_id.as_deref(),
3247 None,
3248 );
3249 } else {
3250 self.delivery
3251 .send_to_sqs(target_arn, &body_str, &HashMap::new());
3252 }
3253 } else if target_arn.contains(":sns:") {
3254 self.delivery
3255 .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3256 } else if target_arn.contains(":lambda:") {
3257 let mut accounts = self.state.write();
3258 let state = accounts.get_or_create(account_id);
3259 state
3260 .lambda_invocations
3261 .push(crate::state::LambdaInvocation {
3262 function_arn: target_arn.clone(),
3263 payload: body_str.clone(),
3264 timestamp: Utc::now(),
3265 });
3266 drop(accounts);
3267 if let Some(ref ls) = self.lambda_state {
3268 ls.write()
3269 .get_or_create(account_id)
3270 .invocations
3271 .push(LambdaInvocation {
3272 function_arn: target_arn.clone(),
3273 payload: body_str.clone(),
3274 timestamp: Utc::now(),
3275 source: "aws:events".to_string(),
3276 });
3277 }
3278 invoke_lambda_async(
3279 &self.container_runtime,
3280 &self.lambda_state,
3281 target_arn,
3282 &body_str,
3283 );
3284 } else if target_arn.contains(":logs:") {
3285 let mut accounts = self.state.write();
3286 let state = accounts.get_or_create(account_id);
3287 state.log_deliveries.push(crate::state::LogDelivery {
3288 log_group_arn: target_arn.clone(),
3289 payload: body_str.clone(),
3290 timestamp: Utc::now(),
3291 });
3292 drop(accounts);
3293 if let Some(ref log_state) = self.logs_state {
3294 deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3295 }
3296 } else if target_arn.contains(":states:") {
3297 self.delivery
3298 .start_stepfunctions_execution(target_arn, &body_str);
3299 let mut accounts = self.state.write();
3300 let state = accounts.get_or_create(account_id);
3301 state
3302 .step_function_executions
3303 .push(crate::state::StepFunctionExecution {
3304 state_machine_arn: target_arn.clone(),
3305 payload: body_str.clone(),
3306 timestamp: Utc::now(),
3307 });
3308 } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3309 let url = target_arn.clone();
3310 let payload = body_str.clone();
3311 tokio::spawn(async move {
3312 let client = reqwest::Client::new();
3313 let _ = client
3314 .post(&url)
3315 .header("Content-Type", "application/json")
3316 .body(payload)
3317 .send()
3318 .await;
3319 });
3320 }
3321 }
3322
3323 fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3324 let body = req.json_body();
3325 validate_required("ReplayName", &body["ReplayName"])?;
3326 let name = body["ReplayName"]
3327 .as_str()
3328 .ok_or_else(|| missing("ReplayName"))?;
3329 validate_string_length("replayName", name, 1, 64)?;
3330
3331 let accounts = self.state.read();
3332 let empty = EventBridgeState::new(&req.account_id, &req.region);
3333 let state = accounts.get(&req.account_id).unwrap_or(&empty);
3334 let replay = state.replays.get(name).ok_or_else(|| {
3335 AwsServiceError::aws_error(
3336 StatusCode::BAD_REQUEST,
3337 "ResourceNotFoundException",
3338 format!("Replay {name} does not exist."),
3339 )
3340 })?;
3341
3342 let mut resp = json!({
3343 "Destination": replay.destination,
3344 "EventSourceArn": replay.event_source_arn,
3345 "EventStartTime": replay.event_start_time.timestamp() as f64,
3346 "EventEndTime": replay.event_end_time.timestamp() as f64,
3347 "ReplayArn": replay.arn,
3348 "ReplayName": replay.name,
3349 "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3350 "State": replay.state,
3351 });
3352 if let Some(ref desc) = replay.description {
3353 resp["Description"] = json!(desc);
3354 }
3355 if let Some(ref end) = replay.replay_end_time {
3356 resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3357 }
3358
3359 Ok(AwsResponse::ok_json(resp))
3360 }
3361
3362 fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3363 let body = req.json_body();
3364 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3365 validate_optional_string_length(
3366 "eventSourceArn",
3367 body["EventSourceArn"].as_str(),
3368 1,
3369 1600,
3370 )?;
3371 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3372 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3373 let name_prefix = body["NamePrefix"].as_str();
3374 let source_arn = body["EventSourceArn"].as_str();
3375 let replay_state = body["State"].as_str();
3376
3377 let filter_count = [
3379 name_prefix.is_some(),
3380 source_arn.is_some(),
3381 replay_state.is_some(),
3382 ]
3383 .iter()
3384 .filter(|&&x| x)
3385 .count();
3386 if filter_count > 1 {
3387 return Err(AwsServiceError::aws_error(
3388 StatusCode::BAD_REQUEST,
3389 "ValidationException",
3390 "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3391 ));
3392 }
3393
3394 if let Some(s) = replay_state {
3396 let valid = [
3397 "CANCELLED",
3398 "CANCELLING",
3399 "COMPLETED",
3400 "FAILED",
3401 "RUNNING",
3402 "STARTING",
3403 ];
3404 if !valid.contains(&s) {
3405 return Err(AwsServiceError::aws_error(
3406 StatusCode::BAD_REQUEST,
3407 "ValidationException",
3408 format!(
3409 "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3410 s
3411 ),
3412 ));
3413 }
3414 }
3415
3416 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3417
3418 let accounts = self.state.read();
3419 let empty = EventBridgeState::new(&req.account_id, &req.region);
3420 let state = accounts.get(&req.account_id).unwrap_or(&empty);
3421 let all: Vec<Value> = state
3422 .replays
3423 .values()
3424 .filter(|r| {
3425 if let Some(prefix) = name_prefix {
3426 r.name.starts_with(prefix)
3427 } else if let Some(arn) = source_arn {
3428 r.event_source_arn == arn
3429 } else if let Some(s) = replay_state {
3430 r.state == s
3431 } else {
3432 true
3433 }
3434 })
3435 .map(|r| {
3436 let mut obj = json!({
3437 "EventSourceArn": r.event_source_arn,
3438 "EventStartTime": r.event_start_time.timestamp() as f64,
3439 "EventEndTime": r.event_end_time.timestamp() as f64,
3440 "ReplayName": r.name,
3441 "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3442 "State": r.state,
3443 });
3444 if let Some(ref end) = r.replay_end_time {
3445 obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3446 }
3447 obj
3448 })
3449 .collect();
3450
3451 let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3452 let mut resp = json!({ "Replays": replays });
3453 if let Some(token) = next_token {
3454 resp["NextToken"] = json!(token);
3455 }
3456
3457 Ok(AwsResponse::ok_json(resp))
3458 }
3459
3460 fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3461 let body = req.json_body();
3462 validate_required("ReplayName", &body["ReplayName"])?;
3463 let name = body["ReplayName"]
3464 .as_str()
3465 .ok_or_else(|| missing("ReplayName"))?;
3466 validate_string_length("replayName", name, 1, 64)?;
3467
3468 let mut accounts = self.state.write();
3469 let state = accounts.get_or_create(&req.account_id);
3470 let replay = state.replays.get_mut(name).ok_or_else(|| {
3471 AwsServiceError::aws_error(
3472 StatusCode::BAD_REQUEST,
3473 "ResourceNotFoundException",
3474 format!("Replay {name} does not exist."),
3475 )
3476 })?;
3477
3478 if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3480 return Err(AwsServiceError::aws_error(
3481 StatusCode::BAD_REQUEST,
3482 "IllegalStatusException",
3483 format!("Replay {name} is not in a valid state for this operation."),
3484 ));
3485 }
3486
3487 let arn = replay.arn.clone();
3488 replay.state = "CANCELLED".to_string();
3489
3490 Ok(AwsResponse::ok_json(json!({
3491 "ReplayArn": arn,
3492 "State": "CANCELLING",
3493 })))
3494 }
3495}
3496
3497fn find_tags_mut<'a>(
3500 state: &'a mut crate::state::EventBridgeState,
3501 arn: &str,
3502) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3503 for bus in state.buses.values_mut() {
3505 if bus.arn == arn {
3506 return Ok(&mut bus.tags);
3507 }
3508 }
3509 for rule in state.rules.values_mut() {
3511 if rule.arn == arn {
3512 return Ok(&mut rule.tags);
3513 }
3514 }
3515
3516 let error_msg = if arn.contains(":rule/") {
3518 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3520 if let Some(rule_path) = parts.first() {
3521 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3522 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3523 } else {
3524 format!("Rule {} does not exist on EventBus default.", rule_path)
3525 }
3526 } else {
3527 format!("Resource {arn} not found.")
3528 }
3529 } else {
3530 format!("Resource {arn} not found.")
3531 };
3532
3533 Err(AwsServiceError::aws_error(
3534 StatusCode::BAD_REQUEST,
3535 "ResourceNotFoundException",
3536 error_msg,
3537 ))
3538}
3539
3540fn find_tags<'a>(
3541 state: &'a crate::state::EventBridgeState,
3542 arn: &str,
3543) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3544 for bus in state.buses.values() {
3545 if bus.arn == arn {
3546 return Ok(&bus.tags);
3547 }
3548 }
3549 for rule in state.rules.values() {
3550 if rule.arn == arn {
3551 return Ok(&rule.tags);
3552 }
3553 }
3554
3555 let error_msg = if arn.contains(":rule/") {
3556 let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3557 if let Some(rule_path) = parts.first() {
3558 if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3559 format!("Rule {rule_name} does not exist on EventBus {bus}.")
3560 } else {
3561 format!("Rule {} does not exist on EventBus default.", rule_path)
3562 }
3563 } else {
3564 format!("Resource {arn} not found.")
3565 }
3566 } else {
3567 format!("Resource {arn} not found.")
3568 };
3569
3570 Err(AwsServiceError::aws_error(
3571 StatusCode::BAD_REQUEST,
3572 "ResourceNotFoundException",
3573 error_msg,
3574 ))
3575}
3576
3577fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3580 let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3581 AwsServiceError::aws_error(
3582 StatusCode::BAD_REQUEST,
3583 "InvalidEventPatternException",
3584 "Event pattern is not valid. Reason: Invalid JSON",
3585 )
3586 })?;
3587
3588 validate_pattern_values(&parsed, "")?;
3589 Ok(())
3590}
3591
3592fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3593 match value {
3594 Value::Object(obj) => {
3595 for (key, val) in obj {
3596 let new_path = if path.is_empty() {
3597 key.clone()
3598 } else {
3599 format!("{path}.{key}")
3600 };
3601 match val {
3602 Value::Object(_) => validate_pattern_values(val, &new_path)?,
3603 Value::Array(_) => {} _ => {
3605 return Err(AwsServiceError::aws_error(
3606 StatusCode::BAD_REQUEST,
3607 "InvalidEventPatternException",
3608 format!(
3609 "Event pattern is not valid. Reason: '{}' must be an object or an array",
3610 key
3611 ),
3612 ));
3613 }
3614 }
3615 }
3616 Ok(())
3617 }
3618 _ => Ok(()),
3619 }
3620}
3621
3622fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3625 match auth_type {
3626 "API_KEY" => {
3627 let mut resp = json!({});
3628 if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3629 resp["ApiKeyAuthParameters"] = json!({
3630 "ApiKeyName": api_key["ApiKeyName"],
3631 });
3632 }
3633 resp
3634 }
3635 "BASIC" => {
3636 let mut resp = json!({});
3637 if let Some(basic) = params.get("BasicAuthParameters") {
3638 resp["BasicAuthParameters"] = json!({
3639 "Username": basic["Username"],
3640 });
3641 }
3642 resp
3643 }
3644 "OAUTH_CLIENT_CREDENTIALS" => {
3645 let mut resp = json!({});
3646 if let Some(oauth) = params.get("OAuthParameters") {
3647 resp["OAuthParameters"] = json!({
3648 "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3649 "HttpMethod": oauth["HttpMethod"],
3650 "ClientParameters": {
3651 "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3652 },
3653 });
3654 }
3655 resp
3656 }
3657 _ => params.clone(),
3658 }
3659}
3660
3661pub fn matches_pattern(
3665 pattern_json: Option<&str>,
3666 source: &str,
3667 detail_type: &str,
3668 detail: &str,
3669 account: &str,
3670 region: &str,
3671 resources: &[String],
3672) -> bool {
3673 let pattern_json = match pattern_json {
3674 Some(p) => p,
3675 None => return true,
3676 };
3677
3678 let pattern: Value = match serde_json::from_str(pattern_json) {
3679 Ok(v) => v,
3680 Err(_) => return false,
3681 };
3682
3683 let pattern_obj = match pattern.as_object() {
3684 Some(o) => o,
3685 None => return false,
3686 };
3687
3688 let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3689 let event = json!({
3690 "source": source,
3691 "detail-type": detail_type,
3692 "detail": detail_value,
3693 "account": account,
3694 "region": region,
3695 "resources": resources,
3696 });
3697
3698 for (key, pattern_value) in pattern_obj {
3699 let event_value = &event[key];
3700 if !matches_value(pattern_value, event_value) {
3701 return false;
3702 }
3703 }
3704
3705 true
3706}
3707
3708fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3709 match pattern {
3710 Value::Object(obj) => {
3711 for (key, sub_pattern) in obj {
3712 let sub_value = &event_value[key];
3713 if !matches_value(sub_pattern, sub_value) {
3714 return false;
3715 }
3716 }
3717 true
3718 }
3719 Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3720 _ => false,
3721 }
3722}
3723
3724fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3725 match pattern_elem {
3726 Value::Object(obj) => {
3727 if let Some(prefix_val) = obj.get("prefix") {
3728 if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3729 return actual.starts_with(prefix);
3730 }
3731 return false;
3732 }
3733 if let Some(exists_val) = obj.get("exists") {
3734 let should_exist = exists_val.as_bool().unwrap_or(true);
3735 let does_exist = !event_value.is_null();
3736 return should_exist == does_exist;
3737 }
3738 if let Some(anything_but_val) = obj.get("anything-but") {
3739 return match anything_but_val {
3740 Value::String(s) => event_value.as_str() != Some(s.as_str()),
3741 Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3742 Value::Number(_) => event_value != anything_but_val,
3743 _ => true,
3744 };
3745 }
3746 if let Some(numeric_val) = obj.get("numeric") {
3747 return matches_numeric(numeric_val, event_value);
3748 }
3749 false
3750 }
3751 _ => values_equal(pattern_elem, event_value),
3752 }
3753}
3754
3755#[allow(clippy::too_many_arguments)]
3759fn archive_matching_event(
3760 state: &mut crate::state::EventBridgeState,
3761 event: &PutEvent,
3762 event_bus_name: &str,
3763 source: &str,
3764 detail_type: &str,
3765 detail: &str,
3766 account_id: &str,
3767 region: &str,
3768 resources: &[String],
3769) {
3770 let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3771 for akey in archive_keys {
3772 let (archive_bus, archive_pattern, archive_enabled) = {
3773 let a = &state.archives[&akey];
3774 (
3775 state.resolve_bus_name(&a.event_source_arn),
3776 a.event_pattern.clone(),
3777 a.state == "ENABLED",
3778 )
3779 };
3780 if archive_bus != event_bus_name || !archive_enabled {
3781 continue;
3782 }
3783 let pattern_matches = matches_pattern(
3784 archive_pattern.as_deref(),
3785 source,
3786 detail_type,
3787 detail,
3788 account_id,
3789 region,
3790 resources,
3791 );
3792 if !pattern_matches {
3793 continue;
3794 }
3795 if let Some(archive) = state.archives.get_mut(&akey) {
3796 archive.event_count += 1;
3797 archive.size_bytes += detail.len() as i64;
3798 archive.events.push(event.clone());
3799 }
3800 }
3801}
3802
3803struct StartReplayInput {
3805 name: String,
3806 description: Option<String>,
3807 event_source_arn: String,
3808 destination: Value,
3809 destination_arn: String,
3810 event_start_time: DateTime<Utc>,
3811 event_end_time: DateTime<Utc>,
3812}
3813
3814impl StartReplayInput {
3815 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3816 validate_required("ReplayName", &body["ReplayName"])?;
3817 let name = body["ReplayName"]
3818 .as_str()
3819 .ok_or_else(|| missing("ReplayName"))?
3820 .to_string();
3821 validate_string_length("replayName", &name, 1, 64)?;
3822 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3823 validate_required("EventSourceArn", &body["EventSourceArn"])?;
3824 let description = body["Description"].as_str().map(|s| s.to_string());
3825 let event_source_arn = body["EventSourceArn"]
3826 .as_str()
3827 .ok_or_else(|| missing("EventSourceArn"))?
3828 .to_string();
3829 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3830 validate_required("EventStartTime", &body["EventStartTime"])?;
3831 validate_required("EventEndTime", &body["EventEndTime"])?;
3832 validate_required("Destination", &body["Destination"])?;
3833 let destination = body["Destination"].clone();
3834
3835 let event_start_time = body["EventStartTime"]
3836 .as_f64()
3837 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3838 .unwrap_or_else(Utc::now);
3839 let event_end_time = body["EventEndTime"]
3840 .as_f64()
3841 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3842 .unwrap_or_else(Utc::now);
3843
3844 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3845 if !destination_arn.contains(":event-bus/") {
3846 return Err(AwsServiceError::aws_error(
3847 StatusCode::BAD_REQUEST,
3848 "ValidationException",
3849 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3850 ));
3851 }
3852
3853 Ok(Self {
3854 name,
3855 description,
3856 event_source_arn,
3857 destination,
3858 destination_arn,
3859 event_start_time,
3860 event_end_time,
3861 })
3862 }
3863}
3864
3865#[allow(clippy::too_many_arguments)]
3870fn collect_replay_events_with_targets(
3871 state: &crate::state::EventBridgeState,
3872 archive_name: &str,
3873 bus_name: &str,
3874 event_start_time: DateTime<Utc>,
3875 event_end_time: DateTime<Utc>,
3876 account_id: &str,
3877 region: &str,
3878) -> Vec<(PutEvent, Vec<EventTarget>)> {
3879 let Some(archive) = state.archives.get(archive_name) else {
3880 return Vec::new();
3881 };
3882
3883 let replay_events: Vec<PutEvent> = archive
3884 .events
3885 .iter()
3886 .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3887 .cloned()
3888 .collect();
3889
3890 let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3891 for event in replay_events {
3892 let matching_targets: Vec<EventTarget> = state
3893 .rules
3894 .values()
3895 .filter(|r| {
3896 r.event_bus_name == bus_name
3897 && r.state == "ENABLED"
3898 && matches_pattern(
3899 r.event_pattern.as_deref(),
3900 &event.source,
3901 &event.detail_type,
3902 &event.detail,
3903 account_id,
3904 region,
3905 &event.resources,
3906 )
3907 })
3908 .flat_map(|r| r.targets.clone())
3909 .collect();
3910
3911 if !matching_targets.is_empty() {
3912 events_to_deliver.push((event, matching_targets));
3913 }
3914 }
3915 events_to_deliver
3916}
3917
3918fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3919 let arr = match numeric_arr.as_array() {
3920 Some(a) => a,
3921 None => return false,
3922 };
3923 let actual = match event_value.as_f64() {
3924 Some(n) => n,
3925 None => return false,
3926 };
3927 let mut i = 0;
3928 while i + 1 < arr.len() {
3929 let op = match arr[i].as_str() {
3930 Some(s) => s,
3931 None => return false,
3932 };
3933 let threshold = match arr[i + 1].as_f64() {
3934 Some(n) => n,
3935 None => return false,
3936 };
3937 let ok = match op {
3938 ">" => actual > threshold,
3939 ">=" => actual >= threshold,
3940 "<" => actual < threshold,
3941 "<=" => actual <= threshold,
3942 "=" => (actual - threshold).abs() < f64::EPSILON,
3943 _ => return false,
3944 };
3945 if !ok {
3946 return false;
3947 }
3948 i += 2;
3949 }
3950 true
3951}
3952
3953fn values_equal(a: &Value, b: &Value) -> bool {
3954 a == b
3955}
3956
3957fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3959 let path = path.strip_prefix('$').unwrap_or(path);
3960 let mut current = event;
3961 for segment in path.split('.') {
3962 if segment.is_empty() {
3963 continue;
3964 }
3965 current = current.get(segment)?;
3966 }
3967 Some(current.clone())
3968}
3969
3970fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3972 let input_paths_map = transformer
3973 .get("InputPathsMap")
3974 .and_then(|v| v.as_object())
3975 .cloned()
3976 .unwrap_or_default();
3977 let template = transformer
3978 .get("InputTemplate")
3979 .and_then(|v| v.as_str())
3980 .unwrap_or("")
3981 .to_string();
3982
3983 let mut resolved: HashMap<String, Value> = HashMap::new();
3985 for (var_name, path_val) in &input_paths_map {
3986 if let Some(path_str) = path_val.as_str() {
3987 if let Some(val) = resolve_json_path(event, path_str) {
3988 resolved.insert(var_name.clone(), val);
3989 }
3990 }
3991 }
3992
3993 let mut result = template;
3995 for (var_name, val) in &resolved {
3996 let placeholder = format!("<{var_name}>");
3997 let replacement = match val {
3998 Value::String(s) => s.clone(),
3999 other => other.to_string(),
4000 };
4001 result = result.replace(&placeholder, &replacement);
4002 }
4003
4004 result
4005}
4006
4007fn missing(name: &str) -> AwsServiceError {
4008 AwsServiceError::aws_error(
4009 StatusCode::BAD_REQUEST,
4010 "ValidationException",
4011 format!("The request must contain the parameter {name}"),
4012 )
4013}
4014
/// Extracts the function name from a colon-separated Lambda function ARN.
///
/// When the sixth field is `function` and a seventh field exists, that
/// seventh field is returned (any later fields, such as a qualifier, are
/// ignored). Any other input is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // `nth(5)` consumes fields 0..=5, so the following `next()` is field 6.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
4027
4028pub fn invoke_lambda_async(
4031 container_runtime: &Option<Arc<ContainerRuntime>>,
4032 lambda_state: &Option<SharedLambdaState>,
4033 function_arn: &str,
4034 payload: &str,
4035) {
4036 let runtime = match container_runtime {
4037 Some(rt) => rt.clone(),
4038 None => return,
4039 };
4040 let lambda_state = match lambda_state {
4041 Some(ls) => ls.clone(),
4042 None => return,
4043 };
4044 let func_name = function_name_from_arn(function_arn).to_string();
4045 let payload = payload.as_bytes().to_vec();
4046
4047 tokio::spawn(async move {
4048 let func = {
4049 let accounts = lambda_state.read();
4050 let state = accounts.default_ref();
4051 state.functions.get(&func_name).cloned()
4052 };
4053 let func = match func {
4054 Some(f) => f,
4055 None => {
4056 tracing::warn!(
4057 function = %func_name,
4058 "EventBridge Lambda target not found, skipping invocation"
4059 );
4060 return;
4061 }
4062 };
4063 match runtime.invoke(&func, &payload).await {
4064 Ok(_) => {
4065 tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
4066 }
4067 Err(e) => {
4068 tracing::warn!(
4069 function = %func_name,
4070 error = %e,
4071 "EventBridge Lambda invocation failed"
4072 );
4073 }
4074 }
4075 });
4076}
4077
4078pub fn deliver_to_logs(
4081 logs_state: &SharedLogsState,
4082 log_group_arn: &str,
4083 payload: &str,
4084 timestamp: chrono::DateTime<chrono::Utc>,
4085) {
4086 let group_name = if log_group_arn.contains(":log-group:") {
4089 log_group_arn
4090 .split(":log-group:")
4091 .nth(1)
4092 .unwrap_or(log_group_arn)
4093 .trim_end_matches(":*")
4094 } else {
4095 log_group_arn
4096 };
4097
4098 let stream_name = "events".to_string();
4099 let ts_millis = timestamp.timestamp_millis();
4100
4101 let mut accounts = logs_state.write();
4102 let state = accounts.default_mut();
4103 let region = state.region.clone();
4104 let account_id = state.account_id.clone();
4105
4106 let group = state
4108 .log_groups
4109 .entry(group_name.to_string())
4110 .or_insert_with(|| fakecloud_logs::state::LogGroup {
4111 name: group_name.to_string(),
4112 arn: Arn::new(
4113 "logs",
4114 ®ion,
4115 &account_id,
4116 &format!("log-group:{group_name}"),
4117 )
4118 .to_string(),
4119 creation_time: ts_millis,
4120 retention_in_days: None,
4121 kms_key_id: None,
4122 tags: HashMap::new(),
4123 log_streams: HashMap::new(),
4124 stored_bytes: 0,
4125 subscription_filters: Vec::new(),
4126 data_protection_policy: None,
4127 index_policies: Vec::new(),
4128 transformer: None,
4129 deletion_protection: false,
4130 log_group_class: Some("STANDARD".to_string()),
4131 });
4132
4133 let stream = group
4134 .log_streams
4135 .entry(stream_name.clone())
4136 .or_insert_with(|| fakecloud_logs::state::LogStream {
4137 name: stream_name,
4138 arn: format!("{}:log-stream:events", group.arn),
4139 creation_time: ts_millis,
4140 first_event_timestamp: None,
4141 last_event_timestamp: None,
4142 last_ingestion_time: None,
4143 upload_sequence_token: "1".to_string(),
4144 events: Vec::new(),
4145 });
4146
4147 stream.events.push(fakecloud_logs::state::LogEvent {
4148 timestamp: ts_millis,
4149 message: payload.to_string(),
4150 ingestion_time: ts_millis,
4151 });
4152 stream.last_event_timestamp = Some(ts_millis);
4153 stream.last_ingestion_time = Some(ts_millis);
4154 if stream.first_event_timestamp.is_none() {
4155 stream.first_event_timestamp = Some(ts_millis);
4156 }
4157}
4158
4159fn apply_connection_auth(
4161 mut builder: reqwest::RequestBuilder,
4162 conn: &Connection,
4163) -> reqwest::RequestBuilder {
4164 match conn.authorization_type.as_str() {
4165 "API_KEY" => {
4166 if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
4167 if let (Some(name), Some(value)) = (
4168 params["ApiKeyName"].as_str(),
4169 params["ApiKeyValue"].as_str(),
4170 ) {
4171 builder = builder.header(name, value);
4172 }
4173 }
4174 }
4175 "BASIC" => {
4176 if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
4177 if let (Some(user), Some(pass)) =
4178 (params["Username"].as_str(), params["Password"].as_str())
4179 {
4180 builder = builder.basic_auth(user, Some(pass));
4181 }
4182 }
4183 }
4184 "OAUTH_CLIENT_CREDENTIALS" => {
4185 if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4188 if let (Some(client_id), Some(client_secret)) = (
4189 params["ClientParameters"]["ClientID"].as_str(),
4190 params["ClientParameters"]["ClientSecret"].as_str(),
4191 ) {
4192 builder = builder.basic_auth(client_id, Some(client_secret));
4193 }
4194 }
4195 }
4196 _ => {}
4197 }
4198 builder
4199}
4200
4201#[cfg(test)]
4202mod tests {
4203 use super::*;
4204
4205 fn test_matches(
4207 pattern_json: Option<&str>,
4208 source: &str,
4209 detail_type: &str,
4210 detail: &str,
4211 ) -> bool {
4212 matches_pattern(
4213 pattern_json,
4214 source,
4215 detail_type,
4216 detail,
4217 "123456789012",
4218 "us-east-1",
4219 &[],
4220 )
4221 }
4222
/// A `source` pattern matches only events whose source is listed.
#[test]
fn pattern_matches_source() {
    let accepts = |pattern: &str| test_matches(Some(pattern), "my.app", "OrderPlaced", "{}");

    assert!(accepts(r#"{"source": ["my.app"]}"#));
    assert!(!accepts(r#"{"source": ["other.app"]}"#));
}
4238
/// A `detail-type` pattern matches only events with a listed detail type.
#[test]
fn pattern_matches_detail_type() {
    let accepts = |pattern: &str| test_matches(Some(pattern), "my.app", "OrderPlaced", "{}");

    assert!(accepts(r#"{"detail-type": ["OrderPlaced"]}"#));
    assert!(!accepts(r#"{"detail-type": ["OrderShipped"]}"#));
}
4254
/// A pattern on a top-level `detail` field is compared against the event's
/// detail JSON.
#[test]
fn pattern_matches_detail_field() {
    let pattern = Some(r#"{"detail": {"status": ["ACTIVE"]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "my.app", "StatusChange", detail);

    assert!(accepts(r#"{"status": "ACTIVE"}"#));
    assert!(!accepts(r#"{"status": "INACTIVE"}"#));
}
4270
/// With no pattern at all, any event is considered a match.
#[test]
fn no_pattern_matches_everything() {
    let matched = test_matches(None, "any", "any", "{}");
    assert!(matched);
}
4275
/// When a pattern constrains both `source` and `detail-type`, both must match.
#[test]
fn combined_pattern() {
    let pattern = Some(r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#);
    let cases = [
        ("orders", "OrderPlaced", true),
        ("orders", "OrderShipped", false),
        ("other", "OrderPlaced", false),
    ];

    for (source, detail_type, expected) in cases {
        assert!(test_matches(pattern, source, detail_type, "{}") == expected);
    }
}
4283
/// Patterns may descend one level into nested `detail` objects; a missing
/// leaf field does not match.
#[test]
fn nested_detail_pattern() {
    let pattern = Some(r#"{"detail": {"order": {"status": ["PLACED"]}}}"#);
    let accepts = |detail: &str| test_matches(pattern, "my.app", "OrderEvent", detail);

    assert!(accepts(r#"{"order": {"status": "PLACED", "id": "123"}}"#));
    assert!(!accepts(r#"{"order": {"status": "SHIPPED", "id": "123"}}"#));
    assert!(!accepts(r#"{"order": {"id": "123"}}"#));
}
4306
/// Nested `detail` matching also works three levels deep.
#[test]
fn deeply_nested_detail_pattern() {
    let pattern = Some(r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    assert!(accepts(r#"{"a": {"b": {"c": "deep"}}}"#));
    assert!(!accepts(r#"{"a": {"b": {"c": "shallow"}}}"#));
}
4323
/// The `prefix` matcher accepts the prefix itself and anything starting with
/// it, and rejects other sources.
#[test]
fn prefix_matcher() {
    let pattern = Some(r#"{"source": [{"prefix": "com.myapp"}]}"#);
    let accepts = |source: &str| test_matches(pattern, source, "OrderPlaced", "{}");

    for source in ["com.myapp.orders", "com.myapp"] {
        assert!(accepts(source));
    }
    assert!(!accepts("com.other"));
}
4346
/// The `prefix` matcher also applies to fields inside `detail`.
#[test]
fn prefix_matcher_in_detail() {
    let pattern = Some(r#"{"detail": {"region": [{"prefix": "us-"}]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    assert!(accepts(r#"{"region": "us-east-1"}"#));
    assert!(!accepts(r#"{"region": "eu-west-1"}"#));
}
4363
/// `exists: true` requires the field to be present; `exists: false` requires
/// it to be absent.
#[test]
fn exists_matcher() {
    let with_error = r#"{"error": "something broke"}"#;
    let without_error = r#"{"status": "ok"}"#;

    let must_exist = Some(r#"{"detail": {"error": [{"exists": true}]}}"#);
    assert!(test_matches(must_exist, "src", "type", with_error));
    assert!(!test_matches(must_exist, "src", "type", without_error));

    let must_be_absent = Some(r#"{"detail": {"error": [{"exists": false}]}}"#);
    assert!(test_matches(must_be_absent, "src", "type", without_error));
    assert!(!test_matches(must_be_absent, "src", "type", with_error));
}
4394
/// `anything-but` rejects the listed value(s), given either as a single
/// string or as an array, and accepts everything else.
#[test]
fn anything_but_matcher() {
    let single = Some(r#"{"source": [{"anything-but": "internal"}]}"#);
    assert!(test_matches(single, "external", "Event", "{}"));
    assert!(!test_matches(single, "internal", "Event", "{}"));

    let several = Some(r#"{"source": [{"anything-but": ["internal", "test"]}]}"#);
    assert!(test_matches(several, "external", "Event", "{}"));
    for excluded in ["internal", "test"] {
        assert!(!test_matches(several, excluded, "Event", "{}"));
    }
}
4406
/// `anything-but` also applies to fields inside `detail`.
#[test]
fn anything_but_in_detail() {
    let pattern = Some(r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    assert!(accepts(r#"{"env": "staging"}"#));
    assert!(!accepts(r#"{"env": "prod"}"#));
}
4423
/// `numeric` with `>` is a strict comparison: the bound itself does not match.
#[test]
fn numeric_greater_than() {
    let pattern = Some(r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    assert!(accepts(r#"{"count": 150}"#));
    for rejected in [r#"{"count": 100}"#, r#"{"count": 50}"#] {
        assert!(!accepts(rejected));
    }
}
4446
/// `numeric` with `<` is a strict comparison: the bound itself does not match.
#[test]
fn numeric_less_than() {
    let pattern = Some(r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    assert!(accepts(r#"{"count": 5}"#));
    for rejected in [r#"{"count": 10}"#, r#"{"count": 15}"#] {
        assert!(!accepts(rejected));
    }
}
4469
/// A two-sided `numeric` range (`>= 50`, `< 200`) includes the lower bound and
/// excludes the upper bound.
#[test]
fn numeric_range() {
    let pattern = Some(r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#);
    let accepts = |detail: &str| test_matches(pattern, "src", "type", detail);

    for inside in [r#"{"count": 50}"#, r#"{"count": 100}"#] {
        assert!(accepts(inside));
    }
    for outside in [r#"{"count": 200}"#, r#"{"count": 49}"#] {
        assert!(!accepts(outside));
    }
}
4498
/// A single pattern array may mix literal values and matcher objects; either
/// kind of entry can produce the match.
#[test]
fn mixed_matchers_and_literals() {
    let pattern = Some(r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#);
    let accepts = |source: &str| test_matches(pattern, source, "Event", "{}");

    assert!(accepts("exact.match"));
    assert!(accepts("com.myapp.orders"));
    assert!(!accepts("other.source"));
}
4511
4512 use fakecloud_core::delivery::DeliveryBus;
4515 use parking_lot::RwLock;
4516
4517 fn make_service() -> EventBridgeService {
4518 let state = Arc::new(RwLock::new(
4519 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
4520 ));
4521 let delivery = Arc::new(DeliveryBus::new());
4522 EventBridgeService::new(state, delivery)
4523 }
4524
4525 fn make_request(action: &str, body: Value) -> AwsRequest {
4526 AwsRequest {
4527 service: "events".to_string(),
4528 action: action.to_string(),
4529 region: "us-east-1".to_string(),
4530 account_id: "123456789012".to_string(),
4531 request_id: "test-id".to_string(),
4532 headers: http::HeaderMap::new(),
4533 query_params: HashMap::new(),
4534 body: serde_json::to_vec(&body).unwrap().into(),
4535 path_segments: vec![],
4536 raw_path: "/".to_string(),
4537 raw_query: String::new(),
4538 method: http::Method::POST,
4539 is_query_protocol: false,
4540 access_key_id: None,
4541 principal: None,
4542 }
4543 }
4544
4545 fn create_connection(svc: &EventBridgeService, name: &str) {
4546 let req = make_request(
4547 "CreateConnection",
4548 json!({
4549 "Name": name,
4550 "AuthorizationType": "API_KEY",
4551 "AuthParameters": {
4552 "ApiKeyAuthParameters": {
4553 "ApiKeyName": "x-api-key",
4554 "ApiKeyValue": "secret"
4555 }
4556 }
4557 }),
4558 );
4559 svc.create_connection(&req).unwrap();
4560 }
4561
4562 fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4563 let conn_arn_field = {
4564 let _mas = svc.state.read();
4565 let state = _mas.default_ref();
4566 state.connections.get(conn_name).unwrap().arn.clone()
4567 };
4568 let req = make_request(
4569 "CreateApiDestination",
4570 json!({
4571 "Name": name,
4572 "ConnectionArn": conn_arn_field,
4573 "InvocationEndpoint": "https://example.com",
4574 "HttpMethod": "POST"
4575 }),
4576 );
4577 svc.create_api_destination(&req).unwrap();
4578 }
4579
/// Without filters or a limit, `ListConnections` returns every connection and
/// no `NextToken`.
#[test]
fn list_connections_returns_all_by_default() {
    let svc = make_service();
    for name in ["conn-alpha", "conn-beta", "conn-gamma"] {
        create_connection(&svc, name);
    }

    let resp = svc
        .list_connections(&make_request("ListConnections", json!({})))
        .unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();

    let connections = body["Connections"].as_array().unwrap();
    assert_eq!(connections.len(), 3);
    assert!(body["NextToken"].is_null());
}
4595
/// `NamePrefix` restricts `ListConnections` to connections whose name starts
/// with the prefix.
#[test]
fn list_connections_name_prefix_filter() {
    let svc = make_service();
    for name in ["prod-conn-1", "prod-conn-2", "dev-conn-1"] {
        create_connection(&svc, name);
    }

    let request = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
    let response = svc.list_connections(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in body["Connections"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    assert!(names.iter().all(|n| n.starts_with("prod-")));
}
4615
/// `ConnectionState` restricts `ListConnections` to connections in that state.
#[test]
fn list_connections_state_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");

    // Flip one connection's state directly in the store; the scope releases
    // the write guard before the service is called again.
    {
        let mut accounts = svc.state.write();
        let conn_b = accounts
            .default_mut()
            .connections
            .get_mut("conn-b")
            .unwrap();
        conn_b.connection_state = String::from("DEAUTHORIZED");
    }

    let request = make_request(
        "ListConnections",
        json!({ "ConnectionState": "AUTHORIZED" }),
    );
    let response = svc.list_connections(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let listed = body["Connections"].as_array().unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0]["Name"].as_str().unwrap(), "conn-a");
}
4643
/// Five connections paged with `Limit: 2` come back as pages of 2, 2 and 1,
/// with `NextToken` values `"2"`, `"4"` and then none.
#[test]
fn list_connections_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_connection(&svc, &format!("conn-{i:02}"));
    }

    let fetch = |params: Value| -> Value {
        let req = make_request("ListConnections", params);
        let resp = svc.list_connections(&req).unwrap();
        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["Connections"].as_array().unwrap().len(), 2);
    let token = first["NextToken"].as_str().unwrap();
    assert_eq!(token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(second["Connections"].as_array().unwrap().len(), 2);
    let token = second["NextToken"].as_str().unwrap();
    assert_eq!(token, "4");

    let last = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(last["Connections"].as_array().unwrap().len(), 1);
    assert!(last["NextToken"].is_null());
}
4674
/// `NamePrefix` and `Limit` combine: the first page holds two matching
/// connections and a `NextToken` is returned.
#[test]
fn list_connections_pagination_with_filter() {
    let svc = make_service();
    for i in 0..4 {
        create_connection(&svc, &format!("prod-{i:02}"));
    }
    create_connection(&svc, "dev-00");

    let params = json!({ "NamePrefix": "prod-", "Limit": 2 });
    let response = svc
        .list_connections(&make_request("ListConnections", params))
        .unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
    assert!(body["NextToken"].is_string());
}
4692
/// Without filters or a limit, `ListApiDestinations` returns every
/// destination and no `NextToken`.
#[test]
fn list_api_destinations_returns_all_by_default() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["dest-alpha", "dest-beta"] {
        create_api_destination(&svc, name, "my-conn");
    }

    let response = svc
        .list_api_destinations(&make_request("ListApiDestinations", json!({})))
        .unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let destinations = body["ApiDestinations"].as_array().unwrap();
    assert_eq!(destinations.len(), 2);
    assert!(body["NextToken"].is_null());
}
4708
/// `NamePrefix` restricts `ListApiDestinations` to destinations whose name
/// starts with the prefix.
#[test]
fn list_api_destinations_name_prefix_filter() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for name in ["prod-dest-1", "prod-dest-2", "dev-dest-1"] {
        create_api_destination(&svc, name, "my-conn");
    }

    let request = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
    let response = svc.list_api_destinations(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in body["ApiDestinations"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    assert!(names.iter().all(|n| n.starts_with("prod-")));
}
4729
/// `ConnectionArn` restricts `ListApiDestinations` to destinations bound to
/// that connection.
#[test]
fn list_api_destinations_connection_arn_filter() {
    let svc = make_service();
    create_connection(&svc, "conn-a");
    create_connection(&svc, "conn-b");
    for (dest, conn) in [("dest-1", "conn-a"), ("dest-2", "conn-b"), ("dest-3", "conn-a")] {
        create_api_destination(&svc, dest, conn);
    }

    // Read guard is a temporary of this statement and is gone before the list call.
    let conn_a_arn = svc
        .state
        .read()
        .default_ref()
        .connections
        .get("conn-a")
        .unwrap()
        .arn
        .clone();

    let request = make_request(
        "ListApiDestinations",
        json!({ "ConnectionArn": conn_a_arn }),
    );
    let response = svc.list_api_destinations(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in body["ApiDestinations"].as_array().unwrap() {
        names.push(entry["Name"].as_str().unwrap());
    }
    assert_eq!(names.len(), 2);
    for expected in ["dest-1", "dest-3"] {
        assert!(names.contains(&expected));
    }
}
4761
/// Five destinations paged with `Limit: 2` come back as pages of 2, 2 and 1,
/// with `NextToken` values `"2"`, `"4"` and then none.
#[test]
fn list_api_destinations_pagination() {
    let svc = make_service();
    create_connection(&svc, "my-conn");
    for i in 0..5 {
        create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
    }

    let fetch = |params: Value| -> Value {
        let req = make_request("ListApiDestinations", params);
        let resp = svc.list_api_destinations(&req).unwrap();
        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["ApiDestinations"].as_array().unwrap().len(), 2);
    let token = first["NextToken"].as_str().unwrap();
    assert_eq!(token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(second["ApiDestinations"].as_array().unwrap().len(), 2);
    let token = second["NextToken"].as_str().unwrap();
    assert_eq!(token, "4");

    let last = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(last["ApiDestinations"].as_array().unwrap().len(), 1);
    assert!(last["NextToken"].is_null());
}
4799
4800 fn create_event_bus(svc: &EventBridgeService, name: &str) {
4803 let req = make_request("CreateEventBus", json!({ "Name": name }));
4804 svc.create_event_bus(&req).unwrap();
4805 }
4806
/// After creating four buses, paging `ListEventBuses` with `Limit: 2` yields
/// pages of 2, 2 and 1 (five buses in total — presumably the four created here
/// plus a pre-existing default bus), with tokens `"2"`, `"4"` and then none.
#[test]
fn list_event_buses_pagination() {
    let svc = make_service();
    for i in 0..4 {
        create_event_bus(&svc, &format!("bus-{i:02}"));
    }

    let fetch = |params: Value| -> Value {
        let req = make_request("ListEventBuses", params);
        let resp = svc.list_event_buses(&req).unwrap();
        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["EventBuses"].as_array().unwrap().len(), 2);
    let token = first["NextToken"].as_str().unwrap();
    assert_eq!(token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(second["EventBuses"].as_array().unwrap().len(), 2);
    let token = second["NextToken"].as_str().unwrap();
    assert_eq!(token, "4");

    let last = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(last["EventBuses"].as_array().unwrap().len(), 1);
    assert!(last["NextToken"].is_null());
}
4838
/// Without a limit, `ListEventBuses` returns everything in one page: three
/// buses after creating two (presumably the extra one is a pre-existing
/// default bus) and no `NextToken`.
#[test]
fn list_event_buses_no_pagination_returns_all() {
    let svc = make_service();
    for name in ["bus-alpha", "bus-beta"] {
        create_event_bus(&svc, name);
    }

    let response = svc
        .list_event_buses(&make_request("ListEventBuses", json!({})))
        .unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let buses = body["EventBuses"].as_array().unwrap();
    assert_eq!(buses.len(), 3);
    assert!(body["NextToken"].is_null());
}
4852
/// Even when the request carries an `EndpointId`, the `PutEvents` response
/// must not echo it back, and the single valid entry must succeed.
#[test]
fn put_events_never_includes_endpoint_id_in_response() {
    let svc = make_service();

    let entry = json!({
        "Source": "my.source",
        "DetailType": "MyType",
        "Detail": "{}",
        "EventBusName": "default"
    });
    let request = make_request(
        "PutEvents",
        json!({
            "EndpointId": "my-endpoint.abc123",
            "Entries": [entry]
        }),
    );

    let response = svc.put_events(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let top_level = body.as_object().unwrap();
    assert!(
        !top_level.contains_key("EndpointId"),
        "EndpointId should never be in the PutEvents response"
    );
    assert_eq!(body["FailedEntryCount"], 0);
}
4879
4880 fn create_archive(svc: &EventBridgeService, name: &str) {
4883 let req = make_request(
4884 "CreateArchive",
4885 json!({
4886 "ArchiveName": name,
4887 "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4888 }),
4889 );
4890 svc.create_archive(&req).unwrap();
4891 }
4892
/// Five archives paged with `Limit: 2` come back as pages of 2, 2 and 1, with
/// `NextToken` values `"2"`, `"4"` and then none.
#[test]
fn list_archives_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_archive(&svc, &format!("archive-{i:02}"));
    }

    let fetch = |params: Value| -> Value {
        let req = make_request("ListArchives", params);
        let resp = svc.list_archives(&req).unwrap();
        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["Archives"].as_array().unwrap().len(), 2);
    let token = first["NextToken"].as_str().unwrap();
    assert_eq!(token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(second["Archives"].as_array().unwrap().len(), 2);
    let token = second["NextToken"].as_str().unwrap();
    assert_eq!(token, "4");

    let last = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(last["Archives"].as_array().unwrap().len(), 1);
    assert!(last["NextToken"].is_null());
}
4923
4924 fn create_replay(svc: &EventBridgeService, name: &str) {
4927 let archive_arn = {
4929 let guard = svc.state.read();
4930 let st = guard.default_ref();
4931 if st.archives.contains_key("replay-archive") {
4932 st.archives["replay-archive"].arn.clone()
4933 } else {
4934 drop(guard);
4935 create_archive(svc, "replay-archive");
4936 svc.state.read().default_ref().archives["replay-archive"]
4937 .arn
4938 .clone()
4939 }
4940 };
4941 let req = make_request(
4942 "StartReplay",
4943 json!({
4944 "ReplayName": name,
4945 "EventSourceArn": archive_arn,
4946 "EventStartTime": 1000000.0,
4947 "EventEndTime": 2000000.0,
4948 "Destination": {
4949 "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4950 }
4951 }),
4952 );
4953 svc.start_replay(&req).unwrap();
4954 }
4955
/// Five replays paged with `Limit: 2` come back as pages of 2, 2 and 1, with
/// `NextToken` values `"2"`, `"4"` and then none.
#[test]
fn list_replays_pagination() {
    let svc = make_service();
    for i in 0..5 {
        create_replay(&svc, &format!("replay-{i:02}"));
    }

    let fetch = |params: Value| -> Value {
        let req = make_request("ListReplays", params);
        let resp = svc.list_replays(&req).unwrap();
        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
    };

    let first = fetch(json!({ "Limit": 2 }));
    assert_eq!(first["Replays"].as_array().unwrap().len(), 2);
    let token = first["NextToken"].as_str().unwrap();
    assert_eq!(token, "2");

    let second = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(second["Replays"].as_array().unwrap().len(), 2);
    let token = second["NextToken"].as_str().unwrap();
    assert_eq!(token, "4");

    let last = fetch(json!({ "Limit": 2, "NextToken": token }));
    assert_eq!(last["Replays"].as_array().unwrap().len(), 1);
    assert!(last["NextToken"].is_null());
}
4986
/// A `NextToken` that is not numeric must be rejected by `ListEventBuses`.
#[test]
fn list_event_buses_invalid_next_token_returns_error() {
    let svc = make_service();
    let request = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));

    let outcome = svc.list_event_buses(&request);

    assert!(
        outcome.is_err(),
        "non-numeric NextToken should return an error"
    );
}
4998
/// `TestEventPattern` reports `Result: true` when the event's source matches
/// the pattern.
#[test]
fn test_event_pattern_match() {
    let svc = make_service();
    let pattern = r#"{"source": ["my.app"]}"#;
    let event = r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#;

    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let response = svc.test_event_pattern(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(body["Result"], true);
}
5015
/// `TestEventPattern` reports `Result: false` when the event's source is not
/// in the pattern.
#[test]
fn test_event_pattern_no_match() {
    let svc = make_service();
    let pattern = r#"{"source": ["other.app"]}"#;
    let event = r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#;

    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let response = svc.test_event_pattern(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(body["Result"], false);
}
5030
/// `TestEventPattern` evaluates `detail` patterns against the event's nested
/// `detail` object.
#[test]
fn test_event_pattern_detail_match() {
    let svc = make_service();
    let pattern = r#"{"detail": {"status": ["PLACED"]}}"#;
    let event = r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#;

    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let response = svc.test_event_pattern(&request).unwrap();
    let body: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(body["Result"], true);
}
5045
/// `UpdateEventBus` echoes the bus name and the new description is visible
/// through `DescribeEventBus` afterwards.
#[test]
fn update_event_bus_description() {
    let svc = make_service();
    create_event_bus(&svc, "my-bus");

    let update = make_request(
        "UpdateEventBus",
        json!({ "Name": "my-bus", "Description": "Updated desc" }),
    );
    let updated = svc.update_event_bus(&update).unwrap();
    let updated_body: Value = serde_json::from_slice(updated.body.expect_bytes()).unwrap();
    assert_eq!(updated_body["Name"], "my-bus");

    let describe = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
    let described = svc.describe_event_bus(&describe).unwrap();
    let described_body: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(described_body["Description"], "Updated desc");
}
5067
/// Updating a bus that does not exist is an error.
#[test]
fn update_event_bus_not_found() {
    let svc = make_service();
    let body = json!({ "Name": "ghost-bus", "Description": "nope" });

    let outcome = svc.update_event_bus(&make_request("UpdateEventBus", body));

    assert!(outcome.is_err());
}
5077
5078 fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
5081 let req = make_request(
5082 "CreateEndpoint",
5083 json!({
5084 "Name": name,
5085 "RoutingConfig": {
5086 "FailoverConfig": {
5087 "Primary": { "HealthCheck": "" },
5088 "Secondary": { "Route": "us-west-2" }
5089 }
5090 },
5091 "EventBuses": [
5092 { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
5093 ]
5094 }),
5095 );
5096 svc.create_endpoint(&req).unwrap();
5097 }
5098
/// A created endpoint can be described (`ACTIVE`, with an `EndpointId`
/// containing its name), deleted, and is then no longer describable.
#[test]
fn endpoint_create_describe_delete() {
    let svc = make_service();
    create_endpoint_helper(&svc, "my-endpoint");
    let by_name = || json!({ "Name": "my-endpoint" });

    let described = svc
        .describe_endpoint(&make_request("DescribeEndpoint", by_name()))
        .unwrap();
    let body: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(body["Name"], "my-endpoint");
    assert_eq!(body["State"], "ACTIVE");
    let endpoint_id = body["EndpointId"].as_str().unwrap();
    assert!(endpoint_id.contains("my-endpoint"));

    svc.delete_endpoint(&make_request("DeleteEndpoint", by_name()))
        .unwrap();

    let after_delete = svc.describe_endpoint(&make_request("DescribeEndpoint", by_name()));
    assert!(after_delete.is_err());
}
5120
/// `ListEndpoints` returns both created endpoints, and a description set via
/// `UpdateEndpoint` is visible through `DescribeEndpoint`.
#[test]
fn endpoint_list_and_update() {
    let svc = make_service();
    for name in ["ep-alpha", "ep-beta"] {
        create_endpoint_helper(&svc, name);
    }

    let listed = svc
        .list_endpoints(&make_request("ListEndpoints", json!({})))
        .unwrap();
    let listed_body: Value = serde_json::from_slice(listed.body.expect_bytes()).unwrap();
    assert_eq!(listed_body["Endpoints"].as_array().unwrap().len(), 2);

    let update = make_request(
        "UpdateEndpoint",
        json!({ "Name": "ep-alpha", "Description": "updated" }),
    );
    let updated = svc.update_endpoint(&update).unwrap();
    let updated_body: Value = serde_json::from_slice(updated.body.expect_bytes()).unwrap();
    assert_eq!(updated_body["Name"], "ep-alpha");

    let describe = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
    let described = svc.describe_endpoint(&describe).unwrap();
    let described_body: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(described_body["Description"], "updated");
}
5148
/// Creating a second endpoint with an already-used name is an error.
#[test]
fn endpoint_duplicate_fails() {
    let svc = make_service();
    create_endpoint_helper(&svc, "dup-ep");

    let body = json!({
        "Name": "dup-ep",
        "RoutingConfig": {},
        "EventBuses": []
    });
    let outcome = svc.create_endpoint(&make_request("CreateEndpoint", body));

    assert!(outcome.is_err());
}
5163
/// `DeauthorizeConnection` answers with state `DEAUTHORIZING` and the
/// connection's ARN, and the new state is visible through
/// `DescribeConnection`.
#[test]
fn deauthorize_connection_sets_state() {
    let svc = make_service();
    create_connection(&svc, "deauth-conn");
    let by_name = || json!({ "Name": "deauth-conn" });

    let deauthorized = svc
        .deauthorize_connection(&make_request("DeauthorizeConnection", by_name()))
        .unwrap();
    let body: Value = serde_json::from_slice(deauthorized.body.expect_bytes()).unwrap();
    assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
    let connection_arn = body["ConnectionArn"].as_str().unwrap();
    assert!(connection_arn.contains("deauth-conn"));

    let described = svc
        .describe_connection(&make_request("DescribeConnection", by_name()))
        .unwrap();
    let body: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
}
5186
/// Deauthorizing a connection that was never created must return an error.
#[test]
fn deauthorize_connection_not_found() {
    let svc = make_service();
    let missing = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
    let outcome = svc.deauthorize_connection(&missing);
    assert!(outcome.is_err());
}
5193
/// Walks a partner event source through its whole lifecycle: create,
/// describe, the three list operations, the account-side
/// `DescribeEventSource` view, delete, and finally a describe that must fail.
#[test]
fn partner_event_source_crud() {
    let svc = make_service();

    // Create.
    let create_payload = json!({ "Name": "partner/test", "Account": "123456789012" });
    let create_req = make_request("CreatePartnerEventSource", create_payload);
    svc.create_partner_event_source(&create_req).unwrap();

    // Describe (partner side).
    let describe_req = make_request(
        "DescribePartnerEventSource",
        json!({ "Name": "partner/test" }),
    );
    let describe_resp = svc.describe_partner_event_source(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(described["Name"], "partner/test");

    // List partner event sources by prefix.
    let list_req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
    let list_resp = svc.list_partner_event_sources(&list_req).unwrap();
    let listed: Value = serde_json::from_slice(list_resp.body.expect_bytes()).unwrap();
    assert_eq!(listed["PartnerEventSources"].as_array().unwrap().len(), 1);

    // List the accounts attached to the source.
    let accounts_req = make_request(
        "ListPartnerEventSourceAccounts",
        json!({ "EventSourceName": "partner/test" }),
    );
    let accounts_resp = svc.list_partner_event_source_accounts(&accounts_req).unwrap();
    let accounts: Value = serde_json::from_slice(accounts_resp.body.expect_bytes()).unwrap();
    let account_entries = accounts["PartnerEventSourceAccounts"].as_array().unwrap();
    assert_eq!(account_entries.len(), 1);

    // Describe (account side): the source is reported as ACTIVE.
    let source_req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
    let source_resp = svc.describe_event_source(&source_req).unwrap();
    let source: Value = serde_json::from_slice(source_resp.body.expect_bytes()).unwrap();
    assert_eq!(source["Name"], "partner/test");
    assert_eq!(source["State"], "ACTIVE");

    // List event sources (account side).
    let sources_req = make_request("ListEventSources", json!({}));
    let sources_resp = svc.list_event_sources(&sources_req).unwrap();
    let sources: Value = serde_json::from_slice(sources_resp.body.expect_bytes()).unwrap();
    assert_eq!(sources["EventSources"].as_array().unwrap().len(), 1);

    // Delete.
    let delete_payload = json!({ "Name": "partner/test", "Account": "123456789012" });
    let delete_req = make_request("DeletePartnerEventSource", delete_payload);
    svc.delete_partner_event_source(&delete_req).unwrap();

    // After deletion the source can no longer be described.
    let gone_req = make_request(
        "DescribePartnerEventSource",
        json!({ "Name": "partner/test" }),
    );
    let gone = svc.describe_partner_event_source(&gone_req);
    assert!(gone.is_err());
}
5261
/// `DeactivateEventSource` / `ActivateEventSource` must flip the stored
/// state between `INACTIVE` and `ACTIVE`, and both must fail for a source
/// name that does not exist.
#[test]
fn activate_deactivate_event_source() {
    let svc = make_service();

    let create_payload = json!({ "Name": "aws.partner/test", "Account": "123456789012" });
    svc.create_partner_event_source(&make_request("CreatePartnerEventSource", create_payload))
        .unwrap();

    // Deactivate, then inspect the stored state directly. The read guard is
    // scoped so it is released before the next service call.
    let deactivate_req = make_request(
        "DeactivateEventSource",
        json!({ "Name": "aws.partner/test" }),
    );
    svc.deactivate_event_source(&deactivate_req).unwrap();
    {
        let guard = svc.state.read();
        let sources = &guard.default_ref().partner_event_sources;
        assert_eq!(sources["aws.partner/test"].state, "INACTIVE");
    }

    // Activate again and re-check.
    let activate_req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
    svc.activate_event_source(&activate_req).unwrap();
    {
        let guard = svc.state.read();
        let sources = &guard.default_ref().partner_event_sources;
        assert_eq!(sources["aws.partner/test"].state, "ACTIVE");
    }

    // Unknown names are rejected by both operations.
    let unknown_activate = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
    let activate_outcome = svc.activate_event_source(&unknown_activate);
    assert!(activate_outcome.is_err());

    let unknown_deactivate = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
    let deactivate_outcome = svc.deactivate_event_source(&unknown_deactivate);
    assert!(deactivate_outcome.is_err());
}
5307
/// `DeletePartnerEventSource` must only succeed when the supplied account
/// matches the one the source was created with; a mismatching account and a
/// repeated delete both fail.
#[test]
fn delete_partner_event_source_verifies_account() {
    let svc = make_service();

    let create_req = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create_req).unwrap();

    // Issues a delete for the fixed source name with the given account.
    let delete_with_account = |account: &str| {
        let req = make_request(
            "DeletePartnerEventSource",
            json!({ "Name": "aws.partner/test", "Account": account }),
        );
        svc.delete_partner_event_source(&req)
    };
    // Reports whether the source is still stored; the read guard lives only
    // for the duration of the lookup.
    let source_exists = || {
        svc.state
            .read()
            .default_ref()
            .partner_event_sources
            .contains_key("aws.partner/test")
    };

    // Wrong account: rejected, source untouched.
    assert!(delete_with_account("999999999999").is_err());
    assert!(source_exists());

    // Right account: deleted.
    delete_with_account("123456789012").unwrap();
    assert!(!source_exists());

    // Deleting a second time fails because the source is gone.
    assert!(delete_with_account("123456789012").is_err());
}
5353
/// A single well-formed `PutPartnerEvents` entry must be accepted: no failed
/// entries, one result entry, and that entry carries an `EventId`.
#[test]
fn put_partner_events() {
    let svc = make_service();

    let entry = json!({ "Source": "partner.app", "DetailType": "Test", "Detail": "{}" });
    let request = make_request("PutPartnerEvents", json!({ "Entries": [entry] }));

    let response = svc.put_partner_events(&request).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(result["FailedEntryCount"], 0);

    let entries = result["Entries"].as_array().unwrap();
    assert_eq!(entries.len(), 1);
    assert!(result["Entries"][0]["EventId"].as_str().is_some());
}
5371
5372 #[allow(clippy::type_complexity)]
5376 fn make_service_with_sqs_recorder() -> (
5377 EventBridgeService,
5378 Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5379 ) {
5380 use fakecloud_core::delivery::SqsDelivery;
5381
5382 struct RecordingSqsDelivery {
5383 messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5384 }
5385
5386 impl SqsDelivery for RecordingSqsDelivery {
5387 fn deliver_to_queue(
5388 &self,
5389 queue_arn: &str,
5390 message_body: &str,
5391 _attributes: &HashMap<String, String>,
5392 ) {
5393 self.messages
5394 .lock()
5395 .push((queue_arn.to_string(), message_body.to_string()));
5396 }
5397 }
5398
5399 let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5400 Arc::new(parking_lot::Mutex::new(Vec::new()));
5401 let state = Arc::new(RwLock::new(
5402 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5403 ));
5404 let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5405 messages: messages.clone(),
5406 })));
5407 let svc = EventBridgeService::new(state, delivery);
5408 (svc, messages)
5409 }
5410
/// End-to-end replay check against the recording SQS sink.
///
/// Sets up a rule with an SQS target on the default bus plus an archive of
/// that bus, publishes two matching events, clears the recorder, and then
/// starts a replay. The replay must deliver exactly the two archived events
/// to the queue (each tagged with `replay-name`), respond with `STARTING`,
/// and leave the stored replay in state `COMPLETED`.
#[test]
fn start_replay_delivers_archived_events_to_sqs_target() {
    let (svc, messages) = make_service_with_sqs_recorder();
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";

    // Rule matching `my.app` events.
    let rule_req = make_request(
        "PutRule",
        json!({
            "Name": "replay-test-rule",
            "EventPattern": r#"{"source": ["my.app"]}"#,
            "State": "ENABLED"
        }),
    );
    svc.put_rule(&rule_req).unwrap();

    // SQS target on that rule.
    let targets_req = make_request(
        "PutTargets",
        json!({
            "Rule": "replay-test-rule",
            "Targets": [{
                "Id": "sqs-target",
                "Arn": queue_arn
            }]
        }),
    );
    svc.put_targets(&targets_req).unwrap();

    // Archive attached to the default bus.
    let archive_req = make_request(
        "CreateArchive",
        json!({
            "ArchiveName": "test-archive",
            "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
        }),
    );
    svc.create_archive(&archive_req).unwrap();

    // Publish two events that match the rule.
    let order_event = |detail_type: &str, detail: &str| {
        json!({
            "Source": "my.app",
            "DetailType": detail_type,
            "Detail": detail,
            "EventBusName": "default"
        })
    };
    let events_req = make_request(
        "PutEvents",
        json!({
            "Entries": [
                order_event("OrderCreated", r#"{"orderId": "1"}"#),
                order_event("OrderShipped", r#"{"orderId": "2"}"#)
            ]
        }),
    );
    let events_resp = svc.put_events(&events_req).unwrap();
    let events_body: Value = serde_json::from_slice(events_resp.body.expect_bytes()).unwrap();
    assert_eq!(events_body["FailedEntryCount"], 0);

    // Both events must have been archived.
    {
        let guard = svc.state.read();
        let archive = guard.default_ref().archives.get("test-archive").unwrap();
        assert_eq!(archive.events.len(), 2);
        assert_eq!(archive.event_count, 2);
    }

    // Forget anything recorded so far so only replayed messages are counted.
    messages.lock().clear();

    let archive_arn = svc
        .state
        .read()
        .default_ref()
        .archives
        .get("test-archive")
        .unwrap()
        .arn
        .clone();

    // Replay window: from the epoch until one hour from now.
    let start_ts = 0.0_f64;
    let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;

    let replay_req = make_request(
        "StartReplay",
        json!({
            "ReplayName": "my-replay",
            "EventSourceArn": archive_arn,
            "Destination": {
                "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
            },
            "EventStartTime": start_ts,
            "EventEndTime": end_ts
        }),
    );
    let replay_resp = svc.start_replay(&replay_req).unwrap();
    let replay_body: Value = serde_json::from_slice(replay_resp.body.expect_bytes()).unwrap();
    assert_eq!(replay_body["State"], "STARTING");

    // The two archived events were re-delivered to the queue.
    let delivered = messages.lock();
    assert_eq!(
        delivered.len(),
        2,
        "expected 2 replayed events delivered to SQS"
    );
    for (target_arn, payload) in delivered.iter() {
        assert_eq!(target_arn, queue_arn);
        let replayed: Value = serde_json::from_str(payload).unwrap();
        assert_eq!(replayed["source"], "my.app");
        assert!(replayed["replay-name"].as_str().is_some());
    }

    // The stored replay has finished.
    let guard = svc.state.read();
    let replay = guard.default_ref().replays.get("my-replay").unwrap();
    assert_eq!(replay.state, "COMPLETED");
}
5533
/// For an `API_KEY` connection, `apply_connection_auth` must add a header
/// named after `ApiKeyName` whose value is `ApiKeyValue`.
#[test]
fn apply_connection_auth_api_key() {
    let auth_parameters = json!({
        "ApiKeyAuthParameters": {
            "ApiKeyName": "x-api-key",
            "ApiKeyValue": "my-secret"
        }
    });
    let conn = Connection {
        name: "test-conn".to_string(),
        arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
        description: None,
        authorization_type: "API_KEY".to_string(),
        auth_parameters,
        connection_state: "AUTHORIZED".to_string(),
        secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so the URL need not be reachable.
    let unauthenticated = reqwest::Client::new()
        .post("http://localhost:12345/test")
        .header("Content-Type", "application/json");
    let request = apply_connection_auth(unauthenticated, &conn)
        .body("{}")
        .build()
        .unwrap();

    let api_key = request
        .headers()
        .get("x-api-key")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(api_key, "my-secret");
}
5572
/// For a `BASIC` connection, `apply_connection_auth` must add an RFC 7617
/// `Authorization` header carrying the configured username and password.
///
/// The header is compared in full rather than by its `Basic ` prefix only,
/// so a regression that encodes the wrong (or empty) credentials is caught
/// as well.
#[test]
fn apply_connection_auth_basic() {
    let conn = Connection {
        name: "basic-conn".to_string(),
        arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
        description: None,
        authorization_type: "BASIC".to_string(),
        auth_parameters: json!({
            "BasicAuthParameters": {
                "Username": "user",
                "Password": "pass"
            }
        }),
        connection_state: "AUTHORIZED".to_string(),
        secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    // The request is only built, never sent, so the URL need not be reachable.
    let client = reqwest::Client::new();
    let builder = client.post("http://localhost:12345/test");
    let builder = apply_connection_auth(builder, &conn);

    let request = builder.body("{}").build().unwrap();
    let auth_header = request
        .headers()
        .get("authorization")
        .unwrap()
        .to_str()
        .unwrap();
    assert!(
        auth_header.starts_with("Basic "),
        "Expected Basic auth header, got: {auth_header}"
    );
    // RFC 7617: credentials are base64("user:pass") == "dXNlcjpwYXNz".
    assert_eq!(auth_header, "Basic dXNlcjpwYXNz");
}
5609
5610 #[tokio::test]
5611 async fn put_events_with_api_destination_target_resolves_destination() {
5612 let state = Arc::new(RwLock::new(
5616 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5617 ));
5618 let delivery = Arc::new(DeliveryBus::new());
5619 let svc = EventBridgeService::new(state, delivery);
5620
5621 create_connection(&svc, "my-conn");
5623 let conn_arn = {
5624 let _mas = svc.state.read();
5625 let state = _mas.default_ref();
5626 state.connections.get("my-conn").unwrap().arn.clone()
5627 };
5628 let req = make_request(
5629 "CreateApiDestination",
5630 json!({
5631 "Name": "my-dest",
5632 "ConnectionArn": conn_arn,
5633 "InvocationEndpoint": "http://127.0.0.1:1/noop",
5634 "HttpMethod": "POST"
5635 }),
5636 );
5637 svc.create_api_destination(&req).unwrap();
5638
5639 let dest_arn = {
5640 let _mas = svc.state.read();
5641 let state = _mas.default_ref();
5642 state.api_destinations.get("my-dest").unwrap().arn.clone()
5643 };
5644
5645 let req = make_request(
5647 "PutRule",
5648 json!({
5649 "Name": "api-dest-rule",
5650 "EventPattern": r#"{"source":["test.app"]}"#,
5651 "State": "ENABLED"
5652 }),
5653 );
5654 svc.put_rule(&req).unwrap();
5655
5656 let req = make_request(
5657 "PutTargets",
5658 json!({
5659 "Rule": "api-dest-rule",
5660 "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5661 }),
5662 );
5663 svc.put_targets(&req).unwrap();
5664
5665 let req = make_request(
5667 "PutEvents",
5668 json!({
5669 "Entries": [{
5670 "Source": "test.app",
5671 "DetailType": "TestEvent",
5672 "Detail": r#"{"key":"value"}"#
5673 }]
5674 }),
5675 );
5676 let resp = svc.put_events(&req).unwrap();
5677 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5678 assert_eq!(body["FailedEntryCount"], 0);
5679 assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5680 assert!(body["Entries"][0]["EventId"].as_str().is_some());
5681 }
5682
/// `function_name_from_arn` must yield the bare function name for an
/// unqualified ARN, an alias-qualified ARN, a version-qualified ARN, and a
/// plain name that is not an ARN at all.
#[test]
fn test_function_name_from_arn() {
    let cases = [
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "my-func",
        ),
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod",
            "my-func",
        ),
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:42",
            "my-func",
        ),
        ("my-func", "my-func"),
    ];

    for (input, expected) in cases {
        assert_eq!(super::function_name_from_arn(input), expected);
    }
}
5707
5708 fn put_rule_simple(svc: &EventBridgeService, name: &str) {
5711 let req = make_request(
5712 "PutRule",
5713 json!({ "Name": name, "EventPattern": r#"{"source":["a"]}"# }),
5714 );
5715 svc.put_rule(&req).unwrap();
5716 }
5717
/// A rule put without an explicit state must be stored under
/// `("default", name)` as `ENABLED`, with its event pattern and an ARN
/// containing `rule/<name>`.
#[test]
fn put_rule_persists_event_pattern_and_state() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let key = ("default".to_string(), "r1".to_string());
    let guard = svc.state.read();
    let stored = guard.default_ref().rules.get(&key).unwrap();

    assert_eq!(stored.state, "ENABLED");
    assert!(stored.event_pattern.is_some());
    assert!(stored.arn.contains("rule/r1"));
}
5732
/// A rule with a `ScheduleExpression` on a custom event bus must be rejected
/// with `ValidationException`.
#[test]
fn put_rule_rejects_schedule_on_non_default_bus() {
    let svc = make_service();

    // The custom bus has to exist first.
    let create_bus = make_request("CreateEventBus", json!({ "Name": "custom" }));
    svc.create_event_bus(&create_bus).unwrap();

    let scheduled_rule = json!({
        "Name": "r1",
        "EventBusName": "custom",
        "ScheduleExpression": "rate(5 minutes)"
    });
    let request = make_request("PutRule", scheduled_rule);
    let outcome = svc.put_rule(&request);

    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ValidationException");
}
5751
/// Putting a rule on an event bus that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn put_rule_rejects_unknown_event_bus() {
    let svc = make_service();

    let payload =
        json!({ "Name": "r1", "EventBusName": "ghost", "EventPattern": r#"{"source":["a"]}"# });
    let request = make_request("PutRule", payload);
    let outcome = svc.put_rule(&request);

    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
5762
/// Re-putting an existing rule must update its description while keeping the
/// targets that were already attached to it.
#[test]
fn put_rule_overlay_preserves_existing_targets() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");
    let key = ("default".to_string(), "r1".to_string());

    // Attach a target by mutating state directly. The write guard is scoped
    // so it is released before the next `PutRule` call.
    {
        let existing_target = crate::state::EventTarget {
            id: "t1".to_string(),
            arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        };
        let mut guard = svc.state.write();
        let rule = guard.default_mut().rules.get_mut(&key).unwrap();
        rule.targets.push(existing_target);
    }

    // Overlay the rule with a new description.
    let overlay = make_request(
        "PutRule",
        json!({ "Name": "r1", "Description": "updated", "EventPattern": r#"{"source":["a"]}"# }),
    );
    svc.put_rule(&overlay).unwrap();

    let guard = svc.state.read();
    let rule = guard.default_ref().rules.get(&key).unwrap();
    assert_eq!(rule.description.as_deref(), Some("updated"));
    assert_eq!(rule.targets.len(), 1);
}
5800
/// Deleting a rule that still has a target must fail with
/// `ValidationException`.
#[test]
fn delete_rule_with_targets_errors() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let target = json!({ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" });
    let attach = make_request(
        "PutTargets",
        json!({
            "Rule": "r1",
            "Targets": [target]
        }),
    );
    svc.put_targets(&attach).unwrap();

    let delete = make_request("DeleteRule", json!({ "Name": "r1" }));
    let outcome = svc.delete_rule(&delete);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ValidationException");
}
5818
/// Once its only target has been removed, a rule can be deleted and is no
/// longer present in state.
#[test]
fn delete_rule_after_remove_targets_succeeds() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let attach = make_request(
        "PutTargets",
        json!({
            "Rule": "r1",
            "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
        }),
    );
    svc.put_targets(&attach).unwrap();

    let detach = make_request("RemoveTargets", json!({ "Rule": "r1", "Ids": ["t1"] }));
    svc.remove_targets(&detach).unwrap();

    let delete = make_request("DeleteRule", json!({ "Name": "r1" }));
    svc.delete_rule(&delete).unwrap();

    let key = ("default".to_string(), "r1".to_string());
    let still_stored = svc.state.read().default_ref().rules.contains_key(&key);
    assert!(!still_stored);
}
5842
/// `DisableRule` must store `DISABLED` on the rule and `EnableRule` must
/// switch it back to `ENABLED`.
#[test]
fn enable_disable_rule_toggles_state() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");
    let key = ("default".to_string(), "r1".to_string());

    let disable = make_request("DisableRule", json!({ "Name": "r1" }));
    svc.disable_rule(&disable).unwrap();
    {
        // Scoped so the read guard is released before the next service call.
        let guard = svc.state.read();
        let rule = guard.default_ref().rules.get(&key).unwrap();
        assert_eq!(rule.state, "DISABLED");
    }

    let enable = make_request("EnableRule", json!({ "Name": "r1" }));
    svc.enable_rule(&enable).unwrap();
    {
        let guard = svc.state.read();
        let rule = guard.default_ref().rules.get(&key).unwrap();
        assert_eq!(rule.state, "ENABLED");
    }
}
5872
/// Enabling a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn enable_rule_unknown_errors() {
    let svc = make_service();
    let request = make_request("EnableRule", json!({ "Name": "ghost" }));
    let outcome = svc.enable_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
5880
/// `ListRules` with `NamePrefix` must return only the rules whose name
/// starts with that prefix (two of the three rules created here).
#[test]
fn list_rules_with_name_prefix_filter() {
    let svc = make_service();
    for rule_name in ["prod-orders", "prod-shipping", "dev-orders"] {
        put_rule_simple(&svc, rule_name);
    }

    let request = make_request("ListRules", json!({ "NamePrefix": "prod-" }));
    let response = svc.list_rules(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let rules = listing["Rules"].as_array().unwrap();
    let mut names: Vec<&str> = Vec::with_capacity(rules.len());
    for rule in rules {
        names.push(rule["Name"].as_str().unwrap());
    }

    assert_eq!(names.len(), 2);
    for name in &names {
        assert!(name.starts_with("prod-"));
    }
}
5900
/// With five rules and `Limit: 2`, `ListRules` must return two rules and a
/// string `NextToken`.
#[test]
fn list_rules_pagination_emits_next_token() {
    let svc = make_service();
    (0..5)
        .map(|i| format!("r{i}"))
        .for_each(|rule_name| put_rule_simple(&svc, &rule_name));

    let request = make_request("ListRules", json!({ "Limit": 2 }));
    let response = svc.list_rules(&request).unwrap();
    let page: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let rules_on_page = page["Rules"].as_array().unwrap();
    assert_eq!(rules_on_page.len(), 2);
    assert!(page["NextToken"].is_string());
}
5913
/// `DescribeRule` must return the name, state and description that were
/// supplied to `PutRule`.
#[test]
fn describe_rule_returns_persisted_fields() {
    let svc = make_service();

    let rule_definition = json!({
        "Name": "r1",
        "EventPattern": r#"{"source":["a"]}"#,
        "Description": "hi",
        "State": "DISABLED"
    });
    svc.put_rule(&make_request("PutRule", rule_definition))
        .unwrap();

    let describe = make_request("DescribeRule", json!({ "Name": "r1" }));
    let response = svc.describe_rule(&describe).unwrap();
    let described: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(described["Name"], json!("r1"));
    assert_eq!(described["State"], json!("DISABLED"));
    assert_eq!(described["Description"], json!("hi"));
}
5934
/// Describing a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn describe_rule_unknown_errors() {
    let svc = make_service();
    let request = make_request("DescribeRule", json!({ "Name": "ghost" }));
    let outcome = svc.describe_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
5942
/// A target whose ARN ends in `.fifo` but carries no `SqsParameters` must be
/// rejected with `ValidationException`.
#[test]
fn put_targets_rejects_fifo_without_sqs_parameters() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let fifo_target = json!({ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q.fifo" });
    let request = make_request(
        "PutTargets",
        json!({
            "Rule": "r1",
            "Targets": [fifo_target]
        }),
    );

    let outcome = svc.put_targets(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ValidationException");
}
5957
/// A target whose `Arn` is not an ARN at all must be rejected with
/// `ValidationException`.
#[test]
fn put_targets_rejects_invalid_arn() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let bad_target = json!({ "Id": "t1", "Arn": "not-an-arn" });
    let request = make_request(
        "PutTargets",
        json!({
            "Rule": "r1",
            "Targets": [bad_target]
        }),
    );

    let outcome = svc.put_targets(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ValidationException");
}
5972
/// Adding a well-formed target to a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn put_targets_unknown_rule_errors() {
    let svc = make_service();

    let target = json!({ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" });
    let request = make_request(
        "PutTargets",
        json!({
            "Rule": "ghost",
            "Targets": [target]
        }),
    );

    let outcome = svc.put_targets(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
5986
/// Putting a target with an `Id` that already exists on the rule must
/// replace the earlier target rather than add a second one.
#[test]
fn put_targets_replaces_existing_with_same_id() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    // Same target id twice, pointing at `q1` first and `q2` second.
    for queue in ["q1", "q2"] {
        let queue_arn = format!("arn:aws:sqs:us-east-1:123456789012:{queue}");
        let request = make_request(
            "PutTargets",
            json!({
                "Rule": "r1",
                "Targets": [{ "Id": "t1", "Arn": queue_arn }]
            }),
        );
        svc.put_targets(&request).unwrap();
    }

    let key = ("default".to_string(), "r1".to_string());
    let guard = svc.state.read();
    let rule = guard.default_ref().rules.get(&key).unwrap();
    assert_eq!(rule.targets.len(), 1);
    assert!(rule.targets[0].arn.ends_with("q2"));
}
6017
/// With four targets on a rule and `Limit: 2`, `ListTargetsByRule` must
/// return two targets and a string `NextToken`.
#[test]
fn list_targets_by_rule_returns_pagination_token() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    // One `PutTargets` call per target, ids `t0`..`t3`.
    for index in 0..4 {
        let target = json!({
            "Id": format!("t{index}"),
            "Arn": format!("arn:aws:sqs:us-east-1:123456789012:q{index}")
        });
        let attach = make_request(
            "PutTargets",
            json!({
                "Rule": "r1",
                "Targets": [target]
            }),
        );
        svc.put_targets(&attach).unwrap();
    }

    let list = make_request("ListTargetsByRule", json!({ "Rule": "r1", "Limit": 2 }));
    let response = svc.list_targets_by_rule(&list).unwrap();
    let page: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(page["Targets"].as_array().unwrap().len(), 2);
    assert!(page["NextToken"].is_string());
}
6041
/// When two rules share the same target ARN, `ListRuleNamesByTarget` for
/// that ARN must return both rule names, `r1` before `r2`.
#[test]
fn list_rule_names_by_target_groups_by_arn() {
    let svc = make_service();
    let shared_arn = "arn:aws:sqs:us-east-1:123456789012:shared";

    put_rule_simple(&svc, "r1");
    put_rule_simple(&svc, "r2");

    // Attach the shared target to each rule.
    for rule_name in ["r1", "r2"] {
        let attach = make_request(
            "PutTargets",
            json!({
                "Rule": rule_name,
                "Targets": [{
                    "Id": "t1",
                    "Arn": shared_arn
                }]
            }),
        );
        svc.put_targets(&attach).unwrap();
    }

    let lookup = make_request("ListRuleNamesByTarget", json!({ "TargetArn": shared_arn }));
    let response = svc.list_rule_names_by_target(&lookup).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let mut names: Vec<&str> = Vec::new();
    for entry in result["RuleNames"].as_array().unwrap() {
        names.push(entry.as_str().unwrap());
    }
    assert_eq!(names, vec!["r1", "r2"]);
}
6074
/// A tag added to a rule through `TagResource` must be returned by
/// `ListTagsForResource` for the same ARN.
#[test]
fn tag_then_list_tags_for_rule() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    // Look up the ARN the service assigned to the rule.
    let key = ("default".to_string(), "r1".to_string());
    let rule_arn = svc
        .state
        .read()
        .default_ref()
        .rules
        .get(&key)
        .unwrap()
        .arn
        .clone();

    let tag = make_request(
        "TagResource",
        json!({
            "ResourceARN": &rule_arn,
            "Tags": [{ "Key": "env", "Value": "prod" }]
        }),
    );
    svc.tag_resource(&tag).unwrap();

    let list = make_request("ListTagsForResource", json!({ "ResourceARN": &rule_arn }));
    let response = svc.list_tags_for_resource(&list).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let tags = result["Tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    let only_tag = &tags[0];
    assert_eq!(only_tag["Key"], json!("env"));
    assert_eq!(only_tag["Value"], json!("prod"));
}
6108
/// `UntagResource` must remove only the keys listed in `TagKeys` and leave
/// the rule's other tags in place.
#[test]
fn untag_resource_removes_listed_keys() {
    let svc = make_service();
    put_rule_simple(&svc, "r1");

    let key = ("default".to_string(), "r1".to_string());
    let rule_arn = svc
        .state
        .read()
        .default_ref()
        .rules
        .get(&key)
        .unwrap()
        .arn
        .clone();

    // Start with two tags ...
    let tag = make_request(
        "TagResource",
        json!({
            "ResourceARN": &rule_arn,
            "Tags": [{ "Key": "env", "Value": "prod" }, { "Key": "team", "Value": "core" }]
        }),
    );
    svc.tag_resource(&tag).unwrap();

    // ... and remove one of them.
    let untag_payload = json!({ "ResourceARN": &rule_arn, "TagKeys": ["env"] });
    svc.untag_resource(&make_request("UntagResource", untag_payload))
        .unwrap();

    let guard = svc.state.read();
    let rule = guard.default_ref().rules.get(&key).unwrap();
    assert!(!rule.tags.contains_key("env"));
    assert_eq!(rule.tags.get("team").map(String::as_str), Some("core"));
}
6146
/// `TestEventPattern` must answer `Result: true` when the event's `source`
/// is listed in the pattern.
#[test]
fn test_event_pattern_returns_result_field() {
    let svc = make_service();

    let payload = json!({
        "EventPattern": r#"{"source":["my.app"]}"#,
        "Event": r#"{"source":"my.app","detail-type":"x","detail":{}}"#
    });
    let request = make_request("TestEventPattern", payload);

    let response = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(verdict["Result"], json!(true));
}
6163
/// `DescribeEventBus` without a `Name` must describe the bus named
/// `default`, with an ARN containing `event-bus/default`.
#[test]
fn describe_event_bus_default_returns_arn() {
    let svc = make_service();

    let request = make_request("DescribeEventBus", json!({}));
    let response = svc.describe_event_bus(&request).unwrap();
    let bus: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    assert_eq!(bus["Name"], json!("default"));
    let bus_arn = bus["Arn"].as_str().unwrap();
    assert!(bus_arn.contains("event-bus/default"));
}
6175
/// Deleting the `default` event bus must fail with `ValidationException`.
#[test]
fn delete_event_bus_default_fails() {
    let svc = make_service();
    let request = make_request("DeleteEventBus", json!({ "Name": "default" }));
    let outcome = svc.delete_event_bus(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ValidationException");
}
6183
/// `DescribeRule` for a name that was never put must fail with
/// `ResourceNotFoundException`.
#[test]
fn describe_rule_not_found() {
    let svc = make_service();
    let request = make_request("DescribeRule", json!({"Name": "nonexistent"}));
    let outcome = svc.describe_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6193
/// `DeleteRule` for a rule that does not exist must succeed rather than
/// return an error.
#[test]
fn delete_rule_nonexistent_is_noop() {
    let svc = make_service();
    let request = make_request("DeleteRule", json!({"Name": "nope"}));
    let outcome = svc.delete_rule(&request);
    outcome.unwrap();
}
6201
/// `PutTargets` against a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn put_targets_rule_not_found() {
    let svc = make_service();
    let payload = json!({"Rule": "ghost", "Targets": [{"Id": "t1", "Arn": "arn:a"}]});
    let request = make_request("PutTargets", payload);
    let outcome = svc.put_targets(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6212
/// `RemoveTargets` against a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn remove_targets_rule_not_found() {
    let svc = make_service();
    let request = make_request("RemoveTargets", json!({"Rule": "ghost", "Ids": ["t1"]}));
    let outcome = svc.remove_targets(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6220
/// `ListTargetsByRule` for a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn list_targets_by_rule_not_found() {
    let svc = make_service();
    let request = make_request("ListTargetsByRule", json!({"Rule": "ghost"}));
    let outcome = svc.list_targets_by_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6231
/// `EnableRule` for a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn enable_rule_not_found() {
    let svc = make_service();
    let request = make_request("EnableRule", json!({"Name": "ghost"}));
    let outcome = svc.enable_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6239
/// `DisableRule` for a rule that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn disable_rule_not_found() {
    let svc = make_service();
    let request = make_request("DisableRule", json!({"Name": "ghost"}));
    let outcome = svc.disable_rule(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6247
/// `DescribeEventBus` for a bus that does not exist must fail with
/// `ResourceNotFoundException`.
#[test]
fn describe_event_bus_not_found() {
    let svc = make_service();
    let request = make_request("DescribeEventBus", json!({"Name": "nonexistent-bus"}));
    let outcome = svc.describe_event_bus(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6255
/// `TagResource` for an ARN that matches no stored resource must fail with
/// `ResourceNotFoundException`.
#[test]
fn tag_resource_not_found() {
    let svc = make_service();
    let payload = json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "Tags": [{"Key": "k", "Value": "v"}]});
    let request = make_request("TagResource", payload);
    let outcome = svc.tag_resource(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6266
/// `UntagResource` for an ARN that matches no stored resource must fail with
/// `ResourceNotFoundException`.
#[test]
fn untag_resource_not_found() {
    let svc = make_service();
    let payload = json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "TagKeys": ["k"]});
    let request = make_request("UntagResource", payload);
    let outcome = svc.untag_resource(&request);
    let err = outcome.err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6277
/// `DescribeArchive` for the archive name `ghost`, which this test never
/// creates, must fail with `ResourceNotFoundException`.
#[test]
fn describe_archive_not_found() {
    let service = make_service();
    let request = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
    let outcome = service.describe_archive(&request);
    let error = outcome.err().expect("expected error");
    assert_eq!(error.code(), "ResourceNotFoundException");
}
6285
/// `DeleteArchive` for the archive name `ghost`, which this test never
/// creates, must fail with `ResourceNotFoundException`.
#[test]
fn delete_archive_not_found() {
    let service = make_service();
    let request = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
    let outcome = service.delete_archive(&request);
    let error = outcome.err().expect("expected error");
    assert_eq!(error.code(), "ResourceNotFoundException");
}
6293
/// `DescribeConnection` for the connection name `ghost`, which this test
/// never creates, must fail with `ResourceNotFoundException`.
#[test]
fn describe_connection_not_found() {
    let service = make_service();
    let request = make_request("DescribeConnection", json!({"Name": "ghost"}));
    let outcome = service.describe_connection(&request);
    let error = outcome.err().expect("expected error");
    assert_eq!(error.code(), "ResourceNotFoundException");
}
6301
/// `DescribeApiDestination` for the destination name `ghost`, which this
/// test never creates, must fail with `ResourceNotFoundException`.
#[test]
fn describe_api_destination_not_found() {
    let service = make_service();
    let request = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
    let outcome = service.describe_api_destination(&request);
    let error = outcome.err().expect("expected error");
    assert_eq!(error.code(), "ResourceNotFoundException");
}
6312
/// `DescribeReplay` for the replay name `ghost`, which this test never
/// starts, must fail with `ResourceNotFoundException`.
#[test]
fn describe_replay_not_found() {
    let service = make_service();
    let request = make_request("DescribeReplay", json!({"ReplayName": "ghost"}));
    let outcome = service.describe_replay(&request);
    let error = outcome.err().expect("expected error");
    assert_eq!(error.code(), "ResourceNotFoundException");
}
6320
/// Sending the same `CreateEventBus` request twice must succeed the first
/// time and fail with `ResourceAlreadyExistsException` the second time.
#[test]
fn create_event_bus_duplicate() {
    let service = make_service();
    let request = make_request("CreateEventBus", json!({"Name": "dup-bus"}));

    // The first creation has to go through for the second to be a duplicate.
    service.create_event_bus(&request).unwrap();

    let second_attempt = service.create_event_bus(&request);
    let error = second_attempt.err().expect("expected error");
    assert_eq!(error.code(), "ResourceAlreadyExistsException");
}
6329
/// Walks a single rule through put, describe, disable, enable and delete
/// (no event bus is named in any request). Every call must succeed and the
/// described rule must report `State == "ENABLED"` right after creation.
#[test]
fn rule_put_describe_enable_disable_delete() {
    let service = make_service();

    let put_request = make_request(
        "PutRule",
        json!({
            "Name": "my-rule",
            "EventPattern": "{\"source\":[\"aws.s3\"]}",
            "State": "ENABLED",
        }),
    );
    service.put_rule(&put_request).unwrap();

    let describe_request = make_request("DescribeRule", json!({"Name": "my-rule"}));
    let response = service.describe_rule(&describe_request).unwrap();
    let described: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(described["State"], "ENABLED");

    let disable_request = make_request("DisableRule", json!({"Name": "my-rule"}));
    service.disable_rule(&disable_request).unwrap();

    let enable_request = make_request("EnableRule", json!({"Name": "my-rule"}));
    service.enable_rule(&enable_request).unwrap();

    let delete_request = make_request("DeleteRule", json!({"Name": "my-rule"}));
    service.delete_rule(&delete_request).unwrap();
}
6354
/// After putting rules `r1`, `r2` and `r3`, an unfiltered `ListRules` must
/// return exactly three entries under `Rules`.
#[test]
fn list_rules_returns_created() {
    let service = make_service();

    ["r1", "r2", "r3"].iter().for_each(|name| {
        let put_request = make_request(
            "PutRule",
            json!({"Name": name, "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
        );
        service.put_rule(&put_request).unwrap();
    });

    let list_request = make_request("ListRules", json!({}));
    let response = service.list_rules(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let rules = listing["Rules"].as_array().unwrap();
    assert_eq!(rules.len(), 3);
}
6371
/// Registers targets `t1` and `t2` on rule `tr`, removes `t1`, and checks
/// that `ListTargetsByRule` reports two targets before the removal and one
/// afterwards.
#[test]
fn put_list_remove_targets() {
    let service = make_service();

    // Number of entries `ListTargetsByRule` currently returns for rule `tr`.
    let listed_target_count = || {
        let list_request = make_request("ListTargetsByRule", json!({"Rule": "tr"}));
        let response = service.list_targets_by_rule(&list_request).unwrap();
        let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
        listing["Targets"].as_array().unwrap().len()
    };

    let put_rule_request = make_request(
        "PutRule",
        json!({"Name": "tr", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
    );
    service.put_rule(&put_rule_request).unwrap();

    let put_targets_request = make_request(
        "PutTargets",
        json!({
            "Rule": "tr",
            "Targets": [
                {"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"},
                {"Id": "t2", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:fn1"},
            ]
        }),
    );
    service.put_targets(&put_targets_request).unwrap();

    assert_eq!(listed_target_count(), 2);

    let remove_request = make_request("RemoveTargets", json!({"Rule": "tr", "Ids": ["t1"]}));
    service.remove_targets(&remove_request).unwrap();

    assert_eq!(listed_target_count(), 1);
}
6413
/// A single well-formed `PutEvents` entry must be accepted with
/// `FailedEntryCount == 0`.
#[test]
fn put_events_basic() {
    let service = make_service();
    let entries = json!({
        "Entries": [
            {"Source": "aws.s3", "DetailType": "Object Created", "Detail": "{}"},
        ]
    });
    let request = make_request("PutEvents", entries);

    let response = service.put_events(&request).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(result["FailedEntryCount"], 0);
}
6432
/// Archive happy path: create `my-archive`, describe it (the name must be
/// echoed back), confirm `ListArchives` is non-empty, then delete it.
#[test]
fn archive_create_describe_list_delete() {
    let service = make_service();

    let create_request = make_request(
        "CreateArchive",
        json!({
            "ArchiveName": "my-archive",
            "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default",
        }),
    );
    service.create_archive(&create_request).unwrap();

    let describe_request = make_request("DescribeArchive", json!({"ArchiveName": "my-archive"}));
    let response = service.describe_archive(&describe_request).unwrap();
    let described: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(described["ArchiveName"], "my-archive");

    let list_request = make_request("ListArchives", json!({}));
    let response = service.list_archives(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let archives = listing["Archives"].as_array().unwrap();
    assert!(!archives.is_empty());

    let delete_request = make_request("DeleteArchive", json!({"ArchiveName": "my-archive"}));
    service.delete_archive(&delete_request).unwrap();
}
6469
/// Connection happy path: create an `API_KEY` connection named `my-conn`,
/// confirm `ListConnections` is non-empty, then describe and deauthorize it.
#[test]
fn connection_create_list_describe_deauthorize() {
    let service = make_service();

    let create_request = make_request(
        "CreateConnection",
        json!({
            "Name": "my-conn",
            "AuthorizationType": "API_KEY",
            "AuthParameters": {
                "ApiKeyAuthParameters": {"ApiKeyName": "x-key", "ApiKeyValue": "secret"}
            }
        }),
    );
    service.create_connection(&create_request).unwrap();

    let list_request = make_request("ListConnections", json!({}));
    let response = service.list_connections(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let connections = listing["Connections"].as_array().unwrap();
    assert!(!connections.is_empty());

    let describe_request = make_request("DescribeConnection", json!({"Name": "my-conn"}));
    service.describe_connection(&describe_request).unwrap();

    let deauthorize_request = make_request("DeauthorizeConnection", json!({"Name": "my-conn"}));
    service.deauthorize_connection(&deauthorize_request).unwrap();
}
6505
/// After creating `custom-bus`, an unfiltered `ListEventBuses` must include
/// both a bus named `default` and the newly created `custom-bus`.
#[test]
fn list_event_buses_returns_default_and_custom() {
    let service = make_service();

    let create_request = make_request("CreateEventBus", json!({"Name": "custom-bus"}));
    service.create_event_bus(&create_request).unwrap();

    let list_request = make_request("ListEventBuses", json!({}));
    let response = service.list_event_buses(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();

    let bus_names: Vec<&str> = listing["EventBuses"]
        .as_array()
        .unwrap()
        .iter()
        .map(|bus| bus["Name"].as_str().unwrap())
        .collect();

    assert!(bus_names.iter().any(|name| *name == "default"));
    assert!(bus_names.iter().any(|name| *name == "custom-bus"));
}
6530
/// Tags rule `tagged-rule` with one tag, checks that `ListTagsForResource`
/// returns exactly one tag, then removes the tag via `UntagResource`.
#[test]
fn tag_list_untag_rule_resource() {
    let service = make_service();
    let rule_arn = "arn:aws:events:us-east-1:123456789012:rule/tagged-rule";

    let put_rule_request = make_request(
        "PutRule",
        json!({"Name": "tagged-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
    );
    service.put_rule(&put_rule_request).unwrap();

    let tag_request = make_request(
        "TagResource",
        json!({"ResourceARN": rule_arn, "Tags": [{"Key": "env", "Value": "prod"}]}),
    );
    service.tag_resource(&tag_request).unwrap();

    let list_request = make_request("ListTagsForResource", json!({"ResourceARN": rule_arn}));
    let response = service.list_tags_for_resource(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let tags = listing["Tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);

    let untag_request = make_request(
        "UntagResource",
        json!({"ResourceARN": rule_arn, "TagKeys": ["env"]}),
    );
    service.untag_resource(&untag_request).unwrap();
}
6565
/// `PutPermission` must accept a request that carries only a `Policy`
/// field holding a JSON policy document as a string.
#[test]
fn put_permission_with_policy_json() {
    let service = make_service();
    let policy_document = r#"{"Version":"2012-10-17","Statement":[]}"#;
    let request = make_request("PutPermission", json!({"Policy": policy_document}));
    let outcome = service.put_permission(&request);
    outcome.unwrap();
}
6575
/// `PutPermission` with the action `events:NotARealAction` must be rejected.
/// Only the presence of an error is checked, not its code.
#[test]
fn put_permission_invalid_action_errors() {
    let service = make_service();
    let payload = json!({
        "Action": "events:NotARealAction",
        "Principal": "123456789012",
        "StatementId": "s1"
    });
    let request = make_request("PutPermission", payload);
    let outcome = service.put_permission(&request);
    assert!(outcome.is_err());
}
6589
/// `PutPermission` naming the event bus `missing`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn put_permission_unknown_bus_errors() {
    let service = make_service();
    let payload = json!({
        "EventBusName": "missing",
        "Action": "events:PutEvents",
        "Principal": "123456789012",
        "StatementId": "s1"
    });
    let request = make_request("PutPermission", payload);
    let outcome = service.put_permission(&request);
    assert!(outcome.is_err());
}
6604
/// Adds permission statement `s1` with `PutPermission`, then removes that
/// same statement with `RemovePermission`; both calls must succeed.
#[test]
fn put_permission_add_and_remove_statement() {
    let service = make_service();

    let grant_request = make_request(
        "PutPermission",
        json!({
            "Action": "events:PutEvents",
            "Principal": "123456789012",
            "StatementId": "s1"
        }),
    );
    service.put_permission(&grant_request).unwrap();

    let revoke_request = make_request("RemovePermission", json!({"StatementId": "s1"}));
    service.remove_permission(&revoke_request).unwrap();
}
6621
/// After adding statement `s1`, a `RemovePermission` request carrying only
/// `RemoveAllPermissions: true` must succeed.
#[test]
fn remove_permission_remove_all_flag() {
    let service = make_service();

    let grant_request = make_request(
        "PutPermission",
        json!({
            "Action": "events:PutEvents",
            "Principal": "123456789012",
            "StatementId": "s1"
        }),
    );
    service.put_permission(&grant_request).unwrap();

    let revoke_all_request =
        make_request("RemovePermission", json!({"RemoveAllPermissions": true}));
    service.remove_permission(&revoke_all_request).unwrap();
}
6638
/// `RemovePermission` naming the event bus `missing`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn remove_permission_unknown_bus_errors() {
    let service = make_service();
    let payload = json!({"EventBusName": "missing", "StatementId": "s1"});
    let request = make_request("RemovePermission", payload);
    let outcome = service.remove_permission(&request);
    assert!(outcome.is_err());
}
6648
/// `RemovePermission` for statement `s1` must be rejected when this test has
/// not added any permission beforehand. Only the presence of an error is checked.
#[test]
fn remove_permission_no_policy_errors() {
    let service = make_service();
    let request = make_request("RemovePermission", json!({"StatementId": "s1"}));
    let outcome = service.remove_permission(&request);
    assert!(outcome.is_err());
}
6655
/// With statement `s1` in place, removing the different statement id `ghost`
/// must be rejected. Only the presence of an error is checked.
#[test]
fn remove_permission_unknown_statement_errors() {
    let service = make_service();

    let grant_request = make_request(
        "PutPermission",
        json!({
            "Action": "events:PutEvents",
            "Principal": "123456789012",
            "StatementId": "s1"
        }),
    );
    service.put_permission(&grant_request).unwrap();

    let revoke_request = make_request("RemovePermission", json!({"StatementId": "ghost"}));
    let outcome = service.remove_permission(&revoke_request);
    assert!(outcome.is_err());
}
6672
/// `PutRule` with an empty JSON body (no `Name`) must be rejected.
/// Only the presence of an error is checked.
#[test]
fn put_rule_missing_name_errors() {
    let service = make_service();
    let outcome = service.put_rule(&make_request("PutRule", json!({})));
    assert!(outcome.is_err());
}
6681
/// `PutRule` with a 65-character name must be rejected (presumably because
/// it exceeds the maximum rule-name length; the limit itself is enforced
/// outside this test). Only the presence of an error is checked.
#[test]
fn put_rule_name_too_long_errors() {
    let service = make_service();
    let oversized_name = "x".repeat(65);
    let request = make_request("PutRule", json!({"Name": oversized_name}));
    let outcome = service.put_rule(&request);
    assert!(outcome.is_err());
}
6689
/// `PutRule` with `State: "BOGUS"` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn put_rule_invalid_state_errors() {
    let service = make_service();
    let payload = json!({"Name": "r1", "State": "BOGUS"});
    let outcome = service.put_rule(&make_request("PutRule", payload));
    assert!(outcome.is_err());
}
6696
/// `CreateConnection` with `API_KEY` authorization and matching
/// `ApiKeyAuthParameters` must succeed.
#[test]
fn create_connection_api_key_auth() {
    let service = make_service();
    let payload = json!({
        "Name": "conn-apikey",
        "AuthorizationType": "API_KEY",
        "AuthParameters": {
            "ApiKeyAuthParameters": {
                "ApiKeyName": "X-Api-Key",
                "ApiKeyValue": "secret"
            }
        }
    });
    let request = make_request("CreateConnection", payload);
    let outcome = service.create_connection(&request);
    outcome.unwrap();
}
6717
/// `CreateConnection` with `BASIC` authorization and matching
/// `BasicAuthParameters` must succeed.
#[test]
fn create_connection_basic_auth() {
    let service = make_service();
    let payload = json!({
        "Name": "conn-basic",
        "AuthorizationType": "BASIC",
        "AuthParameters": {
            "BasicAuthParameters": {
                "Username": "u",
                "Password": "p"
            }
        }
    });
    let request = make_request("CreateConnection", payload);
    let outcome = service.create_connection(&request);
    outcome.unwrap();
}
6736
/// `CreateConnection` without a `Name` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn create_connection_missing_name_errors() {
    let service = make_service();
    let payload = json!({"AuthorizationType": "API_KEY"});
    let outcome = service.create_connection(&make_request("CreateConnection", payload));
    assert!(outcome.is_err());
}
6743
/// `CreateConnection` without an `AuthorizationType` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn create_connection_missing_auth_type_errors() {
    let service = make_service();
    let payload = json!({"Name": "c-noauth"});
    let outcome = service.create_connection(&make_request("CreateConnection", payload));
    assert!(outcome.is_err());
}
6750
/// `DeleteConnection` for the connection name `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn delete_connection_not_found() {
    let service = make_service();
    let request = make_request("DeleteConnection", json!({"Name": "ghost"}));
    let outcome = service.delete_connection(&request);
    assert!(outcome.is_err());
}
6757
/// `CreateApiDestination` without a `Name` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn create_api_destination_missing_name_errors() {
    let service = make_service();
    let payload = json!({
        "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/c",
        "InvocationEndpoint": "https://example.com",
        "HttpMethod": "POST"
    });
    let request = make_request("CreateApiDestination", payload);
    let outcome = service.create_api_destination(&request);
    assert!(outcome.is_err());
}
6773
/// `CreateApiDestination` must reject the unsupported `HttpMethod` value `FLY`.
///
/// A connection named `conn-m` is created first and its stored ARN is passed
/// as `ConnectionArn`, so the HTTP method is the only deliberately invalid
/// field. The ARN lookup panics if the connection is missing: silently
/// falling back to an empty ARN would let the request fail for an unrelated
/// reason and the test pass without exercising method validation.
#[test]
fn create_api_destination_invalid_method_errors() {
    let svc = make_service();
    create_connection(&svc, "conn-m");
    let guard = svc.state.read();
    let st = guard.default_ref();
    let conn_arn = st
        .connections
        .get("conn-m")
        .map(|c| c.arn.clone())
        .expect("connection conn-m should exist after create_connection");
    // Release the read guard before calling back into the service.
    drop(guard);

    let req = make_request(
        "CreateApiDestination",
        json!({
            "Name": "d1",
            "ConnectionArn": conn_arn,
            "InvocationEndpoint": "https://example.com",
            "HttpMethod": "FLY"
        }),
    );
    assert!(svc.create_api_destination(&req).is_err());
}
6798
/// `DeleteApiDestination` for the destination name `ghost`, which this test
/// never creates, must be rejected. Only the presence of an error is checked.
#[test]
fn delete_api_destination_not_found() {
    let service = make_service();
    let request = make_request("DeleteApiDestination", json!({"Name": "ghost"}));
    let outcome = service.delete_api_destination(&request);
    assert!(outcome.is_err());
}
6805
/// `CreateArchive` without an `ArchiveName` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn create_archive_missing_name_errors() {
    let service = make_service();
    let payload = json!({
        "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
    });
    let outcome = service.create_archive(&make_request("CreateArchive", payload));
    assert!(outcome.is_err());
}
6817
/// `CreateArchive` without an `EventSourceArn` must be rejected.
/// Only the presence of an error is checked.
#[test]
fn create_archive_missing_source_arn_errors() {
    let service = make_service();
    let payload = json!({"ArchiveName": "arc1"});
    let outcome = service.create_archive(&make_request("CreateArchive", payload));
    assert!(outcome.is_err());
}
6824
/// `DeleteArchive` for the archive name `ghost`, which this test never
/// creates, must fail with `ResourceNotFoundException`.
///
/// The error code is pinned rather than only checking `is_err()`, so that an
/// unrelated failure (for example a validation error) cannot satisfy the test.
#[test]
fn delete_archive_missing_errors() {
    let svc = make_service();
    let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
    let err = svc.delete_archive(&req).err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
6831
/// `CancelReplay` for the replay name `ghost`, which this test never starts,
/// must be rejected. Only the presence of an error is checked.
#[test]
fn cancel_replay_not_found() {
    let service = make_service();
    let request = make_request("CancelReplay", json!({"ReplayName": "ghost"}));
    let outcome = service.cancel_replay(&request);
    assert!(outcome.is_err());
}
6840
/// `PutEvents` with an empty `Entries` array must be rejected as a whole
/// request. Only the presence of an error is checked.
#[test]
fn put_events_empty_entries_errors() {
    let service = make_service();
    let payload = json!({"Entries": []});
    let outcome = service.put_events(&make_request("PutEvents", payload));
    assert!(outcome.is_err());
}
6849
/// One valid `PutEvents` entry must yield `FailedEntryCount == 0` and exactly
/// one element in the response's `Entries` array.
#[test]
fn put_events_success_count() {
    let service = make_service();
    let payload = json!({
        "Entries": [
            {"Source": "my.app", "DetailType": "Test", "Detail": "{}"}
        ]
    });
    let request = make_request("PutEvents", payload);

    let response = service.put_events(&request).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(result["FailedEntryCount"], 0);

    let result_entries = result["Entries"].as_array().unwrap();
    assert_eq!(result_entries.len(), 1);
}
6866
/// `ListTagsForResource` for the ARN of rule `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn list_tags_for_resource_unknown_errors() {
    let service = make_service();
    let payload = json!({
        "ResourceARN": "arn:aws:events:us-east-1:123456789012:rule/ghost"
    });
    let outcome = service.list_tags_for_resource(&make_request("ListTagsForResource", payload));
    assert!(outcome.is_err());
}
6880
/// A rule put on the custom bus `cb` must be describable when the same
/// `EventBusName` is supplied, and the response must echo the rule name.
#[test]
fn describe_rule_custom_bus() {
    let service = make_service();

    let create_bus_request = make_request("CreateEventBus", json!({"Name": "cb"}));
    service.create_event_bus(&create_bus_request).unwrap();

    let put_rule_request = make_request(
        "PutRule",
        json!({
            "Name": "r-cb",
            "EventPattern": "{\"source\":[\"aws.s3\"]}",
            "EventBusName": "cb"
        }),
    );
    service.put_rule(&put_rule_request).unwrap();

    let describe_request = make_request(
        "DescribeRule",
        json!({"Name": "r-cb", "EventBusName": "cb"}),
    );
    let response = service.describe_rule(&describe_request).unwrap();
    let described: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(described["Name"], "r-cb");
}
6908
/// `DisableRule` must succeed for a rule that lives on the custom bus `dcb`
/// when the request names that bus.
#[test]
fn disable_rule_on_custom_bus() {
    let service = make_service();

    let create_bus_request = make_request("CreateEventBus", json!({"Name": "dcb"}));
    service.create_event_bus(&create_bus_request).unwrap();

    let put_rule_request = make_request(
        "PutRule",
        json!({
            "Name": "r-d",
            "EventPattern": "{\"source\":[\"s\"]}",
            "EventBusName": "dcb"
        }),
    );
    service.put_rule(&put_rule_request).unwrap();

    let disable_request = make_request(
        "DisableRule",
        json!({"Name": "r-d", "EventBusName": "dcb"}),
    );
    service.disable_rule(&disable_request).unwrap();
}
6931
/// `DescribeEventBus` for the freshly created custom bus `deb` must succeed
/// and echo the bus name in the response.
#[test]
fn describe_event_bus_custom() {
    let service = make_service();

    let create_request = make_request("CreateEventBus", json!({"Name": "deb"}));
    service.create_event_bus(&create_request).unwrap();

    let describe_request = make_request("DescribeEventBus", json!({"Name": "deb"}));
    let response = service.describe_event_bus(&describe_request).unwrap();
    let described: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert_eq!(described["Name"], "deb");
}
6945
/// With buses `dev-x`, `dev-y` and `prod-z` created, `ListEventBuses`
/// filtered by `NamePrefix: "dev-"` must return exactly two buses.
#[test]
fn list_event_buses_with_name_prefix() {
    let service = make_service();

    ["dev-x", "dev-y", "prod-z"].iter().for_each(|name| {
        let create_request = make_request("CreateEventBus", json!({"Name": name}));
        service.create_event_bus(&create_request).unwrap();
    });

    let list_request = make_request("ListEventBuses", json!({"NamePrefix": "dev-"}));
    let response = service.list_event_buses(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let buses = listing["EventBuses"].as_array().unwrap();
    assert_eq!(buses.len(), 2);
}
6962
/// `ListRules` scoped to the custom bus `lrcb` must return exactly the one
/// rule that was put on that bus.
#[test]
fn list_rules_on_custom_bus() {
    let service = make_service();

    let create_bus_request = make_request("CreateEventBus", json!({"Name": "lrcb"}));
    service.create_event_bus(&create_bus_request).unwrap();

    let put_rule_request = make_request(
        "PutRule",
        json!({
            "Name": "r1",
            "EventPattern": "{\"source\":[\"s\"]}",
            "EventBusName": "lrcb"
        }),
    );
    service.put_rule(&put_rule_request).unwrap();

    let list_request = make_request("ListRules", json!({"EventBusName": "lrcb"}));
    let response = service.list_rules(&list_request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    let rules = listing["Rules"].as_array().unwrap();
    assert_eq!(rules.len(), 1);
}
6984
/// `PutTargets` must succeed for a rule that lives on the custom bus `ptcb`
/// when the request names that bus.
#[test]
fn put_targets_on_custom_bus() {
    let service = make_service();

    let create_bus_request = make_request("CreateEventBus", json!({"Name": "ptcb"}));
    service.create_event_bus(&create_bus_request).unwrap();

    let put_rule_request = make_request(
        "PutRule",
        json!({
            "Name": "rt",
            "EventPattern": "{\"source\":[\"s\"]}",
            "EventBusName": "ptcb"
        }),
    );
    service.put_rule(&put_rule_request).unwrap();

    let put_targets_request = make_request(
        "PutTargets",
        json!({
            "Rule": "rt",
            "EventBusName": "ptcb",
            "Targets": [{"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"}]
        }),
    );
    service.put_targets(&put_targets_request).unwrap();
}
7012
/// `RemoveTargets` with ids that were never added to rule `rmt` must still
/// return a successful response whose body parses as a JSON object. The
/// per-entry failure details are not inspected here.
#[test]
fn remove_targets_unknown_ids_returns_failed() {
    let service = make_service();

    let put_rule_request = make_request(
        "PutRule",
        json!({"Name": "rmt", "EventPattern": "{\"source\":[\"s\"]}"}),
    );
    service.put_rule(&put_rule_request).unwrap();

    let remove_request = make_request(
        "RemoveTargets",
        json!({"Rule": "rmt", "Ids": ["ghost1", "ghost2"]}),
    );
    let response = service.remove_targets(&remove_request).unwrap();
    let result: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(result.is_object());
}
7034
/// `DescribeEventSource` for the name `ghost`, which this test never
/// registers, must be rejected. Only the presence of an error is checked.
#[test]
fn describe_event_source_unknown_errors() {
    let service = make_service();
    let request = make_request("DescribeEventSource", json!({"Name": "ghost"}));
    let outcome = service.describe_event_source(&request);
    assert!(outcome.is_err());
}
7041
/// `DescribePartnerEventSource` for the name `ghost`, which this test never
/// registers, must be rejected. Only the presence of an error is checked.
#[test]
fn describe_partner_event_source_unknown_errors() {
    let service = make_service();
    let request = make_request("DescribePartnerEventSource", json!({"Name": "ghost"}));
    let outcome = service.describe_partner_event_source(&request);
    assert!(outcome.is_err());
}
7048
/// `ListPartnerEventSources` with a name prefix must succeed and return a
/// JSON array under `PartnerEventSources` even though this test creates no
/// partner sources. The array's length is not asserted.
#[test]
fn list_partner_event_sources_empty_ok() {
    let service = make_service();
    let payload = json!({"NamePrefix": "aws.partner"});
    let request = make_request("ListPartnerEventSources", payload);

    let response = service.list_partner_event_sources(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(listing["PartnerEventSources"].is_array());
}
7060
/// `ListEventSources` with no filters must succeed and return a JSON array
/// under `EventSources`. The array's length is not asserted.
#[test]
fn list_event_sources_empty_ok() {
    let service = make_service();
    let request = make_request("ListEventSources", json!({}));

    let response = service.list_event_sources(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(listing["EventSources"].is_array());
}
7069
/// `UpdateConnection` for the connection name `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn update_connection_unknown_errors() {
    let service = make_service();
    let payload = json!({"Name": "ghost", "AuthorizationType": "API_KEY"});
    let request = make_request("UpdateConnection", payload);
    let outcome = service.update_connection(&request);
    assert!(outcome.is_err());
}
7079
/// `DescribeApiDestination` for the destination name `ghost`, which this
/// test never creates, must fail with `ResourceNotFoundException`.
///
/// The error code is pinned rather than only checking `is_err()`, so that an
/// unrelated failure (for example a validation error) cannot satisfy the test.
#[test]
fn describe_api_destination_unknown_errors() {
    let svc = make_service();
    let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
    let err = svc
        .describe_api_destination(&req)
        .err()
        .expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
7086
/// `UpdateApiDestination` for the destination name `ghost`, which this test
/// never creates, must be rejected. Only the presence of an error is checked.
#[test]
fn update_api_destination_unknown_errors() {
    let service = make_service();
    let request = make_request("UpdateApiDestination", json!({"Name": "ghost"}));
    let outcome = service.update_api_destination(&request);
    assert!(outcome.is_err());
}
7093
/// `UpdateArchive` for the archive name `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn update_archive_unknown_errors() {
    let service = make_service();
    let request = make_request("UpdateArchive", json!({"ArchiveName": "ghost"}));
    let outcome = service.update_archive(&request);
    assert!(outcome.is_err());
}
7100
/// `DescribeArchive` for the archive name `ghost`, which this test never
/// creates, must fail with `ResourceNotFoundException`.
///
/// The error code is pinned rather than only checking `is_err()`, so that an
/// unrelated failure (for example a validation error) cannot satisfy the test.
#[test]
fn describe_archive_unknown_errors_b() {
    let svc = make_service();
    let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
    let err = svc.describe_archive(&req).err().expect("expected error");
    assert_eq!(err.code(), "ResourceNotFoundException");
}
7107
/// `ListArchives` with no filters must succeed and return a JSON array under
/// `Archives` even though this test creates no archives. The array's length
/// is not asserted.
#[test]
fn list_archives_empty_ok() {
    let service = make_service();
    let request = make_request("ListArchives", json!({}));

    let response = service.list_archives(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(listing["Archives"].is_array());
}
7116
/// `ListReplays` with no filters must succeed and return a JSON array under
/// `Replays` even though this test starts no replays. The array's length is
/// not asserted.
#[test]
fn list_replays_empty_ok() {
    let service = make_service();
    let request = make_request("ListReplays", json!({}));

    let response = service.list_replays(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(listing["Replays"].is_array());
}
7125
/// `DescribeEndpoint` for the endpoint name `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn describe_endpoint_unknown_errors() {
    let service = make_service();
    let request = make_request("DescribeEndpoint", json!({"Name": "ghost"}));
    let outcome = service.describe_endpoint(&request);
    assert!(outcome.is_err());
}
7132
/// `DeleteEndpoint` for the endpoint name `ghost`, which this test never
/// creates, must be rejected. Only the presence of an error is checked.
#[test]
fn delete_endpoint_unknown_errors() {
    let service = make_service();
    let request = make_request("DeleteEndpoint", json!({"Name": "ghost"}));
    let outcome = service.delete_endpoint(&request);
    assert!(outcome.is_err());
}
7139
/// `ListEndpoints` with no filters must succeed and return a JSON array under
/// `Endpoints` even though this test creates no endpoints. The array's length
/// is not asserted.
#[test]
fn list_endpoints_empty_ok() {
    let service = make_service();
    let request = make_request("ListEndpoints", json!({}));

    let response = service.list_endpoints(&request).unwrap();
    let listing: Value = serde_json::from_slice(response.body.expect_bytes()).unwrap();
    assert!(listing["Endpoints"].is_array());
}
7148}