1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29 state: SharedEventBridgeState,
30 delivery: Arc<DeliveryBus>,
31 lambda_state: Option<SharedLambdaState>,
32 logs_state: Option<SharedLogsState>,
33 container_runtime: Option<Arc<ContainerRuntime>>,
34 snapshot_store: Option<Arc<dyn SnapshotStore>>,
35 snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40 Self {
41 state,
42 delivery,
43 lambda_state: None,
44 logs_state: None,
45 container_runtime: None,
46 snapshot_store: None,
47 snapshot_lock: Arc::new(AsyncMutex::new(())),
48 }
49 }
50
51 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52 self.lambda_state = Some(lambda_state);
53 self
54 }
55
56 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57 self.logs_state = Some(logs_state);
58 self
59 }
60
61 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62 self.container_runtime = Some(runtime);
63 self
64 }
65
66 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67 self.snapshot_store = Some(store);
68 self
69 }
70
71 async fn save_snapshot(&self) {
75 let Some(store) = self.snapshot_store.clone() else {
76 return;
77 };
78 let _guard = self.snapshot_lock.lock().await;
79 let snapshot = EventBridgeSnapshot {
80 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
81 accounts: Some(self.state.read().clone()),
82 state: None,
83 };
84 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
85 let bytes = serde_json::to_vec(&snapshot)
86 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
87 store.save(&bytes)
88 })
89 .await;
90 match join {
91 Ok(Ok(())) => {}
92 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
93 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
94 }
95 }
96}
97
98#[async_trait]
99impl AwsService for EventBridgeService {
100 fn service_name(&self) -> &str {
101 "events"
102 }
103
104 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
105 let mutates = is_mutating_action(req.action.as_str());
106 let result = match req.action.as_str() {
107 "CreateEventBus" => self.create_event_bus(&req),
108 "DeleteEventBus" => self.delete_event_bus(&req),
109 "ListEventBuses" => self.list_event_buses(&req),
110 "DescribeEventBus" => self.describe_event_bus(&req),
111 "PutRule" => self.put_rule(&req),
112 "DeleteRule" => self.delete_rule(&req),
113 "ListRules" => self.list_rules(&req),
114 "DescribeRule" => self.describe_rule(&req),
115 "EnableRule" => self.enable_rule(&req),
116 "DisableRule" => self.disable_rule(&req),
117 "PutTargets" => self.put_targets(&req),
118 "RemoveTargets" => self.remove_targets(&req),
119 "ListTargetsByRule" => self.list_targets_by_rule(&req),
120 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
121 "PutEvents" => self.put_events(&req),
122 "PutPermission" => self.put_permission(&req),
123 "RemovePermission" => self.remove_permission(&req),
124 "TagResource" => self.tag_resource(&req),
125 "UntagResource" => self.untag_resource(&req),
126 "ListTagsForResource" => self.list_tags_for_resource(&req),
127 "CreateArchive" => self.create_archive(&req),
128 "DescribeArchive" => self.describe_archive(&req),
129 "ListArchives" => self.list_archives(&req),
130 "UpdateArchive" => self.update_archive(&req),
131 "DeleteArchive" => self.delete_archive(&req),
132 "CreateConnection" => self.create_connection(&req),
133 "DescribeConnection" => self.describe_connection(&req),
134 "ListConnections" => self.list_connections(&req),
135 "UpdateConnection" => self.update_connection(&req),
136 "DeleteConnection" => self.delete_connection(&req),
137 "CreateApiDestination" => self.create_api_destination(&req),
138 "DescribeApiDestination" => self.describe_api_destination(&req),
139 "ListApiDestinations" => self.list_api_destinations(&req),
140 "UpdateApiDestination" => self.update_api_destination(&req),
141 "DeleteApiDestination" => self.delete_api_destination(&req),
142 "StartReplay" => self.start_replay(&req),
143 "DescribeReplay" => self.describe_replay(&req),
144 "ListReplays" => self.list_replays(&req),
145 "CancelReplay" => self.cancel_replay(&req),
146 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
147 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
148 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
149 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
150 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
151 "ActivateEventSource" => self.activate_event_source(&req),
152 "DeactivateEventSource" => self.deactivate_event_source(&req),
153 "DescribeEventSource" => self.describe_event_source(&req),
154 "ListEventSources" => self.list_event_sources(&req),
155 "PutPartnerEvents" => self.put_partner_events(&req),
156 "TestEventPattern" => self.test_event_pattern(&req),
157 "UpdateEventBus" => self.update_event_bus(&req),
158 "CreateEndpoint" => self.create_endpoint(&req),
159 "DeleteEndpoint" => self.delete_endpoint(&req),
160 "DescribeEndpoint" => self.describe_endpoint(&req),
161 "ListEndpoints" => self.list_endpoints(&req),
162 "UpdateEndpoint" => self.update_endpoint(&req),
163 "DeauthorizeConnection" => self.deauthorize_connection(&req),
164 _ => Err(AwsServiceError::action_not_implemented(
165 "events",
166 &req.action,
167 )),
168 };
169 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
170 self.save_snapshot().await;
171 }
172 result
173 }
174
175 fn supported_actions(&self) -> &[&str] {
176 &[
177 "CreateEventBus",
178 "DeleteEventBus",
179 "ListEventBuses",
180 "DescribeEventBus",
181 "PutRule",
182 "DeleteRule",
183 "ListRules",
184 "DescribeRule",
185 "EnableRule",
186 "DisableRule",
187 "PutTargets",
188 "RemoveTargets",
189 "ListTargetsByRule",
190 "ListRuleNamesByTarget",
191 "PutEvents",
192 "PutPermission",
193 "RemovePermission",
194 "TagResource",
195 "UntagResource",
196 "ListTagsForResource",
197 "CreateArchive",
198 "DescribeArchive",
199 "ListArchives",
200 "UpdateArchive",
201 "DeleteArchive",
202 "CreateConnection",
203 "DescribeConnection",
204 "ListConnections",
205 "UpdateConnection",
206 "DeleteConnection",
207 "CreateApiDestination",
208 "DescribeApiDestination",
209 "ListApiDestinations",
210 "UpdateApiDestination",
211 "DeleteApiDestination",
212 "StartReplay",
213 "DescribeReplay",
214 "ListReplays",
215 "CancelReplay",
216 "CreatePartnerEventSource",
217 "DeletePartnerEventSource",
218 "DescribePartnerEventSource",
219 "ListPartnerEventSources",
220 "ListPartnerEventSourceAccounts",
221 "ActivateEventSource",
222 "DeactivateEventSource",
223 "DescribeEventSource",
224 "ListEventSources",
225 "PutPartnerEvents",
226 "TestEventPattern",
227 "UpdateEventBus",
228 "CreateEndpoint",
229 "DeleteEndpoint",
230 "DescribeEndpoint",
231 "ListEndpoints",
232 "UpdateEndpoint",
233 "DeauthorizeConnection",
234 ]
235 }
236}
237
238impl EventBridgeService {
240 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241 let body = req.json_body();
242 validate_required("Name", &body["Name"])?;
243 let name = body["Name"]
244 .as_str()
245 .ok_or_else(|| missing("Name"))?
246 .to_string();
247 validate_string_length("name", &name, 1, 256)?;
248 validate_optional_string_length(
249 "eventSourceName",
250 body["EventSourceName"].as_str(),
251 1,
252 256,
253 )?;
254 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
255 validate_optional_string_length(
256 "kmsKeyIdentifier",
257 body["KmsKeyIdentifier"].as_str(),
258 0,
259 2048,
260 )?;
261
262 if name.contains('/') && !name.starts_with("aws.partner/") {
264 return Err(AwsServiceError::aws_error(
265 StatusCode::BAD_REQUEST,
266 "ValidationException",
267 "Event bus name must not contain '/'.",
268 ));
269 }
270
271 if name.starts_with("aws.partner/") {
273 let event_source = body["EventSourceName"].as_str().unwrap_or("");
274 let accounts_r = self.state.read();
275 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
276 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
277 let has_source = state_r.partner_event_sources.contains_key(event_source);
278 drop(accounts_r);
279 if !has_source {
280 return Err(AwsServiceError::aws_error(
281 StatusCode::BAD_REQUEST,
282 "ResourceNotFoundException",
283 format!("Event source {event_source} does not exist."),
284 ));
285 }
286 }
287
288 let mut accounts = self.state.write();
289 let state = accounts.get_or_create(&req.account_id);
290
291 if state.buses.contains_key(&name) {
292 return Err(AwsServiceError::aws_error(
293 StatusCode::BAD_REQUEST,
294 "ResourceAlreadyExistsException",
295 format!("Event bus {name} already exists."),
296 ));
297 }
298
299 let arn = format!(
300 "arn:aws:events:{}:{}:event-bus/{}",
301 req.region, state.account_id, name
302 );
303 let now = Utc::now();
304 let description = body["Description"].as_str().map(|s| s.to_string());
305 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
306 let dead_letter_config = body.get("DeadLetterConfig").cloned();
307
308 let tags = parse_tags(&body);
309
310 let bus = EventBus {
311 name: name.clone(),
312 arn: arn.clone(),
313 tags,
314 policy: None,
315 description,
316 kms_key_identifier,
317 dead_letter_config,
318 creation_time: now,
319 last_modified_time: now,
320 };
321 state.buses.insert(name, bus);
322
323 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
324 }
325
326 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
327 let body = req.json_body();
328 validate_required("Name", &body["Name"])?;
329 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
330 validate_string_length("name", name, 1, 256)?;
331
332 if name == "default" {
333 return Err(AwsServiceError::aws_error(
334 StatusCode::BAD_REQUEST,
335 "ValidationException",
336 format!("Cannot delete event bus {name}."),
337 ));
338 }
339
340 let mut accounts = self.state.write();
341 let state = accounts.get_or_create(&req.account_id);
342 state.buses.remove(name);
343 state.rules.retain(|k, _| k.0 != name);
344
345 Ok(AwsResponse::ok_json(json!({})))
346 }
347
348 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
349 let body = req.json_body();
350 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
351 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
352 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
353 let name_prefix = body["NamePrefix"].as_str();
354 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
355 if let Some(t) = body["NextToken"].as_str() {
356 t.parse::<usize>().map_err(|_| {
357 AwsServiceError::aws_error(
358 StatusCode::BAD_REQUEST,
359 "InvalidNextTokenException",
360 format!("Invalid NextToken value: '{t}'"),
361 )
362 })?;
363 }
364
365 let accounts = self.state.read();
366 let empty = EventBridgeState::new(&req.account_id, &req.region);
367 let state = accounts.get(&req.account_id).unwrap_or(&empty);
368 let filtered: Vec<&_> = state
369 .buses
370 .values()
371 .filter(|b| match name_prefix {
372 Some(prefix) => b.name.starts_with(prefix),
373 None => true,
374 })
375 .collect();
376
377 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
378 let buses: Vec<Value> = page
379 .iter()
380 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
381 .collect();
382 let mut resp = json!({ "EventBuses": buses });
383 if let Some(token) = next_token {
384 resp["NextToken"] = json!(token);
385 }
386
387 Ok(AwsResponse::ok_json(resp))
388 }
389
390 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
391 let body = req.json_body();
392 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
393 let name = body["Name"].as_str().unwrap_or("default");
394
395 let accounts = self.state.read();
396 let empty = EventBridgeState::new(&req.account_id, &req.region);
397 let state = accounts.get(&req.account_id).unwrap_or(&empty);
398 let bus = state.buses.get(name).ok_or_else(|| {
399 AwsServiceError::aws_error(
400 StatusCode::BAD_REQUEST,
401 "ResourceNotFoundException",
402 format!("Event bus {name} does not exist."),
403 )
404 })?;
405
406 let mut resp = json!({
407 "Name": bus.name,
408 "Arn": bus.arn,
409 "CreationTime": bus.creation_time.timestamp() as f64,
410 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
411 });
412
413 if let Some(ref policy) = bus.policy {
414 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
415 }
416 if let Some(ref desc) = bus.description {
417 resp["Description"] = json!(desc);
418 }
419 if let Some(ref kms) = bus.kms_key_identifier {
420 resp["KmsKeyIdentifier"] = json!(kms);
421 }
422 if let Some(ref dlc) = bus.dead_letter_config {
423 resp["DeadLetterConfig"] = dlc.clone();
424 }
425
426 Ok(AwsResponse::ok_json(resp))
427 }
428
429 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
432 let body = req.json_body();
433 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
434 validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
435 validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
436 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
437 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
438
439 let mut accounts = self.state.write();
440 let state = accounts.get_or_create(&req.account_id);
441
442 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
443 AwsServiceError::aws_error(
444 StatusCode::BAD_REQUEST,
445 "ResourceNotFoundException",
446 format!("Event bus {event_bus_name} does not exist."),
447 )
448 })?;
449
450 if let Some(policy_str) = body["Policy"].as_str() {
452 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
453 bus.policy = Some(policy);
454 return Ok(AwsResponse::ok_json(json!({})));
455 }
456 }
457
458 let action = body["Action"].as_str().unwrap_or("");
460 let principal = body["Principal"].as_str().unwrap_or("");
461 let statement_id = body["StatementId"].as_str().unwrap_or("");
462
463 if action != "events:PutEvents" {
465 return Err(AwsServiceError::aws_error(
466 StatusCode::BAD_REQUEST,
467 "ValidationException",
468 "Provided value in parameter 'action' is not supported.",
469 ));
470 }
471
472 let statement = json!({
473 "Sid": statement_id,
474 "Effect": "Allow",
475 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
476 "Action": action,
477 "Resource": bus.arn,
478 });
479
480 let policy = bus.policy.get_or_insert_with(|| {
481 json!({
482 "Version": "2012-10-17",
483 "Statement": [],
484 })
485 });
486
487 if let Some(stmts) = policy["Statement"].as_array_mut() {
488 stmts.push(statement);
489 }
490
491 Ok(AwsResponse::ok_json(json!({})))
492 }
493
494 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495 let body = req.json_body();
496 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
497 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
498 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
499 let statement_id = body["StatementId"].as_str().unwrap_or("");
500 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
501
502 let mut accounts = self.state.write();
503 let state = accounts.get_or_create(&req.account_id);
504
505 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
506 AwsServiceError::aws_error(
507 StatusCode::BAD_REQUEST,
508 "ResourceNotFoundException",
509 format!("Event bus {event_bus_name} does not exist."),
510 )
511 })?;
512
513 if remove_all {
514 bus.policy = None;
515 return Ok(AwsResponse::ok_json(json!({})));
516 }
517
518 let policy = bus.policy.as_mut().ok_or_else(|| {
519 AwsServiceError::aws_error(
520 StatusCode::BAD_REQUEST,
521 "ResourceNotFoundException",
522 "EventBus does not have a policy.",
523 )
524 })?;
525
526 if let Some(stmts) = policy["Statement"].as_array_mut() {
527 let before = stmts.len();
528 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
529 if stmts.len() == before {
530 return Err(AwsServiceError::aws_error(
531 StatusCode::BAD_REQUEST,
532 "ResourceNotFoundException",
533 "Statement with the provided id does not exist.",
534 ));
535 }
536 if stmts.is_empty() {
537 bus.policy = None;
538 }
539 }
540
541 Ok(AwsResponse::ok_json(json!({})))
542 }
543
544 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
547 let body = req.json_body();
548 validate_required("Name", &body["Name"])?;
549 let name = body["Name"]
550 .as_str()
551 .ok_or_else(|| missing("Name"))?
552 .to_string();
553 validate_string_length("name", &name, 1, 64)?;
554 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
555 validate_optional_string_length(
556 "scheduleExpression",
557 body["ScheduleExpression"].as_str(),
558 0,
559 256,
560 )?;
561 validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
562 validate_optional_enum(
563 "state",
564 body["State"].as_str(),
565 &[
566 "ENABLED",
567 "DISABLED",
568 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
569 ],
570 )?;
571 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
572 validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
573
574 let raw_bus = body["EventBusName"]
575 .as_str()
576 .unwrap_or("default")
577 .to_string();
578
579 let mut accounts = self.state.write();
580 let state = accounts.get_or_create(&req.account_id);
581 let event_bus_name = state.resolve_bus_name(&raw_bus);
582
583 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
584 if s.is_empty() {
585 None
586 } else {
587 Some(s.to_string())
588 }
589 });
590 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
591 if s.is_empty() {
592 None
593 } else {
594 Some(s.to_string())
595 }
596 });
597 let description = body["Description"].as_str().map(|s| s.to_string());
598 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
599 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
600
601 if schedule_expression.is_some() && event_bus_name != "default" {
603 return Err(AwsServiceError::aws_error(
604 StatusCode::BAD_REQUEST,
605 "ValidationException",
606 "ScheduleExpression is supported only on the default event bus.",
607 ));
608 }
609
610 if !state.buses.contains_key(&event_bus_name) {
611 return Err(AwsServiceError::aws_error(
612 StatusCode::BAD_REQUEST,
613 "ResourceNotFoundException",
614 format!("Event bus {event_bus_name} does not exist."),
615 ));
616 }
617
618 let arn = if event_bus_name == "default" {
619 format!(
620 "arn:aws:events:{}:{}:rule/{}",
621 req.region, state.account_id, name
622 )
623 } else {
624 format!(
625 "arn:aws:events:{}:{}:rule/{}/{}",
626 req.region, state.account_id, event_bus_name, name
627 )
628 };
629
630 let key = (event_bus_name.clone(), name.clone());
631 let targets = state
632 .rules
633 .get(&key)
634 .map(|r| r.targets.clone())
635 .unwrap_or_default();
636
637 let tags = parse_tags(&body);
638
639 let rule = EventRule {
640 name: name.clone(),
641 arn: arn.clone(),
642 event_bus_name,
643 event_pattern,
644 schedule_expression,
645 state: rule_state,
646 description,
647 role_arn,
648 managed_by: None,
649 created_by: None,
650 targets,
651 tags,
652 last_fired: None,
653 };
654
655 state.rules.insert(key, rule);
656 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
657 }
658
659 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
660 let body = req.json_body();
661 validate_required("Name", &body["Name"])?;
662 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
663 validate_string_length("name", name, 1, 64)?;
664 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
665 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
666
667 let mut accounts = self.state.write();
668 let state = accounts.get_or_create(&req.account_id);
669 let bus_name = state.resolve_bus_name(event_bus_name);
670 let key = (bus_name, name.to_string());
671
672 if let Some(rule) = state.rules.get(&key) {
674 if !rule.targets.is_empty() {
675 return Err(AwsServiceError::aws_error(
676 StatusCode::BAD_REQUEST,
677 "ValidationException",
678 "Rule can't be deleted since it has targets.",
679 ));
680 }
681 }
682
683 state.rules.remove(&key);
684 Ok(AwsResponse::ok_json(json!({})))
685 }
686
687 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
688 let body = req.json_body();
689 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
690 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
691 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
692 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
693 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
694 let name_prefix = body["NamePrefix"].as_str();
695 let limit = body["Limit"].as_u64().map(|n| n as usize);
696 let next_token = body["NextToken"].as_str();
697
698 let accounts = self.state.read();
699 let empty = EventBridgeState::new(&req.account_id, &req.region);
700 let state = accounts.get(&req.account_id).unwrap_or(&empty);
701 let bus_name = state.resolve_bus_name(event_bus_name);
702
703 let mut rules: Vec<&EventRule> = state
704 .rules
705 .values()
706 .filter(|r| r.event_bus_name == bus_name)
707 .filter(|r| match name_prefix {
708 Some(prefix) => r.name.starts_with(prefix),
709 None => true,
710 })
711 .collect();
712 rules.sort_by(|a, b| a.name.cmp(&b.name));
713
714 let start = next_token
716 .and_then(|t| t.parse::<usize>().ok())
717 .unwrap_or(0)
718 .min(rules.len());
719 let rules_slice = &rules[start..];
720
721 let (page, new_next_token) = if let Some(lim) = limit {
722 if rules_slice.len() > lim {
723 (&rules_slice[..lim], Some((start + lim).to_string()))
724 } else {
725 (rules_slice, None)
726 }
727 } else {
728 (rules_slice, None)
729 };
730
731 let rules_json: Vec<Value> = page
732 .iter()
733 .map(|r| {
734 let mut obj = json!({
735 "Name": r.name,
736 "Arn": r.arn,
737 "EventBusName": r.event_bus_name,
738 "State": r.state,
739 });
740 if let Some(ref desc) = r.description {
741 obj["Description"] = json!(desc);
742 }
743 if let Some(ref ep) = r.event_pattern {
744 obj["EventPattern"] = json!(ep);
745 }
746 if let Some(ref se) = r.schedule_expression {
747 obj["ScheduleExpression"] = json!(se);
748 }
749 if let Some(ref role) = r.role_arn {
750 obj["RoleArn"] = json!(role);
751 }
752 if let Some(ref mb) = r.managed_by {
753 obj["ManagedBy"] = json!(mb);
754 }
755 obj
756 })
757 .collect();
758
759 let mut resp = json!({ "Rules": rules_json });
760 if let Some(token) = new_next_token {
761 resp["NextToken"] = json!(token);
762 }
763
764 Ok(AwsResponse::ok_json(resp))
765 }
766
767 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
768 let body = req.json_body();
769 validate_required("Name", &body["Name"])?;
770 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
771 validate_string_length("name", name, 1, 64)?;
772 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
773 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
774
775 let accounts = self.state.read();
776 let empty = EventBridgeState::new(&req.account_id, &req.region);
777 let state = accounts.get(&req.account_id).unwrap_or(&empty);
778 let bus_name = state.resolve_bus_name(event_bus_name);
779 let key = (bus_name.clone(), name.to_string());
780
781 let rule = state.rules.get(&key).ok_or_else(|| {
782 AwsServiceError::aws_error(
783 StatusCode::BAD_REQUEST,
784 "ResourceNotFoundException",
785 format!("Rule {name} does not exist."),
786 )
787 })?;
788
789 let mut resp = json!({
790 "Name": rule.name,
791 "Arn": rule.arn,
792 "EventBusName": rule.event_bus_name,
793 "State": rule.state,
794 });
795
796 if let Some(ref desc) = rule.description {
797 resp["Description"] = json!(desc);
798 }
799 if let Some(ref ep) = rule.event_pattern {
800 resp["EventPattern"] = json!(ep);
801 }
802 if let Some(ref se) = rule.schedule_expression {
803 resp["ScheduleExpression"] = json!(se);
804 }
805 if let Some(ref role) = rule.role_arn {
806 resp["RoleArn"] = json!(role);
807 }
808 if let Some(ref mb) = rule.managed_by {
809 resp["ManagedBy"] = json!(mb);
810 }
811 if let Some(ref cb) = rule.created_by {
812 resp["CreatedBy"] = json!(cb);
813 }
814 if rule.event_bus_name != "default" && rule.created_by.is_none() {
816 resp["CreatedBy"] = json!(state.account_id);
817 }
818
819 Ok(AwsResponse::ok_json(resp))
820 }
821
822 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
823 let body = req.json_body();
824 validate_required("Name", &body["Name"])?;
825 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
826 validate_string_length("name", name, 1, 64)?;
827 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
828 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
829
830 let mut accounts = self.state.write();
831 let state = accounts.get_or_create(&req.account_id);
832 let bus_name = state.resolve_bus_name(event_bus_name);
833 let key = (bus_name, name.to_string());
834
835 let rule = state.rules.get_mut(&key).ok_or_else(|| {
836 AwsServiceError::aws_error(
837 StatusCode::BAD_REQUEST,
838 "ResourceNotFoundException",
839 format!("Rule {name} does not exist."),
840 )
841 })?;
842
843 rule.state = "ENABLED".to_string();
844 Ok(AwsResponse::ok_json(json!({})))
845 }
846
847 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
848 let body = req.json_body();
849 validate_required("Name", &body["Name"])?;
850 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
851 validate_string_length("name", name, 1, 64)?;
852 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
853 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
854
855 let mut accounts = self.state.write();
856 let state = accounts.get_or_create(&req.account_id);
857 let bus_name = state.resolve_bus_name(event_bus_name);
858 let key = (bus_name, name.to_string());
859
860 let rule = state.rules.get_mut(&key).ok_or_else(|| {
861 AwsServiceError::aws_error(
862 StatusCode::BAD_REQUEST,
863 "ResourceNotFoundException",
864 format!("Rule {name} does not exist."),
865 )
866 })?;
867
868 rule.state = "DISABLED".to_string();
869 Ok(AwsResponse::ok_json(json!({})))
870 }
871
872 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
875 let body = req.json_body();
876 validate_required("Rule", &body["Rule"])?;
877 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
878 validate_string_length("rule", rule_name, 1, 64)?;
879 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
880 validate_required("Targets", &body["Targets"])?;
881 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882 let targets = body["Targets"]
883 .as_array()
884 .ok_or_else(|| missing("Targets"))?;
885
886 for target in targets {
888 let target_id = target["Id"].as_str().unwrap_or("");
889 let target_arn = target["Arn"].as_str().unwrap_or("");
890
891 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
892 return Err(AwsServiceError::aws_error(
893 StatusCode::BAD_REQUEST,
894 "ValidationException",
895 format!(
896 "Parameter(s) SqsParameters must be specified for target: {target_id}."
897 ),
898 ));
899 }
900
901 if !target_arn.starts_with("arn:") {
903 return Err(AwsServiceError::aws_error(
904 StatusCode::BAD_REQUEST,
905 "ValidationException",
906 format!(
907 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
908 ),
909 ));
910 }
911 }
912
913 let mut accounts = self.state.write();
914 let state = accounts.get_or_create(&req.account_id);
915 let bus_name = state.resolve_bus_name(event_bus_name);
916 let key = (bus_name.clone(), rule_name.to_string());
917
918 let rule = state.rules.get_mut(&key).ok_or_else(|| {
919 AwsServiceError::aws_error(
920 StatusCode::BAD_REQUEST,
921 "ResourceNotFoundException",
922 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
923 )
924 })?;
925
926 for target in targets {
927 let et = parse_target(target);
928 rule.targets.retain(|t| t.id != et.id);
930 rule.targets.push(et);
931 }
932
933 Ok(AwsResponse::ok_json(json!({
934 "FailedEntryCount": 0,
935 "FailedEntries": [],
936 })))
937 }
938
939 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
940 let body = req.json_body();
941 validate_required("Rule", &body["Rule"])?;
942 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
943 validate_string_length("rule", rule_name, 1, 64)?;
944 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
945 validate_required("Ids", &body["Ids"])?;
946 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
947 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
948
949 let target_ids: Vec<String> = ids
950 .iter()
951 .filter_map(|v| v.as_str().map(|s| s.to_string()))
952 .collect();
953
954 let mut accounts = self.state.write();
955 let state = accounts.get_or_create(&req.account_id);
956 let bus_name = state.resolve_bus_name(event_bus_name);
957 let key = (bus_name.clone(), rule_name.to_string());
958
959 let rule = state.rules.get_mut(&key).ok_or_else(|| {
960 AwsServiceError::aws_error(
961 StatusCode::BAD_REQUEST,
962 "ResourceNotFoundException",
963 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
964 )
965 })?;
966
967 rule.targets.retain(|t| !target_ids.contains(&t.id));
968
969 Ok(AwsResponse::ok_json(json!({
970 "FailedEntryCount": 0,
971 "FailedEntries": [],
972 })))
973 }
974
975 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
976 let body = req.json_body();
977 validate_required("Rule", &body["Rule"])?;
978 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
979 validate_string_length("rule", rule_name, 1, 64)?;
980 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
981 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
982 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
983 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984 let limit = body["Limit"].as_u64().map(|n| n as usize);
985 let next_token = body["NextToken"].as_str();
986
987 let accounts = self.state.read();
988 let empty = EventBridgeState::new(&req.account_id, &req.region);
989 let state = accounts.get(&req.account_id).unwrap_or(&empty);
990 let bus_name = state.resolve_bus_name(event_bus_name);
991 let key = (bus_name, rule_name.to_string());
992
993 let rule = state.rules.get(&key).ok_or_else(|| {
994 AwsServiceError::aws_error(
995 StatusCode::BAD_REQUEST,
996 "ResourceNotFoundException",
997 format!("Rule {rule_name} does not exist."),
998 )
999 })?;
1000
1001 let all_targets = &rule.targets;
1002 let start = next_token
1003 .and_then(|t| t.parse::<usize>().ok())
1004 .unwrap_or(0)
1005 .min(all_targets.len());
1006 let slice = &all_targets[start..];
1007
1008 let (page, new_next_token) = if let Some(lim) = limit {
1009 if slice.len() > lim {
1010 (&slice[..lim], Some((start + lim).to_string()))
1011 } else {
1012 (slice, None)
1013 }
1014 } else {
1015 (slice, None)
1016 };
1017
1018 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1019
1020 let mut resp = json!({ "Targets": targets });
1021 if let Some(token) = new_next_token {
1022 resp["NextToken"] = json!(token);
1023 }
1024
1025 Ok(AwsResponse::ok_json(resp))
1026 }
1027
1028 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1029 let body = req.json_body();
1030 validate_required("TargetArn", &body["TargetArn"])?;
1031 let target_arn = body["TargetArn"]
1032 .as_str()
1033 .ok_or_else(|| missing("TargetArn"))?;
1034 validate_string_length("targetArn", target_arn, 1, 1600)?;
1035 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1036 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1037 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1038 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1039 let limit = body["Limit"].as_u64().map(|n| n as usize);
1040 let next_token = body["NextToken"].as_str();
1041
1042 let accounts = self.state.read();
1043 let empty = EventBridgeState::new(&req.account_id, &req.region);
1044 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1045 let bus_name = state.resolve_bus_name(event_bus_name);
1046
1047 let mut rule_names: Vec<String> = Vec::new();
1049 for rule in state.rules.values() {
1050 if rule.event_bus_name == bus_name
1051 && rule.targets.iter().any(|t| t.arn == target_arn)
1052 && !rule_names.contains(&rule.name)
1053 {
1054 rule_names.push(rule.name.clone());
1055 }
1056 }
1057 rule_names.sort();
1058
1059 let start = next_token
1060 .and_then(|t| t.parse::<usize>().ok())
1061 .unwrap_or(0)
1062 .min(rule_names.len());
1063 let slice = &rule_names[start..];
1064
1065 let (page, new_next_token) = if let Some(lim) = limit {
1066 if slice.len() > lim {
1067 (&slice[..lim], Some((start + lim).to_string()))
1068 } else {
1069 (slice, None)
1070 }
1071 } else {
1072 (slice, None)
1073 };
1074
1075 let mut resp = json!({ "RuleNames": page });
1076 if let Some(token) = new_next_token {
1077 resp["NextToken"] = json!(token);
1078 }
1079
1080 Ok(AwsResponse::ok_json(resp))
1081 }
1082
1083 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1086 let body = req.json_body();
1087 validate_required("EventPattern", &body["EventPattern"])?;
1088 validate_required("Event", &body["Event"])?;
1089 let event_pattern = body["EventPattern"]
1090 .as_str()
1091 .ok_or_else(|| missing("EventPattern"))?;
1092 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1093
1094 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1096 AwsServiceError::aws_error(
1097 StatusCode::BAD_REQUEST,
1098 "InvalidEventPatternException",
1099 "Event is not valid JSON.",
1100 )
1101 })?;
1102
1103 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1105 AwsServiceError::aws_error(
1106 StatusCode::BAD_REQUEST,
1107 "InvalidEventPatternException",
1108 "Event pattern is not valid JSON.",
1109 )
1110 })?;
1111
1112 let source = event["source"].as_str().unwrap_or("");
1113 let detail_type = event["detail-type"].as_str().unwrap_or("");
1114 let detail = event
1115 .get("detail")
1116 .map(|v| serde_json::to_string(v).unwrap_or_default())
1117 .unwrap_or_else(|| "{}".to_string());
1118 let account = event["account"].as_str().unwrap_or("");
1119 let region = event["region"].as_str().unwrap_or("");
1120 let resources: Vec<String> = event["resources"]
1121 .as_array()
1122 .map(|arr| {
1123 arr.iter()
1124 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1125 .collect()
1126 })
1127 .unwrap_or_default();
1128
1129 let result = matches_pattern(
1130 Some(event_pattern),
1131 source,
1132 detail_type,
1133 &detail,
1134 account,
1135 region,
1136 &resources,
1137 );
1138
1139 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1140 }
1141
1142 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1145 let body = req.json_body();
1146 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1147 validate_optional_string_length(
1148 "kmsKeyIdentifier",
1149 body["KmsKeyIdentifier"].as_str(),
1150 0,
1151 2048,
1152 )?;
1153 let name = body["Name"].as_str().unwrap_or("default");
1154
1155 let mut accounts = self.state.write();
1156 let state = accounts.get_or_create(&req.account_id);
1157 let bus = state.buses.get_mut(name).ok_or_else(|| {
1158 AwsServiceError::aws_error(
1159 StatusCode::BAD_REQUEST,
1160 "ResourceNotFoundException",
1161 format!("Event bus {name} does not exist."),
1162 )
1163 })?;
1164
1165 if let Some(desc) = body["Description"].as_str() {
1166 bus.description = Some(desc.to_string());
1167 }
1168 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1169 bus.kms_key_identifier = Some(kms.to_string());
1170 }
1171 if let Some(dlc) = body.get("DeadLetterConfig") {
1172 bus.dead_letter_config = Some(dlc.clone());
1173 }
1174 bus.last_modified_time = Utc::now();
1175
1176 let arn = bus.arn.clone();
1177 let bus_name = bus.name.clone();
1178
1179 Ok(AwsResponse::ok_json(json!({
1180 "Arn": arn,
1181 "Name": bus_name,
1182 })))
1183 }
1184
1185 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1188 let body = req.json_body();
1189 validate_required("Entries", &body["Entries"])?;
1190 validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1191 let entries = body["Entries"]
1192 .as_array()
1193 .ok_or_else(|| missing("Entries"))?;
1194
1195 if entries.is_empty() {
1197 return Err(AwsServiceError::aws_error(
1198 StatusCode::BAD_REQUEST,
1199 "ValidationException",
1200 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1201 ));
1202 }
1203 if entries.len() > 10 {
1204 return Err(AwsServiceError::aws_error(
1205 StatusCode::BAD_REQUEST,
1206 "ValidationException",
1207 "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1208 ));
1209 }
1210
1211 let mut accounts = self.state.write();
1212 let state = accounts.get_or_create(&req.account_id);
1213 let mut result_entries = Vec::new();
1214 let mut events_to_deliver = Vec::new();
1215 let mut failed_count = 0;
1216
1217 for entry in entries {
1218 let source = entry["Source"].as_str().unwrap_or("").to_string();
1219 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1220 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1221
1222 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1223 failed_count += 1;
1224 result_entries.push(error);
1225 continue;
1226 }
1227
1228 let event_id = uuid::Uuid::new_v4().to_string();
1229 let raw_bus = entry["EventBusName"]
1230 .as_str()
1231 .unwrap_or("default")
1232 .to_string();
1233 let event_bus_name = state.resolve_bus_name(&raw_bus);
1234
1235 let caller_account = req
1241 .principal
1242 .as_ref()
1243 .map(|p| p.account_id.as_str())
1244 .unwrap_or(req.account_id.as_str());
1245 if caller_account != req.account_id {
1246 let bus_policy_value = state
1247 .buses
1248 .get(&event_bus_name)
1249 .and_then(|b| b.policy.clone());
1250 if let Some(policy_value) = bus_policy_value {
1251 let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1252 let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1253 let bus_arn = state
1254 .buses
1255 .get(&event_bus_name)
1256 .map(|b| b.arn.clone())
1257 .unwrap_or_default();
1258 let principal =
1259 req.principal
1260 .clone()
1261 .unwrap_or_else(|| fakecloud_core::auth::Principal {
1262 arn: format!("arn:aws:iam::{caller_account}:root"),
1263 user_id: caller_account.to_string(),
1264 account_id: caller_account.to_string(),
1265 principal_type: fakecloud_core::auth::PrincipalType::Root,
1266 source_identity: None,
1267 tags: None,
1268 });
1269 let context = fakecloud_iam::evaluator::RequestContext {
1270 aws_principal_arn: Some(principal.arn.clone()),
1271 aws_principal_account: Some(principal.account_id.clone()),
1272 ..Default::default()
1273 };
1274 let eval_req = fakecloud_iam::evaluator::EvalRequest {
1275 principal: &principal,
1276 action: "events:PutEvents".to_string(),
1277 resource: bus_arn,
1278 context,
1279 };
1280 let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1281 &policy_doc,
1282 &eval_req,
1283 );
1284 if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1285 failed_count += 1;
1286 result_entries.push(json!({
1287 "ErrorCode": "AccessDeniedException",
1288 "ErrorMessage": format!(
1289 "User '{}' is not authorized to put events on event bus '{}'",
1290 principal.arn, event_bus_name
1291 ),
1292 }));
1293 continue;
1294 }
1295 }
1296 }
1297
1298 let time = parse_put_events_time(&entry["Time"]);
1299 let resources: Vec<String> = entry["Resources"]
1300 .as_array()
1301 .map(|arr| {
1302 arr.iter()
1303 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1304 .collect()
1305 })
1306 .unwrap_or_default();
1307
1308 let event = PutEvent {
1309 event_id: event_id.clone(),
1310 source: source.clone(),
1311 detail_type: detail_type.clone(),
1312 detail: detail.clone(),
1313 event_bus_name: event_bus_name.clone(),
1314 time,
1315 resources: resources.clone(),
1316 };
1317
1318 archive_matching_event(
1319 state,
1320 &event,
1321 &event_bus_name,
1322 &source,
1323 &detail_type,
1324 &detail,
1325 &req.account_id,
1326 &req.region,
1327 &resources,
1328 );
1329
1330 state.events.push(event);
1331
1332 let matching_targets: Vec<EventTarget> = state
1334 .rules
1335 .values()
1336 .filter(|r| {
1337 r.event_bus_name == event_bus_name
1338 && r.state == "ENABLED"
1339 && matches_pattern(
1340 r.event_pattern.as_deref(),
1341 &source,
1342 &detail_type,
1343 &detail,
1344 &req.account_id,
1345 &req.region,
1346 &resources,
1347 )
1348 })
1349 .flat_map(|r| r.targets.clone())
1350 .collect();
1351
1352 if !matching_targets.is_empty() {
1353 events_to_deliver.push((
1354 event_id.clone(),
1355 source,
1356 detail_type,
1357 detail,
1358 time,
1359 resources,
1360 matching_targets,
1361 ));
1362 }
1363
1364 result_entries.push(json!({ "EventId": event_id }));
1365 }
1366
1367 drop(accounts);
1369
1370 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1375 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1376 let event_json = json!({
1377 "version": "0",
1378 "id": event_id,
1379 "source": source,
1380 "account": req.account_id,
1381 "detail-type": detail_type,
1382 "detail": detail_value,
1383 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1384 "region": req.region,
1385 "resources": resources,
1386 });
1387
1388 let ctx = EventDispatchContext {
1389 state: &self.state,
1390 delivery: &self.delivery,
1391 lambda_state: self.lambda_state.as_ref(),
1392 logs_state: self.logs_state.as_ref(),
1393 container_runtime: &self.container_runtime,
1394 account_id: &req.account_id,
1395 region: &req.region,
1396 };
1397 for target in targets {
1398 dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1399 }
1400 }
1401
1402 let resp = json!({
1403 "FailedEntryCount": failed_count,
1404 "Entries": result_entries,
1405 });
1406
1407 Ok(AwsResponse::ok_json(resp))
1408 }
1409
1410 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1413 let body = req.json_body();
1414 validate_required("ResourceARN", &body["ResourceARN"])?;
1415 let arn = body["ResourceARN"]
1416 .as_str()
1417 .ok_or_else(|| missing("ResourceARN"))?;
1418 validate_string_length("resourceARN", arn, 1, 1600)?;
1419 validate_required("Tags", &body["Tags"])?;
1420
1421 let mut accounts = self.state.write();
1422 let state = accounts.get_or_create(&req.account_id);
1423 let tag_map = find_tags_mut(state, arn)?;
1424
1425 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1426 AwsServiceError::aws_error(
1427 StatusCode::BAD_REQUEST,
1428 "ValidationException",
1429 format!("{f} must be a list"),
1430 )
1431 })?;
1432
1433 Ok(AwsResponse::ok_json(json!({})))
1434 }
1435
1436 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1437 let body = req.json_body();
1438 validate_required("ResourceARN", &body["ResourceARN"])?;
1439 let arn = body["ResourceARN"]
1440 .as_str()
1441 .ok_or_else(|| missing("ResourceARN"))?;
1442 validate_string_length("resourceARN", arn, 1, 1600)?;
1443 validate_required("TagKeys", &body["TagKeys"])?;
1444
1445 let mut accounts = self.state.write();
1446 let state = accounts.get_or_create(&req.account_id);
1447 let tag_map = find_tags_mut(state, arn)?;
1448
1449 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1450 AwsServiceError::aws_error(
1451 StatusCode::BAD_REQUEST,
1452 "ValidationException",
1453 format!("{f} must be a list"),
1454 )
1455 })?;
1456
1457 Ok(AwsResponse::ok_json(json!({})))
1458 }
1459
1460 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461 let body = req.json_body();
1462 validate_required("ResourceARN", &body["ResourceARN"])?;
1463 let arn = body["ResourceARN"]
1464 .as_str()
1465 .ok_or_else(|| missing("ResourceARN"))?;
1466 validate_string_length("resourceARN", arn, 1, 1600)?;
1467
1468 let accounts = self.state.read();
1469 let empty = EventBridgeState::new(&req.account_id, &req.region);
1470 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1471 let tag_map = find_tags(state, arn)?;
1472
1473 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1474
1475 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1476 }
1477
1478 }
1480
1481struct StartReplayInput {
1491 name: String,
1492 description: Option<String>,
1493 event_source_arn: String,
1494 destination: Value,
1495 destination_arn: String,
1496 event_start_time: DateTime<Utc>,
1497 event_end_time: DateTime<Utc>,
1498}
1499
1500impl StartReplayInput {
1501 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1502 validate_required("ReplayName", &body["ReplayName"])?;
1503 let name = body["ReplayName"]
1504 .as_str()
1505 .ok_or_else(|| missing("ReplayName"))?
1506 .to_string();
1507 validate_string_length("replayName", &name, 1, 64)?;
1508 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1509 validate_required("EventSourceArn", &body["EventSourceArn"])?;
1510 let description = body["Description"].as_str().map(|s| s.to_string());
1511 let event_source_arn = body["EventSourceArn"]
1512 .as_str()
1513 .ok_or_else(|| missing("EventSourceArn"))?
1514 .to_string();
1515 validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
1516 validate_required("EventStartTime", &body["EventStartTime"])?;
1517 validate_required("EventEndTime", &body["EventEndTime"])?;
1518 validate_required("Destination", &body["Destination"])?;
1519 let destination = body["Destination"].clone();
1520
1521 let event_start_time = body["EventStartTime"]
1522 .as_f64()
1523 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1524 .unwrap_or_else(Utc::now);
1525 let event_end_time = body["EventEndTime"]
1526 .as_f64()
1527 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1528 .unwrap_or_else(Utc::now);
1529
1530 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1531 if !destination_arn.contains(":event-bus/") {
1532 return Err(AwsServiceError::aws_error(
1533 StatusCode::BAD_REQUEST,
1534 "ValidationException",
1535 "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
1536 ));
1537 }
1538
1539 Ok(Self {
1540 name,
1541 description,
1542 event_source_arn,
1543 destination,
1544 destination_arn,
1545 event_start_time,
1546 event_end_time,
1547 })
1548 }
1549}
1550
1551#[path = "service_archives_replays.rs"]
1552mod service_archives_replays;
1553#[path = "service_connections_apidests.rs"]
1554mod service_connections_apidests;
1555#[path = "service_endpoints.rs"]
1556mod service_endpoints;
1557#[path = "service_partner_sources.rs"]
1558mod service_partner_sources;
1559
1560#[path = "helpers.rs"]
1561mod helpers;
1562pub(crate) use helpers::*;
1563
1564#[cfg(test)]
1565#[path = "service_tests.rs"]
1566mod tests;