1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29 state: SharedEventBridgeState,
30 delivery: Arc<DeliveryBus>,
31 lambda_state: Option<SharedLambdaState>,
32 logs_state: Option<SharedLogsState>,
33 container_runtime: Option<Arc<ContainerRuntime>>,
34 snapshot_store: Option<Arc<dyn SnapshotStore>>,
35 snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40 Self {
41 state,
42 delivery,
43 lambda_state: None,
44 logs_state: None,
45 container_runtime: None,
46 snapshot_store: None,
47 snapshot_lock: Arc::new(AsyncMutex::new(())),
48 }
49 }
50
51 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52 self.lambda_state = Some(lambda_state);
53 self
54 }
55
56 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57 self.logs_state = Some(logs_state);
58 self
59 }
60
61 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62 self.container_runtime = Some(runtime);
63 self
64 }
65
66 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67 self.snapshot_store = Some(store);
68 self
69 }
70
71 async fn save_snapshot(&self) {
75 save_eventbridge_snapshot(
76 &self.state,
77 self.snapshot_store.clone(),
78 &self.snapshot_lock,
79 )
80 .await;
81 }
82
83 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88 let store = self.snapshot_store.clone()?;
89 let state = self.state.clone();
90 let lock = self.snapshot_lock.clone();
91 Some(Arc::new(move || {
92 let state = state.clone();
93 let store = store.clone();
94 let lock = lock.clone();
95 Box::pin(async move {
96 save_eventbridge_snapshot(&state, Some(store), &lock).await;
97 })
98 }))
99 }
100}
101
102pub async fn save_eventbridge_snapshot(
108 state: &SharedEventBridgeState,
109 store: Option<Arc<dyn SnapshotStore>>,
110 lock: &AsyncMutex<()>,
111) {
112 let Some(store) = store else {
113 return;
114 };
115 let _guard = lock.lock().await;
116 let snapshot = EventBridgeSnapshot {
117 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118 accounts: Some(state.read().clone()),
119 state: None,
120 };
121 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122 let bytes = serde_json::to_vec(&snapshot)
123 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124 store.save(&bytes)
125 })
126 .await;
127 match join {
128 Ok(Ok(())) => {}
129 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131 }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136 fn service_name(&self) -> &str {
137 "events"
138 }
139
140 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141 let mutates = is_mutating_action(req.action.as_str());
142 let result = match req.action.as_str() {
143 "CreateEventBus" => self.create_event_bus(&req),
144 "DeleteEventBus" => self.delete_event_bus(&req),
145 "ListEventBuses" => self.list_event_buses(&req),
146 "DescribeEventBus" => self.describe_event_bus(&req),
147 "PutRule" => self.put_rule(&req),
148 "DeleteRule" => self.delete_rule(&req),
149 "ListRules" => self.list_rules(&req),
150 "DescribeRule" => self.describe_rule(&req),
151 "EnableRule" => self.enable_rule(&req),
152 "DisableRule" => self.disable_rule(&req),
153 "PutTargets" => self.put_targets(&req),
154 "RemoveTargets" => self.remove_targets(&req),
155 "ListTargetsByRule" => self.list_targets_by_rule(&req),
156 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157 "PutEvents" => self.put_events(&req),
158 "PutPermission" => self.put_permission(&req),
159 "RemovePermission" => self.remove_permission(&req),
160 "TagResource" => self.tag_resource(&req),
161 "UntagResource" => self.untag_resource(&req),
162 "ListTagsForResource" => self.list_tags_for_resource(&req),
163 "CreateArchive" => self.create_archive(&req),
164 "DescribeArchive" => self.describe_archive(&req),
165 "ListArchives" => self.list_archives(&req),
166 "UpdateArchive" => self.update_archive(&req),
167 "DeleteArchive" => self.delete_archive(&req),
168 "CreateConnection" => self.create_connection(&req),
169 "DescribeConnection" => self.describe_connection(&req),
170 "ListConnections" => self.list_connections(&req),
171 "UpdateConnection" => self.update_connection(&req),
172 "DeleteConnection" => self.delete_connection(&req),
173 "CreateApiDestination" => self.create_api_destination(&req),
174 "DescribeApiDestination" => self.describe_api_destination(&req),
175 "ListApiDestinations" => self.list_api_destinations(&req),
176 "UpdateApiDestination" => self.update_api_destination(&req),
177 "DeleteApiDestination" => self.delete_api_destination(&req),
178 "StartReplay" => self.start_replay(&req),
179 "DescribeReplay" => self.describe_replay(&req),
180 "ListReplays" => self.list_replays(&req),
181 "CancelReplay" => self.cancel_replay(&req),
182 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187 "ActivateEventSource" => self.activate_event_source(&req),
188 "DeactivateEventSource" => self.deactivate_event_source(&req),
189 "DescribeEventSource" => self.describe_event_source(&req),
190 "ListEventSources" => self.list_event_sources(&req),
191 "PutPartnerEvents" => self.put_partner_events(&req),
192 "TestEventPattern" => self.test_event_pattern(&req),
193 "UpdateEventBus" => self.update_event_bus(&req),
194 "CreateEndpoint" => self.create_endpoint(&req),
195 "DeleteEndpoint" => self.delete_endpoint(&req),
196 "DescribeEndpoint" => self.describe_endpoint(&req),
197 "ListEndpoints" => self.list_endpoints(&req),
198 "UpdateEndpoint" => self.update_endpoint(&req),
199 "DeauthorizeConnection" => self.deauthorize_connection(&req),
200 _ => Err(AwsServiceError::action_not_implemented(
201 "events",
202 &req.action,
203 )),
204 };
205 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206 self.save_snapshot().await;
207 }
208 result
209 }
210
211 fn supported_actions(&self) -> &[&str] {
212 &[
213 "CreateEventBus",
214 "DeleteEventBus",
215 "ListEventBuses",
216 "DescribeEventBus",
217 "PutRule",
218 "DeleteRule",
219 "ListRules",
220 "DescribeRule",
221 "EnableRule",
222 "DisableRule",
223 "PutTargets",
224 "RemoveTargets",
225 "ListTargetsByRule",
226 "ListRuleNamesByTarget",
227 "PutEvents",
228 "PutPermission",
229 "RemovePermission",
230 "TagResource",
231 "UntagResource",
232 "ListTagsForResource",
233 "CreateArchive",
234 "DescribeArchive",
235 "ListArchives",
236 "UpdateArchive",
237 "DeleteArchive",
238 "CreateConnection",
239 "DescribeConnection",
240 "ListConnections",
241 "UpdateConnection",
242 "DeleteConnection",
243 "CreateApiDestination",
244 "DescribeApiDestination",
245 "ListApiDestinations",
246 "UpdateApiDestination",
247 "DeleteApiDestination",
248 "StartReplay",
249 "DescribeReplay",
250 "ListReplays",
251 "CancelReplay",
252 "CreatePartnerEventSource",
253 "DeletePartnerEventSource",
254 "DescribePartnerEventSource",
255 "ListPartnerEventSources",
256 "ListPartnerEventSourceAccounts",
257 "ActivateEventSource",
258 "DeactivateEventSource",
259 "DescribeEventSource",
260 "ListEventSources",
261 "PutPartnerEvents",
262 "TestEventPattern",
263 "UpdateEventBus",
264 "CreateEndpoint",
265 "DeleteEndpoint",
266 "DescribeEndpoint",
267 "ListEndpoints",
268 "UpdateEndpoint",
269 "DeauthorizeConnection",
270 ]
271 }
272}
273
274impl EventBridgeService {
276 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277 let body = req.json_body();
278 validate_required("Name", &body["Name"])?;
279 let name = body["Name"]
280 .as_str()
281 .ok_or_else(|| missing("Name"))?
282 .to_string();
283 validate_string_length("name", &name, 1, 256)?;
284 validate_optional_string_length(
285 "eventSourceName",
286 body["EventSourceName"].as_str(),
287 1,
288 256,
289 )?;
290 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291 validate_optional_string_length(
292 "kmsKeyIdentifier",
293 body["KmsKeyIdentifier"].as_str(),
294 0,
295 2048,
296 )?;
297
298 if name.contains('/') && !name.starts_with("aws.partner/") {
300 return Err(AwsServiceError::aws_error(
301 StatusCode::BAD_REQUEST,
302 "ValidationException",
303 "Event bus name must not contain '/'.",
304 ));
305 }
306
307 if name.starts_with("aws.partner/") {
309 let event_source = body["EventSourceName"].as_str().unwrap_or("");
310 let accounts_r = self.state.read();
311 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313 let has_source = state_r.partner_event_sources.contains_key(event_source);
314 drop(accounts_r);
315 if !has_source {
316 return Err(AwsServiceError::aws_error(
317 StatusCode::BAD_REQUEST,
318 "ResourceNotFoundException",
319 format!("Event source {event_source} does not exist."),
320 ));
321 }
322 }
323
324 let mut accounts = self.state.write();
325 let state = accounts.get_or_create(&req.account_id);
326
327 if state.buses.contains_key(&name) {
328 return Err(AwsServiceError::aws_error(
329 StatusCode::BAD_REQUEST,
330 "ResourceAlreadyExistsException",
331 format!("Event bus {name} already exists."),
332 ));
333 }
334
335 let arn = format!(
336 "arn:aws:events:{}:{}:event-bus/{}",
337 req.region, state.account_id, name
338 );
339 let now = Utc::now();
340 let description = body["Description"].as_str().map(|s| s.to_string());
341 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342 let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344 let tags = parse_tags(&body);
345
346 let bus = EventBus {
347 name: name.clone(),
348 arn: arn.clone(),
349 tags,
350 policy: None,
351 description,
352 kms_key_identifier,
353 dead_letter_config,
354 creation_time: now,
355 last_modified_time: now,
356 };
357 state.buses.insert(name, bus);
358
359 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360 }
361
362 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363 let body = req.json_body();
364 validate_required("Name", &body["Name"])?;
365 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366 validate_string_length("name", name, 1, 256)?;
367
368 if name == "default" {
369 return Err(AwsServiceError::aws_error(
370 StatusCode::BAD_REQUEST,
371 "ValidationException",
372 format!("Cannot delete event bus {name}."),
373 ));
374 }
375
376 let mut accounts = self.state.write();
377 let state = accounts.get_or_create(&req.account_id);
378 state.buses.remove(name);
379 state.rules.retain(|k, _| k.0 != name);
380
381 Ok(AwsResponse::ok_json(json!({})))
382 }
383
384 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385 let body = req.json_body();
386 validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394 validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395 validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396 let name_prefix = body["NamePrefix"].as_str();
397 let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399 let accounts = self.state.read();
400 let empty = EventBridgeState::new(&req.account_id, &req.region);
401 let state = accounts.get(&req.account_id).unwrap_or(&empty);
402 let filtered: Vec<&_> = state
403 .buses
404 .values()
405 .filter(|b| match name_prefix {
406 Some(prefix) => b.name.starts_with(prefix),
407 None => true,
408 })
409 .collect();
410
411 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412 let buses: Vec<Value> = page
413 .iter()
414 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
415 .collect();
416 let mut resp = json!({ "EventBuses": buses });
417 if let Some(token) = next_token {
418 resp["NextToken"] = json!(token);
419 }
420
421 Ok(AwsResponse::ok_json(resp))
422 }
423
424 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
425 let body = req.json_body();
426 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
427 let name = body["Name"].as_str().unwrap_or("default");
428
429 let accounts = self.state.read();
430 let empty = EventBridgeState::new(&req.account_id, &req.region);
431 let state = accounts.get(&req.account_id).unwrap_or(&empty);
432 let bus = state.buses.get(name).ok_or_else(|| {
433 AwsServiceError::aws_error(
434 StatusCode::BAD_REQUEST,
435 "ResourceNotFoundException",
436 format!("Event bus {name} does not exist."),
437 )
438 })?;
439
440 let mut resp = json!({
441 "Name": bus.name,
442 "Arn": bus.arn,
443 "CreationTime": bus.creation_time.timestamp() as f64,
444 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
445 });
446
447 if let Some(ref policy) = bus.policy {
448 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
449 }
450 if let Some(ref desc) = bus.description {
451 resp["Description"] = json!(desc);
452 }
453 if let Some(ref kms) = bus.kms_key_identifier {
454 resp["KmsKeyIdentifier"] = json!(kms);
455 }
456 if let Some(ref dlc) = bus.dead_letter_config {
457 resp["DeadLetterConfig"] = dlc.clone();
458 }
459
460 Ok(AwsResponse::ok_json(resp))
461 }
462
463 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
466 let body = req.json_body();
467 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
474 validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
475 validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
476 validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
477 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479 let mut accounts = self.state.write();
480 let state = accounts.get_or_create(&req.account_id);
481
482 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
483 AwsServiceError::aws_error(
484 StatusCode::BAD_REQUEST,
485 "ResourceNotFoundException",
486 format!("Event bus {event_bus_name} does not exist."),
487 )
488 })?;
489
490 if let Some(policy_str) = body["Policy"].as_str() {
492 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
493 bus.policy = Some(policy);
494 return Ok(AwsResponse::ok_json(json!({})));
495 }
496 }
497
498 let action = body["Action"].as_str().unwrap_or("");
505 let principal = body["Principal"].as_str().unwrap_or("");
506 let statement_id = body["StatementId"].as_str().unwrap_or("");
507
508 let statement = json!({
514 "Sid": statement_id,
515 "Effect": "Allow",
516 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
517 "Action": action,
518 "Resource": bus.arn,
519 });
520
521 let policy = bus.policy.get_or_insert_with(|| {
522 json!({
523 "Version": "2012-10-17",
524 "Statement": [],
525 })
526 });
527
528 if let Some(stmts) = policy["Statement"].as_array_mut() {
529 stmts.push(statement);
530 }
531
532 Ok(AwsResponse::ok_json(json!({})))
533 }
534
535 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
536 let body = req.json_body();
537 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
538 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
539 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
540 let statement_id = body["StatementId"].as_str().unwrap_or("");
541 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
542
543 let mut accounts = self.state.write();
544 let state = accounts.get_or_create(&req.account_id);
545
546 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
547 AwsServiceError::aws_error(
548 StatusCode::BAD_REQUEST,
549 "ResourceNotFoundException",
550 format!("Event bus {event_bus_name} does not exist."),
551 )
552 })?;
553
554 if remove_all {
555 bus.policy = None;
556 return Ok(AwsResponse::ok_json(json!({})));
557 }
558
559 let policy = bus.policy.as_mut().ok_or_else(|| {
560 AwsServiceError::aws_error(
561 StatusCode::BAD_REQUEST,
562 "ResourceNotFoundException",
563 "EventBus does not have a policy.",
564 )
565 })?;
566
567 if let Some(stmts) = policy["Statement"].as_array_mut() {
568 let before = stmts.len();
569 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
570 if stmts.len() == before {
571 return Err(AwsServiceError::aws_error(
572 StatusCode::BAD_REQUEST,
573 "ResourceNotFoundException",
574 "Statement with the provided id does not exist.",
575 ));
576 }
577 if stmts.is_empty() {
578 bus.policy = None;
579 }
580 }
581
582 Ok(AwsResponse::ok_json(json!({})))
583 }
584
585 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
588 let body = req.json_body();
589 validate_required("Name", &body["Name"])?;
600 let name = body["Name"]
601 .as_str()
602 .ok_or_else(|| missing("Name"))?
603 .to_string();
604 validate_string_length("Name", &name, 1, 64)?;
605 validate_optional_string_length_value(
606 "ScheduleExpression",
607 &body["ScheduleExpression"],
608 0,
609 256,
610 )?;
611 validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
612 validate_optional_enum_value(
613 "State",
614 &body["State"],
615 &[
616 "ENABLED",
617 "DISABLED",
618 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
619 ],
620 )?;
621 validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
622 validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
623 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
624
625 let raw_bus = body["EventBusName"]
626 .as_str()
627 .unwrap_or("default")
628 .to_string();
629
630 let mut accounts = self.state.write();
631 let state = accounts.get_or_create(&req.account_id);
632 let event_bus_name = state.resolve_bus_name(&raw_bus);
633
634 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
635 if s.is_empty() {
636 None
637 } else {
638 Some(s.to_string())
639 }
640 });
641 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
642 if s.is_empty() {
643 None
644 } else {
645 Some(s.to_string())
646 }
647 });
648 let description = body["Description"].as_str().map(|s| s.to_string());
649 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
650 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
651
652 if !state.buses.contains_key(&event_bus_name) {
658 return Err(AwsServiceError::aws_error(
659 StatusCode::BAD_REQUEST,
660 "ResourceNotFoundException",
661 format!("Event bus {event_bus_name} does not exist."),
662 ));
663 }
664
665 let arn = if event_bus_name == "default" {
666 format!(
667 "arn:aws:events:{}:{}:rule/{}",
668 req.region, state.account_id, name
669 )
670 } else {
671 format!(
672 "arn:aws:events:{}:{}:rule/{}/{}",
673 req.region, state.account_id, event_bus_name, name
674 )
675 };
676
677 let key = (event_bus_name.clone(), name.clone());
678 let targets = state
679 .rules
680 .get(&key)
681 .map(|r| r.targets.clone())
682 .unwrap_or_default();
683
684 let tags = parse_tags(&body);
685
686 let rule = EventRule {
687 name: name.clone(),
688 arn: arn.clone(),
689 event_bus_name,
690 event_pattern,
691 schedule_expression,
692 state: rule_state,
693 description,
694 role_arn,
695 managed_by: None,
696 created_by: None,
697 targets,
698 tags,
699 last_fired: None,
700 };
701
702 state.rules.insert(key, rule);
703 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
704 }
705
706 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
707 let body = req.json_body();
708 validate_required("Name", &body["Name"])?;
709 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
710 validate_string_length("name", name, 1, 64)?;
711 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
712 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
713
714 let mut accounts = self.state.write();
715 let state = accounts.get_or_create(&req.account_id);
716 let bus_name = state.resolve_bus_name(event_bus_name);
717 let key = (bus_name, name.to_string());
718
719 if let Some(rule) = state.rules.get(&key) {
721 if !rule.targets.is_empty() {
722 return Err(AwsServiceError::aws_error(
723 StatusCode::BAD_REQUEST,
724 "ValidationException",
725 "Rule can't be deleted since it has targets.",
726 ));
727 }
728 }
729
730 state.rules.remove(&key);
731 Ok(AwsResponse::ok_json(json!({})))
732 }
733
734 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735 let body = req.json_body();
736 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
737 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
738 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
739 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
740 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
741 let name_prefix = body["NamePrefix"].as_str();
742 let limit = body["Limit"].as_u64().map(|n| n as usize);
743 let next_token = body["NextToken"].as_str();
744
745 let accounts = self.state.read();
746 let empty = EventBridgeState::new(&req.account_id, &req.region);
747 let state = accounts.get(&req.account_id).unwrap_or(&empty);
748 let bus_name = state.resolve_bus_name(event_bus_name);
749
750 let mut rules: Vec<&EventRule> = state
751 .rules
752 .values()
753 .filter(|r| r.event_bus_name == bus_name)
754 .filter(|r| match name_prefix {
755 Some(prefix) => r.name.starts_with(prefix),
756 None => true,
757 })
758 .collect();
759 rules.sort_by(|a, b| a.name.cmp(&b.name));
760
761 let start = next_token
763 .and_then(|t| t.parse::<usize>().ok())
764 .unwrap_or(0)
765 .min(rules.len());
766 let rules_slice = &rules[start..];
767
768 let (page, new_next_token) = if let Some(lim) = limit {
769 if rules_slice.len() > lim {
770 (&rules_slice[..lim], Some((start + lim).to_string()))
771 } else {
772 (rules_slice, None)
773 }
774 } else {
775 (rules_slice, None)
776 };
777
778 let rules_json: Vec<Value> = page
779 .iter()
780 .map(|r| {
781 let mut obj = json!({
782 "Name": r.name,
783 "Arn": r.arn,
784 "EventBusName": r.event_bus_name,
785 "State": r.state,
786 });
787 if let Some(ref desc) = r.description {
788 obj["Description"] = json!(desc);
789 }
790 if let Some(ref ep) = r.event_pattern {
791 obj["EventPattern"] = json!(ep);
792 }
793 if let Some(ref se) = r.schedule_expression {
794 obj["ScheduleExpression"] = json!(se);
795 }
796 if let Some(ref role) = r.role_arn {
797 obj["RoleArn"] = json!(role);
798 }
799 if let Some(ref mb) = r.managed_by {
800 obj["ManagedBy"] = json!(mb);
801 }
802 obj
803 })
804 .collect();
805
806 let mut resp = json!({ "Rules": rules_json });
807 if let Some(token) = new_next_token {
808 resp["NextToken"] = json!(token);
809 }
810
811 Ok(AwsResponse::ok_json(resp))
812 }
813
814 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815 let body = req.json_body();
816 validate_required("Name", &body["Name"])?;
817 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
818 validate_string_length("name", name, 1, 64)?;
819 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
820 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
821
822 let accounts = self.state.read();
823 let empty = EventBridgeState::new(&req.account_id, &req.region);
824 let state = accounts.get(&req.account_id).unwrap_or(&empty);
825 let bus_name = state.resolve_bus_name(event_bus_name);
826 let key = (bus_name.clone(), name.to_string());
827
828 let rule = state.rules.get(&key).ok_or_else(|| {
829 AwsServiceError::aws_error(
830 StatusCode::BAD_REQUEST,
831 "ResourceNotFoundException",
832 format!("Rule {name} does not exist."),
833 )
834 })?;
835
836 let mut resp = json!({
837 "Name": rule.name,
838 "Arn": rule.arn,
839 "EventBusName": rule.event_bus_name,
840 "State": rule.state,
841 });
842
843 if let Some(ref desc) = rule.description {
844 resp["Description"] = json!(desc);
845 }
846 if let Some(ref ep) = rule.event_pattern {
847 resp["EventPattern"] = json!(ep);
848 }
849 if let Some(ref se) = rule.schedule_expression {
850 resp["ScheduleExpression"] = json!(se);
851 }
852 if let Some(ref role) = rule.role_arn {
853 resp["RoleArn"] = json!(role);
854 }
855 if let Some(ref mb) = rule.managed_by {
856 resp["ManagedBy"] = json!(mb);
857 }
858 if let Some(ref cb) = rule.created_by {
859 resp["CreatedBy"] = json!(cb);
860 }
861 if rule.event_bus_name != "default" && rule.created_by.is_none() {
863 resp["CreatedBy"] = json!(state.account_id);
864 }
865
866 Ok(AwsResponse::ok_json(resp))
867 }
868
869 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
870 let body = req.json_body();
871 validate_required("Name", &body["Name"])?;
872 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
873 validate_string_length("name", name, 1, 64)?;
874 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
875 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
876
877 let mut accounts = self.state.write();
878 let state = accounts.get_or_create(&req.account_id);
879 let bus_name = state.resolve_bus_name(event_bus_name);
880 let key = (bus_name, name.to_string());
881
882 let rule = state.rules.get_mut(&key).ok_or_else(|| {
883 AwsServiceError::aws_error(
884 StatusCode::BAD_REQUEST,
885 "ResourceNotFoundException",
886 format!("Rule {name} does not exist."),
887 )
888 })?;
889
890 rule.state = "ENABLED".to_string();
891 Ok(AwsResponse::ok_json(json!({})))
892 }
893
894 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
895 let body = req.json_body();
896 validate_required("Name", &body["Name"])?;
897 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
898 validate_string_length("name", name, 1, 64)?;
899 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
900 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
901
902 let mut accounts = self.state.write();
903 let state = accounts.get_or_create(&req.account_id);
904 let bus_name = state.resolve_bus_name(event_bus_name);
905 let key = (bus_name, name.to_string());
906
907 let rule = state.rules.get_mut(&key).ok_or_else(|| {
908 AwsServiceError::aws_error(
909 StatusCode::BAD_REQUEST,
910 "ResourceNotFoundException",
911 format!("Rule {name} does not exist."),
912 )
913 })?;
914
915 rule.state = "DISABLED".to_string();
916 Ok(AwsResponse::ok_json(json!({})))
917 }
918
919 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
922 let body = req.json_body();
923 validate_required("Rule", &body["Rule"])?;
930 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
931 validate_string_length("Rule", rule_name, 1, 64)?;
932 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
933 validate_required("Targets", &body["Targets"])?;
938 let targets_array = body["Targets"].as_array().ok_or_else(|| {
939 AwsServiceError::aws_error(
940 StatusCode::BAD_REQUEST,
941 "ValidationException",
942 "Targets must be a list",
943 )
944 })?;
945 if targets_array.is_empty() || targets_array.len() > 100 {
946 return Err(AwsServiceError::aws_error(
947 StatusCode::BAD_REQUEST,
948 "ValidationException",
949 "Value at 'Targets' failed to satisfy constraint: \
950 Member must have length between 1 and 100",
951 ));
952 }
953 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
954 let targets: Vec<Value> = targets_array.clone();
955
956 let mut accounts = self.state.write();
957 let state = accounts.get_or_create(&req.account_id);
958 let bus_name = state.resolve_bus_name(event_bus_name);
959 let key = (bus_name.clone(), rule_name.to_string());
960
961 let rule = state.rules.get_mut(&key).ok_or_else(|| {
962 AwsServiceError::aws_error(
963 StatusCode::BAD_REQUEST,
964 "ResourceNotFoundException",
965 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
966 )
967 })?;
968
969 let mut failed_entries: Vec<Value> = Vec::new();
970 for target in &targets {
971 let target_id = target["Id"].as_str().unwrap_or("").to_string();
972 let target_arn = target["Arn"].as_str().unwrap_or("");
973
974 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
975 failed_entries.push(json!({
976 "TargetId": target_id,
977 "ErrorCode": "ValidationException",
978 "ErrorMessage": format!(
979 "Parameter(s) SqsParameters must be specified for target: {target_id}."
980 ),
981 }));
982 continue;
983 }
984 if !target_arn.starts_with("arn:") {
985 failed_entries.push(json!({
986 "TargetId": target_id,
987 "ErrorCode": "ValidationException",
988 "ErrorMessage": format!(
989 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
990 ),
991 }));
992 continue;
993 }
994
995 let et = parse_target(target);
996 rule.targets.retain(|t| t.id != et.id);
997 rule.targets.push(et);
998 }
999
1000 Ok(AwsResponse::ok_json(json!({
1001 "FailedEntryCount": failed_entries.len(),
1002 "FailedEntries": failed_entries,
1003 })))
1004 }
1005
1006 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1007 let body = req.json_body();
1008 validate_required("Rule", &body["Rule"])?;
1009 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1010 validate_string_length("rule", rule_name, 1, 64)?;
1011 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1012 validate_required("Ids", &body["Ids"])?;
1013 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1014 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1015
1016 let target_ids: Vec<String> = ids
1017 .iter()
1018 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1019 .collect();
1020
1021 let mut accounts = self.state.write();
1022 let state = accounts.get_or_create(&req.account_id);
1023 let bus_name = state.resolve_bus_name(event_bus_name);
1024 let key = (bus_name.clone(), rule_name.to_string());
1025
1026 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1027 AwsServiceError::aws_error(
1028 StatusCode::BAD_REQUEST,
1029 "ResourceNotFoundException",
1030 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1031 )
1032 })?;
1033
1034 rule.targets.retain(|t| !target_ids.contains(&t.id));
1035
1036 Ok(AwsResponse::ok_json(json!({
1037 "FailedEntryCount": 0,
1038 "FailedEntries": [],
1039 })))
1040 }
1041
1042 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1043 let body = req.json_body();
1044 validate_required("Rule", &body["Rule"])?;
1045 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1046 validate_string_length("rule", rule_name, 1, 64)?;
1047 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1048 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1049 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1050 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1051 let limit = body["Limit"].as_u64().map(|n| n as usize);
1052 let next_token = body["NextToken"].as_str();
1053
1054 let accounts = self.state.read();
1055 let empty = EventBridgeState::new(&req.account_id, &req.region);
1056 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1057 let bus_name = state.resolve_bus_name(event_bus_name);
1058 let key = (bus_name, rule_name.to_string());
1059
1060 let rule = state.rules.get(&key).ok_or_else(|| {
1061 AwsServiceError::aws_error(
1062 StatusCode::BAD_REQUEST,
1063 "ResourceNotFoundException",
1064 format!("Rule {rule_name} does not exist."),
1065 )
1066 })?;
1067
1068 let all_targets = &rule.targets;
1069 let start = next_token
1070 .and_then(|t| t.parse::<usize>().ok())
1071 .unwrap_or(0)
1072 .min(all_targets.len());
1073 let slice = &all_targets[start..];
1074
1075 let (page, new_next_token) = if let Some(lim) = limit {
1076 if slice.len() > lim {
1077 (&slice[..lim], Some((start + lim).to_string()))
1078 } else {
1079 (slice, None)
1080 }
1081 } else {
1082 (slice, None)
1083 };
1084
1085 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1086
1087 let mut resp = json!({ "Targets": targets });
1088 if let Some(token) = new_next_token {
1089 resp["NextToken"] = json!(token);
1090 }
1091
1092 Ok(AwsResponse::ok_json(resp))
1093 }
1094
1095 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1096 let body = req.json_body();
1097 validate_required("TargetArn", &body["TargetArn"])?;
1098 let target_arn = body["TargetArn"]
1099 .as_str()
1100 .ok_or_else(|| missing("TargetArn"))?;
1101 validate_string_length("targetArn", target_arn, 1, 1600)?;
1102 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1103 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1104 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1105 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1106 let limit = body["Limit"].as_u64().map(|n| n as usize);
1107 let next_token = body["NextToken"].as_str();
1108
1109 let accounts = self.state.read();
1110 let empty = EventBridgeState::new(&req.account_id, &req.region);
1111 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1112 let bus_name = state.resolve_bus_name(event_bus_name);
1113
1114 let mut rule_names: Vec<String> = Vec::new();
1116 for rule in state.rules.values() {
1117 if rule.event_bus_name == bus_name
1118 && rule.targets.iter().any(|t| t.arn == target_arn)
1119 && !rule_names.contains(&rule.name)
1120 {
1121 rule_names.push(rule.name.clone());
1122 }
1123 }
1124 rule_names.sort();
1125
1126 let start = next_token
1127 .and_then(|t| t.parse::<usize>().ok())
1128 .unwrap_or(0)
1129 .min(rule_names.len());
1130 let slice = &rule_names[start..];
1131
1132 let (page, new_next_token) = if let Some(lim) = limit {
1133 if slice.len() > lim {
1134 (&slice[..lim], Some((start + lim).to_string()))
1135 } else {
1136 (slice, None)
1137 }
1138 } else {
1139 (slice, None)
1140 };
1141
1142 let mut resp = json!({ "RuleNames": page });
1143 if let Some(token) = new_next_token {
1144 resp["NextToken"] = json!(token);
1145 }
1146
1147 Ok(AwsResponse::ok_json(resp))
1148 }
1149
1150 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1153 let body = req.json_body();
1154 validate_required("EventPattern", &body["EventPattern"])?;
1155 validate_required("Event", &body["Event"])?;
1156 let event_pattern = body["EventPattern"]
1157 .as_str()
1158 .ok_or_else(|| missing("EventPattern"))?;
1159 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1160
1161 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1163 AwsServiceError::aws_error(
1164 StatusCode::BAD_REQUEST,
1165 "InvalidEventPatternException",
1166 "Event is not valid JSON.",
1167 )
1168 })?;
1169
1170 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1172 AwsServiceError::aws_error(
1173 StatusCode::BAD_REQUEST,
1174 "InvalidEventPatternException",
1175 "Event pattern is not valid JSON.",
1176 )
1177 })?;
1178
1179 let source = event["source"].as_str().unwrap_or("");
1180 let detail_type = event["detail-type"].as_str().unwrap_or("");
1181 let detail = event
1182 .get("detail")
1183 .map(|v| serde_json::to_string(v).unwrap_or_default())
1184 .unwrap_or_else(|| "{}".to_string());
1185 let account = event["account"].as_str().unwrap_or("");
1186 let region = event["region"].as_str().unwrap_or("");
1187 let resources: Vec<String> = event["resources"]
1188 .as_array()
1189 .map(|arr| {
1190 arr.iter()
1191 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1192 .collect()
1193 })
1194 .unwrap_or_default();
1195
1196 let result = matches_pattern(
1197 Some(event_pattern),
1198 source,
1199 detail_type,
1200 &detail,
1201 account,
1202 region,
1203 &resources,
1204 );
1205
1206 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1207 }
1208
1209 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1212 let body = req.json_body();
1213 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1214 validate_optional_string_length(
1215 "kmsKeyIdentifier",
1216 body["KmsKeyIdentifier"].as_str(),
1217 0,
1218 2048,
1219 )?;
1220 let name = body["Name"].as_str().unwrap_or("default");
1221
1222 let mut accounts = self.state.write();
1223 let state = accounts.get_or_create(&req.account_id);
1224 let bus = state.buses.get_mut(name).ok_or_else(|| {
1225 AwsServiceError::aws_error(
1226 StatusCode::BAD_REQUEST,
1227 "ResourceNotFoundException",
1228 format!("Event bus {name} does not exist."),
1229 )
1230 })?;
1231
1232 if let Some(desc) = body["Description"].as_str() {
1233 bus.description = Some(desc.to_string());
1234 }
1235 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1236 bus.kms_key_identifier = Some(kms.to_string());
1237 }
1238 if let Some(dlc) = body.get("DeadLetterConfig") {
1239 bus.dead_letter_config = Some(dlc.clone());
1240 }
1241 bus.last_modified_time = Utc::now();
1242
1243 let arn = bus.arn.clone();
1244 let bus_name = bus.name.clone();
1245
1246 Ok(AwsResponse::ok_json(json!({
1247 "Arn": arn,
1248 "Name": bus_name,
1249 })))
1250 }
1251
1252 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1255 let body = req.json_body();
1256 validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1260 validate_required("Entries", &body["Entries"])?;
1261 let entries_array = body["Entries"].as_array().ok_or_else(|| {
1262 AwsServiceError::aws_error(
1263 StatusCode::BAD_REQUEST,
1264 "ValidationException",
1265 "Entries must be a list",
1266 )
1267 })?;
1268 if entries_array.is_empty() || entries_array.len() > 10 {
1269 return Err(AwsServiceError::aws_error(
1270 StatusCode::BAD_REQUEST,
1271 "ValidationException",
1272 "Value at 'Entries' failed to satisfy constraint: \
1273 Member must have length between 1 and 10",
1274 ));
1275 }
1276 let entries: Vec<Value> = entries_array.clone();
1277 let entries = &entries;
1278
1279 let mut accounts = self.state.write();
1280 let state = accounts.get_or_create(&req.account_id);
1281 let mut result_entries = Vec::new();
1282 let mut events_to_deliver = Vec::new();
1283 let mut failed_count = 0;
1284
1285 for entry in entries {
1286 let source = entry["Source"].as_str().unwrap_or("").to_string();
1287 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1288 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1289
1290 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1291 failed_count += 1;
1292 result_entries.push(error);
1293 continue;
1294 }
1295
1296 let event_id = uuid::Uuid::new_v4().to_string();
1297 let raw_bus = entry["EventBusName"]
1298 .as_str()
1299 .unwrap_or("default")
1300 .to_string();
1301 let event_bus_name = state.resolve_bus_name(&raw_bus);
1302
1303 let caller_account = req
1309 .principal
1310 .as_ref()
1311 .map(|p| p.account_id.as_str())
1312 .unwrap_or(req.account_id.as_str());
1313 if caller_account != req.account_id {
1314 let bus_policy_value = state
1315 .buses
1316 .get(&event_bus_name)
1317 .and_then(|b| b.policy.clone());
1318 if let Some(policy_value) = bus_policy_value {
1319 let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1320 let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1321 let bus_arn = state
1322 .buses
1323 .get(&event_bus_name)
1324 .map(|b| b.arn.clone())
1325 .unwrap_or_default();
1326 let principal =
1327 req.principal
1328 .clone()
1329 .unwrap_or_else(|| fakecloud_core::auth::Principal {
1330 arn: Arn::global("iam", caller_account, "root").to_string(),
1331 user_id: caller_account.to_string(),
1332 account_id: caller_account.to_string(),
1333 principal_type: fakecloud_core::auth::PrincipalType::Root,
1334 source_identity: None,
1335 tags: None,
1336 });
1337 let context = fakecloud_iam::evaluator::RequestContext {
1338 aws_principal_arn: Some(principal.arn.clone()),
1339 aws_principal_account: Some(principal.account_id.clone()),
1340 ..Default::default()
1341 };
1342 let eval_req = fakecloud_iam::evaluator::EvalRequest {
1343 principal: &principal,
1344 action: "events:PutEvents".to_string(),
1345 resource: bus_arn,
1346 context,
1347 };
1348 let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1349 &policy_doc,
1350 &eval_req,
1351 );
1352 if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1353 failed_count += 1;
1354 result_entries.push(json!({
1355 "ErrorCode": "AccessDeniedException",
1356 "ErrorMessage": format!(
1357 "User '{}' is not authorized to put events on event bus '{}'",
1358 principal.arn, event_bus_name
1359 ),
1360 }));
1361 continue;
1362 }
1363 }
1364 }
1365
1366 let time = parse_put_events_time(&entry["Time"]);
1367 let resources: Vec<String> = entry["Resources"]
1368 .as_array()
1369 .map(|arr| {
1370 arr.iter()
1371 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1372 .collect()
1373 })
1374 .unwrap_or_default();
1375
1376 let event = PutEvent {
1377 event_id: event_id.clone(),
1378 source: source.clone(),
1379 detail_type: detail_type.clone(),
1380 detail: detail.clone(),
1381 event_bus_name: event_bus_name.clone(),
1382 time,
1383 resources: resources.clone(),
1384 };
1385
1386 archive_matching_event(
1387 state,
1388 &event,
1389 &event_bus_name,
1390 &source,
1391 &detail_type,
1392 &detail,
1393 &req.account_id,
1394 &req.region,
1395 &resources,
1396 );
1397
1398 state.events.push(event);
1399
1400 let matching_targets: Vec<EventTarget> = state
1402 .rules
1403 .values()
1404 .filter(|r| {
1405 r.event_bus_name == event_bus_name
1406 && r.state == "ENABLED"
1407 && matches_pattern(
1408 r.event_pattern.as_deref(),
1409 &source,
1410 &detail_type,
1411 &detail,
1412 &req.account_id,
1413 &req.region,
1414 &resources,
1415 )
1416 })
1417 .flat_map(|r| r.targets.clone())
1418 .collect();
1419
1420 if !matching_targets.is_empty() {
1421 events_to_deliver.push((
1422 event_id.clone(),
1423 source,
1424 detail_type,
1425 detail,
1426 time,
1427 resources,
1428 matching_targets,
1429 ));
1430 }
1431
1432 result_entries.push(json!({ "EventId": event_id }));
1433 }
1434
1435 drop(accounts);
1437
1438 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1443 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1444 let event_json = json!({
1445 "version": "0",
1446 "id": event_id,
1447 "source": source,
1448 "account": req.account_id,
1449 "detail-type": detail_type,
1450 "detail": detail_value,
1451 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1452 "region": req.region,
1453 "resources": resources,
1454 });
1455
1456 let ctx = EventDispatchContext {
1457 state: &self.state,
1458 delivery: &self.delivery,
1459 lambda_state: self.lambda_state.as_ref(),
1460 logs_state: self.logs_state.as_ref(),
1461 container_runtime: &self.container_runtime,
1462 account_id: &req.account_id,
1463 region: &req.region,
1464 };
1465 for target in targets {
1466 dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1467 }
1468 }
1469
1470 let resp = json!({
1471 "FailedEntryCount": failed_count,
1472 "Entries": result_entries,
1473 });
1474
1475 Ok(AwsResponse::ok_json(resp))
1476 }
1477
1478 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1481 let body = req.json_body();
1482 validate_required("ResourceARN", &body["ResourceARN"])?;
1483 let arn = body["ResourceARN"]
1484 .as_str()
1485 .ok_or_else(|| missing("ResourceARN"))?;
1486 validate_string_length("resourceARN", arn, 1, 1600)?;
1487 validate_required("Tags", &body["Tags"])?;
1488
1489 let mut accounts = self.state.write();
1490 let state = accounts.get_or_create(&req.account_id);
1491 let tag_map = find_tags_mut(state, arn)?;
1492
1493 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1494 AwsServiceError::aws_error(
1495 StatusCode::BAD_REQUEST,
1496 "ValidationException",
1497 format!("{f} must be a list"),
1498 )
1499 })?;
1500
1501 Ok(AwsResponse::ok_json(json!({})))
1502 }
1503
1504 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1505 let body = req.json_body();
1506 validate_required("ResourceARN", &body["ResourceARN"])?;
1507 let arn = body["ResourceARN"]
1508 .as_str()
1509 .ok_or_else(|| missing("ResourceARN"))?;
1510 validate_string_length("resourceARN", arn, 1, 1600)?;
1511 validate_required("TagKeys", &body["TagKeys"])?;
1512
1513 let mut accounts = self.state.write();
1514 let state = accounts.get_or_create(&req.account_id);
1515 let tag_map = find_tags_mut(state, arn)?;
1516
1517 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1518 AwsServiceError::aws_error(
1519 StatusCode::BAD_REQUEST,
1520 "ValidationException",
1521 format!("{f} must be a list"),
1522 )
1523 })?;
1524
1525 Ok(AwsResponse::ok_json(json!({})))
1526 }
1527
1528 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1529 let body = req.json_body();
1530 validate_required("ResourceARN", &body["ResourceARN"])?;
1531 let arn = body["ResourceARN"]
1532 .as_str()
1533 .ok_or_else(|| missing("ResourceARN"))?;
1534 validate_string_length("resourceARN", arn, 1, 1600)?;
1535
1536 let accounts = self.state.read();
1537 let empty = EventBridgeState::new(&req.account_id, &req.region);
1538 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1539 let tag_map = find_tags(state, arn)?;
1540
1541 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1542
1543 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1544 }
1545
1546 }
1548
1549struct StartReplayInput {
1559 name: String,
1560 description: Option<String>,
1561 event_source_arn: String,
1562 destination: Value,
1563 destination_arn: String,
1564 event_start_time: DateTime<Utc>,
1565 event_end_time: DateTime<Utc>,
1566}
1567
1568impl StartReplayInput {
1569 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1570 let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1578 let description = body["Description"].as_str().map(|s| s.to_string());
1579 let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1580 let destination = body["Destination"].clone();
1581
1582 let event_start_time = body["EventStartTime"]
1583 .as_f64()
1584 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1585 .unwrap_or_else(Utc::now);
1586 let event_end_time = body["EventEndTime"]
1587 .as_f64()
1588 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1589 .unwrap_or_else(Utc::now);
1590
1591 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1592 if !destination_arn.contains(":event-bus/") {
1593 return Err(AwsServiceError::aws_error(
1597 StatusCode::BAD_REQUEST,
1598 "ResourceNotFoundException",
1599 format!("Destination.Arn {destination_arn} does not point to an event bus."),
1600 ));
1601 }
1602
1603 Ok(Self {
1604 name,
1605 description,
1606 event_source_arn,
1607 destination,
1608 destination_arn,
1609 event_start_time,
1610 event_end_time,
1611 })
1612 }
1613}
1614
1615#[path = "service_archives_replays.rs"]
1616mod service_archives_replays;
1617#[path = "service_connections_apidests.rs"]
1618mod service_connections_apidests;
1619#[path = "service_endpoints.rs"]
1620mod service_endpoints;
1621#[path = "service_partner_sources.rs"]
1622mod service_partner_sources;
1623
1624#[path = "helpers.rs"]
1625mod helpers;
1626pub(crate) use helpers::*;
1627
1628#[cfg(test)]
1629#[path = "service_tests.rs"]
1630mod tests;
1631
1632#[cfg(test)]
1633mod pagination_reject_test {
1634 #[test]
1635 fn paginate_checked_rejects_invalid_token() {
1636 use fakecloud_core::pagination::paginate_checked;
1637 let items: Vec<i32> = (0..5).collect();
1638 assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1639 assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1640 }
1641}