1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
/// EventBridge (`events`) service implementation.
///
/// Built with [`EventBridgeService::new`] and optionally extended through the
/// `with_*` builder methods, which fill in the `Option` fields below.
pub struct EventBridgeService {
    /// Shared EventBridge state that the request handlers read and write and
    /// that `save_eventbridge_snapshot` clones when persisting.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied to `new`.
    /// NOTE(review): presumably used to deliver matched events to targets — confirm.
    delivery: Arc<DeliveryBus>,
    /// Lambda state; `None` until `with_lambda` is called.
    lambda_state: Option<SharedLambdaState>,
    /// Logs state; `None` until `with_logs` is called.
    logs_state: Option<SharedLogsState>,
    /// Container runtime; `None` until `with_runtime` is called.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Snapshot store; `None` until `with_snapshot_store` is called. While it
    /// is `None`, snapshot saving is a no-op and `snapshot_hook` returns `None`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held while a snapshot is built and written, so snapshot
    /// writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
37
38impl EventBridgeService {
39 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40 Self {
41 state,
42 delivery,
43 lambda_state: None,
44 logs_state: None,
45 container_runtime: None,
46 snapshot_store: None,
47 snapshot_lock: Arc::new(AsyncMutex::new(())),
48 }
49 }
50
51 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52 self.lambda_state = Some(lambda_state);
53 self
54 }
55
56 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57 self.logs_state = Some(logs_state);
58 self
59 }
60
61 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62 self.container_runtime = Some(runtime);
63 self
64 }
65
66 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67 self.snapshot_store = Some(store);
68 self
69 }
70
71 async fn save_snapshot(&self) {
75 save_eventbridge_snapshot(
76 &self.state,
77 self.snapshot_store.clone(),
78 &self.snapshot_lock,
79 )
80 .await;
81 }
82
83 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88 let store = self.snapshot_store.clone()?;
89 let state = self.state.clone();
90 let lock = self.snapshot_lock.clone();
91 Some(Arc::new(move || {
92 let state = state.clone();
93 let store = store.clone();
94 let lock = lock.clone();
95 Box::pin(async move {
96 save_eventbridge_snapshot(&state, Some(store), &lock).await;
97 })
98 }))
99 }
100}
101
102pub async fn save_eventbridge_snapshot(
108 state: &SharedEventBridgeState,
109 store: Option<Arc<dyn SnapshotStore>>,
110 lock: &AsyncMutex<()>,
111) {
112 let Some(store) = store else {
113 return;
114 };
115 let _guard = lock.lock().await;
116 let snapshot = EventBridgeSnapshot {
117 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118 accounts: Some(state.read().clone()),
119 state: None,
120 };
121 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122 let bytes = serde_json::to_vec(&snapshot)
123 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124 store.save(&bytes)
125 })
126 .await;
127 match join {
128 Ok(Ok(())) => {}
129 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131 }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136 fn service_name(&self) -> &str {
137 "events"
138 }
139
140 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141 let mutates = is_mutating_action(req.action.as_str());
142 let result = match req.action.as_str() {
143 "CreateEventBus" => self.create_event_bus(&req),
144 "DeleteEventBus" => self.delete_event_bus(&req),
145 "ListEventBuses" => self.list_event_buses(&req),
146 "DescribeEventBus" => self.describe_event_bus(&req),
147 "PutRule" => self.put_rule(&req),
148 "DeleteRule" => self.delete_rule(&req),
149 "ListRules" => self.list_rules(&req),
150 "DescribeRule" => self.describe_rule(&req),
151 "EnableRule" => self.enable_rule(&req),
152 "DisableRule" => self.disable_rule(&req),
153 "PutTargets" => self.put_targets(&req),
154 "RemoveTargets" => self.remove_targets(&req),
155 "ListTargetsByRule" => self.list_targets_by_rule(&req),
156 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157 "PutEvents" => self.put_events(&req),
158 "PutPermission" => self.put_permission(&req),
159 "RemovePermission" => self.remove_permission(&req),
160 "TagResource" => self.tag_resource(&req),
161 "UntagResource" => self.untag_resource(&req),
162 "ListTagsForResource" => self.list_tags_for_resource(&req),
163 "CreateArchive" => self.create_archive(&req),
164 "DescribeArchive" => self.describe_archive(&req),
165 "ListArchives" => self.list_archives(&req),
166 "UpdateArchive" => self.update_archive(&req),
167 "DeleteArchive" => self.delete_archive(&req),
168 "CreateConnection" => self.create_connection(&req),
169 "DescribeConnection" => self.describe_connection(&req),
170 "ListConnections" => self.list_connections(&req),
171 "UpdateConnection" => self.update_connection(&req),
172 "DeleteConnection" => self.delete_connection(&req),
173 "CreateApiDestination" => self.create_api_destination(&req),
174 "DescribeApiDestination" => self.describe_api_destination(&req),
175 "ListApiDestinations" => self.list_api_destinations(&req),
176 "UpdateApiDestination" => self.update_api_destination(&req),
177 "DeleteApiDestination" => self.delete_api_destination(&req),
178 "StartReplay" => self.start_replay(&req),
179 "DescribeReplay" => self.describe_replay(&req),
180 "ListReplays" => self.list_replays(&req),
181 "CancelReplay" => self.cancel_replay(&req),
182 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187 "ActivateEventSource" => self.activate_event_source(&req),
188 "DeactivateEventSource" => self.deactivate_event_source(&req),
189 "DescribeEventSource" => self.describe_event_source(&req),
190 "ListEventSources" => self.list_event_sources(&req),
191 "PutPartnerEvents" => self.put_partner_events(&req),
192 "TestEventPattern" => self.test_event_pattern(&req),
193 "UpdateEventBus" => self.update_event_bus(&req),
194 "CreateEndpoint" => self.create_endpoint(&req),
195 "DeleteEndpoint" => self.delete_endpoint(&req),
196 "DescribeEndpoint" => self.describe_endpoint(&req),
197 "ListEndpoints" => self.list_endpoints(&req),
198 "UpdateEndpoint" => self.update_endpoint(&req),
199 "DeauthorizeConnection" => self.deauthorize_connection(&req),
200 _ => Err(AwsServiceError::action_not_implemented(
201 "events",
202 &req.action,
203 )),
204 };
205 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206 self.save_snapshot().await;
207 }
208 result
209 }
210
211 fn supported_actions(&self) -> &[&str] {
212 &[
213 "CreateEventBus",
214 "DeleteEventBus",
215 "ListEventBuses",
216 "DescribeEventBus",
217 "PutRule",
218 "DeleteRule",
219 "ListRules",
220 "DescribeRule",
221 "EnableRule",
222 "DisableRule",
223 "PutTargets",
224 "RemoveTargets",
225 "ListTargetsByRule",
226 "ListRuleNamesByTarget",
227 "PutEvents",
228 "PutPermission",
229 "RemovePermission",
230 "TagResource",
231 "UntagResource",
232 "ListTagsForResource",
233 "CreateArchive",
234 "DescribeArchive",
235 "ListArchives",
236 "UpdateArchive",
237 "DeleteArchive",
238 "CreateConnection",
239 "DescribeConnection",
240 "ListConnections",
241 "UpdateConnection",
242 "DeleteConnection",
243 "CreateApiDestination",
244 "DescribeApiDestination",
245 "ListApiDestinations",
246 "UpdateApiDestination",
247 "DeleteApiDestination",
248 "StartReplay",
249 "DescribeReplay",
250 "ListReplays",
251 "CancelReplay",
252 "CreatePartnerEventSource",
253 "DeletePartnerEventSource",
254 "DescribePartnerEventSource",
255 "ListPartnerEventSources",
256 "ListPartnerEventSourceAccounts",
257 "ActivateEventSource",
258 "DeactivateEventSource",
259 "DescribeEventSource",
260 "ListEventSources",
261 "PutPartnerEvents",
262 "TestEventPattern",
263 "UpdateEventBus",
264 "CreateEndpoint",
265 "DeleteEndpoint",
266 "DescribeEndpoint",
267 "ListEndpoints",
268 "UpdateEndpoint",
269 "DeauthorizeConnection",
270 ]
271 }
272}
273
274impl EventBridgeService {
276 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277 let body = req.json_body();
278 validate_required("Name", &body["Name"])?;
279 let name = body["Name"]
280 .as_str()
281 .ok_or_else(|| missing("Name"))?
282 .to_string();
283 validate_string_length("name", &name, 1, 256)?;
284 validate_optional_string_length(
285 "eventSourceName",
286 body["EventSourceName"].as_str(),
287 1,
288 256,
289 )?;
290 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291 validate_optional_string_length(
292 "kmsKeyIdentifier",
293 body["KmsKeyIdentifier"].as_str(),
294 0,
295 2048,
296 )?;
297
298 if name.contains('/') && !name.starts_with("aws.partner/") {
300 return Err(AwsServiceError::aws_error(
301 StatusCode::BAD_REQUEST,
302 "ValidationException",
303 "Event bus name must not contain '/'.",
304 ));
305 }
306
307 if name.starts_with("aws.partner/") {
309 let event_source = body["EventSourceName"].as_str().unwrap_or("");
310 let accounts_r = self.state.read();
311 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313 let has_source = state_r.partner_event_sources.contains_key(event_source);
314 drop(accounts_r);
315 if !has_source {
316 return Err(AwsServiceError::aws_error(
317 StatusCode::BAD_REQUEST,
318 "ResourceNotFoundException",
319 format!("Event source {event_source} does not exist."),
320 ));
321 }
322 }
323
324 let mut accounts = self.state.write();
325 let state = accounts.get_or_create(&req.account_id);
326
327 if state.buses.contains_key(&name) {
328 return Err(AwsServiceError::aws_error(
329 StatusCode::BAD_REQUEST,
330 "ResourceAlreadyExistsException",
331 format!("Event bus {name} already exists."),
332 ));
333 }
334
335 let arn = format!(
336 "arn:aws:events:{}:{}:event-bus/{}",
337 req.region, state.account_id, name
338 );
339 let now = Utc::now();
340 let description = body["Description"].as_str().map(|s| s.to_string());
341 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342 let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344 let tags = parse_tags(&body);
345
346 let bus = EventBus {
347 name: name.clone(),
348 arn: arn.clone(),
349 tags,
350 policy: None,
351 description,
352 kms_key_identifier,
353 dead_letter_config,
354 creation_time: now,
355 last_modified_time: now,
356 };
357 state.buses.insert(name, bus);
358
359 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360 }
361
362 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363 let body = req.json_body();
364 validate_required("Name", &body["Name"])?;
365 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366 validate_string_length("name", name, 1, 256)?;
367
368 if name == "default" {
369 return Err(AwsServiceError::aws_error(
370 StatusCode::BAD_REQUEST,
371 "ValidationException",
372 format!("Cannot delete event bus {name}."),
373 ));
374 }
375
376 let mut accounts = self.state.write();
377 let state = accounts.get_or_create(&req.account_id);
378 state.buses.remove(name);
379 state.rules.retain(|k, _| k.0 != name);
380
381 Ok(AwsResponse::ok_json(json!({})))
382 }
383
384 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385 let body = req.json_body();
386 validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394 validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395 validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396 let name_prefix = body["NamePrefix"].as_str();
397 let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399 let accounts = self.state.read();
400 let empty = EventBridgeState::new(&req.account_id, &req.region);
401 let state = accounts.get(&req.account_id).unwrap_or(&empty);
402 let filtered: Vec<&_> = state
403 .buses
404 .values()
405 .filter(|b| match name_prefix {
406 Some(prefix) => b.name.starts_with(prefix),
407 None => true,
408 })
409 .collect();
410
411 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412 let buses: Vec<Value> = page
413 .iter()
414 .map(|b| {
415 json!({
419 "Name": b.name,
420 "Arn": b.arn,
421 "CreationTime": b.creation_time.timestamp() as f64,
422 "LastModifiedTime": b.last_modified_time.timestamp() as f64,
423 })
424 })
425 .collect();
426 let mut resp = json!({ "EventBuses": buses });
427 if let Some(token) = next_token {
428 resp["NextToken"] = json!(token);
429 }
430
431 Ok(AwsResponse::ok_json(resp))
432 }
433
434 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
435 let body = req.json_body();
436 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
437 let name = body["Name"].as_str().unwrap_or("default");
438
439 let accounts = self.state.read();
440 let empty = EventBridgeState::new(&req.account_id, &req.region);
441 let state = accounts.get(&req.account_id).unwrap_or(&empty);
442 let bus = state.buses.get(name).ok_or_else(|| {
443 AwsServiceError::aws_error(
444 StatusCode::BAD_REQUEST,
445 "ResourceNotFoundException",
446 format!("Event bus {name} does not exist."),
447 )
448 })?;
449
450 let mut resp = json!({
451 "Name": bus.name,
452 "Arn": bus.arn,
453 "CreationTime": bus.creation_time.timestamp() as f64,
454 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
455 });
456
457 if let Some(ref policy) = bus.policy {
458 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
459 }
460 if let Some(ref desc) = bus.description {
461 resp["Description"] = json!(desc);
462 }
463 if let Some(ref kms) = bus.kms_key_identifier {
464 resp["KmsKeyIdentifier"] = json!(kms);
465 }
466 if let Some(ref dlc) = bus.dead_letter_config {
467 resp["DeadLetterConfig"] = dlc.clone();
468 }
469
470 Ok(AwsResponse::ok_json(resp))
471 }
472
473 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
476 let body = req.json_body();
477 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
484 validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
485 validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
486 validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
487 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
488
489 let mut accounts = self.state.write();
490 let state = accounts.get_or_create(&req.account_id);
491
492 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
493 AwsServiceError::aws_error(
494 StatusCode::BAD_REQUEST,
495 "ResourceNotFoundException",
496 format!("Event bus {event_bus_name} does not exist."),
497 )
498 })?;
499
500 if let Some(policy_str) = body["Policy"].as_str() {
502 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
503 bus.policy = Some(policy);
504 return Ok(AwsResponse::ok_json(json!({})));
505 }
506 }
507
508 let action = body["Action"].as_str().unwrap_or("");
515 let principal = body["Principal"].as_str().unwrap_or("");
516 let statement_id = body["StatementId"].as_str().unwrap_or("");
517
518 let principal_value = if principal == "*" {
527 json!("*")
528 } else {
529 json!({ "AWS": Arn::global("iam", principal, "root").to_string() })
530 };
531 let statement = json!({
532 "Sid": statement_id,
533 "Effect": "Allow",
534 "Principal": principal_value,
535 "Action": action,
536 "Resource": bus.arn,
537 });
538
539 let policy = bus.policy.get_or_insert_with(|| {
540 json!({
541 "Version": "2012-10-17",
542 "Statement": [],
543 })
544 });
545
546 if let Some(stmts) = policy["Statement"].as_array_mut() {
547 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
551 stmts.push(statement);
552 }
553
554 Ok(AwsResponse::ok_json(json!({})))
555 }
556
557 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558 let body = req.json_body();
559 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
560 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
561 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
562 let statement_id = body["StatementId"].as_str().unwrap_or("");
563 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
564
565 let mut accounts = self.state.write();
566 let state = accounts.get_or_create(&req.account_id);
567
568 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
569 AwsServiceError::aws_error(
570 StatusCode::BAD_REQUEST,
571 "ResourceNotFoundException",
572 format!("Event bus {event_bus_name} does not exist."),
573 )
574 })?;
575
576 if remove_all {
577 bus.policy = None;
578 return Ok(AwsResponse::ok_json(json!({})));
579 }
580
581 let policy = bus.policy.as_mut().ok_or_else(|| {
582 AwsServiceError::aws_error(
583 StatusCode::BAD_REQUEST,
584 "ResourceNotFoundException",
585 "EventBus does not have a policy.",
586 )
587 })?;
588
589 if let Some(stmts) = policy["Statement"].as_array_mut() {
590 let before = stmts.len();
591 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
592 if stmts.len() == before {
593 return Err(AwsServiceError::aws_error(
594 StatusCode::BAD_REQUEST,
595 "ResourceNotFoundException",
596 "Statement with the provided id does not exist.",
597 ));
598 }
599 if stmts.is_empty() {
600 bus.policy = None;
601 }
602 }
603
604 Ok(AwsResponse::ok_json(json!({})))
605 }
606
607 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
610 let body = req.json_body();
611 validate_required("Name", &body["Name"])?;
622 let name = body["Name"]
623 .as_str()
624 .ok_or_else(|| missing("Name"))?
625 .to_string();
626 validate_string_length("Name", &name, 1, 64)?;
627 validate_optional_string_length_value(
628 "ScheduleExpression",
629 &body["ScheduleExpression"],
630 0,
631 256,
632 )?;
633 validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
634 if let Some(pattern) = body["EventPattern"].as_str().filter(|s| !s.is_empty()) {
638 validate_event_pattern(pattern)?;
639 }
640 validate_optional_enum_value(
641 "State",
642 &body["State"],
643 &[
644 "ENABLED",
645 "DISABLED",
646 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
647 ],
648 )?;
649 validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
650 validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
651 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
652
653 let raw_bus = body["EventBusName"]
654 .as_str()
655 .unwrap_or("default")
656 .to_string();
657
658 let mut accounts = self.state.write();
659 let state = accounts.get_or_create(&req.account_id);
660 let event_bus_name = state.resolve_bus_name(&raw_bus);
661
662 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
663 if s.is_empty() {
664 None
665 } else {
666 Some(s.to_string())
667 }
668 });
669 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
670 if s.is_empty() {
671 None
672 } else {
673 Some(s.to_string())
674 }
675 });
676 let description = body["Description"].as_str().map(|s| s.to_string());
677 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
678 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
679
680 if !state.buses.contains_key(&event_bus_name) {
686 return Err(AwsServiceError::aws_error(
687 StatusCode::BAD_REQUEST,
688 "ResourceNotFoundException",
689 format!("Event bus {event_bus_name} does not exist."),
690 ));
691 }
692
693 let arn = if event_bus_name == "default" {
694 format!(
695 "arn:aws:events:{}:{}:rule/{}",
696 req.region, state.account_id, name
697 )
698 } else {
699 format!(
700 "arn:aws:events:{}:{}:rule/{}/{}",
701 req.region, state.account_id, event_bus_name, name
702 )
703 };
704
705 let key = (event_bus_name.clone(), name.clone());
706 let targets = state
707 .rules
708 .get(&key)
709 .map(|r| r.targets.clone())
710 .unwrap_or_default();
711
712 let tags = parse_tags(&body);
713
714 let rule = EventRule {
715 name: name.clone(),
716 arn: arn.clone(),
717 event_bus_name,
718 event_pattern,
719 schedule_expression,
720 state: rule_state,
721 description,
722 role_arn,
723 managed_by: None,
724 created_by: None,
725 targets,
726 tags,
727 last_fired: None,
728 };
729
730 state.rules.insert(key, rule);
731 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
732 }
733
734 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735 let body = req.json_body();
736 validate_required("Name", &body["Name"])?;
737 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
738 validate_string_length("name", name, 1, 64)?;
739 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
740 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
741
742 let mut accounts = self.state.write();
743 let state = accounts.get_or_create(&req.account_id);
744 let bus_name = state.resolve_bus_name(event_bus_name);
745 let key = (bus_name, name.to_string());
746
747 if let Some(rule) = state.rules.get(&key) {
749 if !rule.targets.is_empty() {
750 return Err(AwsServiceError::aws_error(
751 StatusCode::BAD_REQUEST,
752 "ValidationException",
753 "Rule can't be deleted since it has targets.",
754 ));
755 }
756 }
757
758 state.rules.remove(&key);
759 Ok(AwsResponse::ok_json(json!({})))
760 }
761
762 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763 let body = req.json_body();
764 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
765 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
766 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
767 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
768 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
769 let name_prefix = body["NamePrefix"].as_str();
770 let limit = body["Limit"].as_u64().map(|n| n as usize);
771 let next_token = body["NextToken"].as_str();
772
773 let accounts = self.state.read();
774 let empty = EventBridgeState::new(&req.account_id, &req.region);
775 let state = accounts.get(&req.account_id).unwrap_or(&empty);
776 let bus_name = state.resolve_bus_name(event_bus_name);
777
778 let mut rules: Vec<&EventRule> = state
779 .rules
780 .values()
781 .filter(|r| r.event_bus_name == bus_name)
782 .filter(|r| match name_prefix {
783 Some(prefix) => r.name.starts_with(prefix),
784 None => true,
785 })
786 .collect();
787 rules.sort_by(|a, b| a.name.cmp(&b.name));
788
789 let start = next_token
791 .and_then(|t| t.parse::<usize>().ok())
792 .unwrap_or(0)
793 .min(rules.len());
794 let rules_slice = &rules[start..];
795
796 let (page, new_next_token) = if let Some(lim) = limit {
797 if rules_slice.len() > lim {
798 (&rules_slice[..lim], Some((start + lim).to_string()))
799 } else {
800 (rules_slice, None)
801 }
802 } else {
803 (rules_slice, None)
804 };
805
806 let rules_json: Vec<Value> = page
807 .iter()
808 .map(|r| {
809 let mut obj = json!({
810 "Name": r.name,
811 "Arn": r.arn,
812 "EventBusName": r.event_bus_name,
813 "State": r.state,
814 });
815 if let Some(ref desc) = r.description {
816 obj["Description"] = json!(desc);
817 }
818 if let Some(ref ep) = r.event_pattern {
819 obj["EventPattern"] = json!(ep);
820 }
821 if let Some(ref se) = r.schedule_expression {
822 obj["ScheduleExpression"] = json!(se);
823 }
824 if let Some(ref role) = r.role_arn {
825 obj["RoleArn"] = json!(role);
826 }
827 if let Some(ref mb) = r.managed_by {
828 obj["ManagedBy"] = json!(mb);
829 }
830 obj
831 })
832 .collect();
833
834 let mut resp = json!({ "Rules": rules_json });
835 if let Some(token) = new_next_token {
836 resp["NextToken"] = json!(token);
837 }
838
839 Ok(AwsResponse::ok_json(resp))
840 }
841
842 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843 let body = req.json_body();
844 validate_required("Name", &body["Name"])?;
845 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
846 validate_string_length("name", name, 1, 64)?;
847 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
848 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
849
850 let accounts = self.state.read();
851 let empty = EventBridgeState::new(&req.account_id, &req.region);
852 let state = accounts.get(&req.account_id).unwrap_or(&empty);
853 let bus_name = state.resolve_bus_name(event_bus_name);
854 let key = (bus_name.clone(), name.to_string());
855
856 let rule = state.rules.get(&key).ok_or_else(|| {
857 AwsServiceError::aws_error(
858 StatusCode::BAD_REQUEST,
859 "ResourceNotFoundException",
860 format!("Rule {name} does not exist."),
861 )
862 })?;
863
864 let mut resp = json!({
865 "Name": rule.name,
866 "Arn": rule.arn,
867 "EventBusName": rule.event_bus_name,
868 "State": rule.state,
869 });
870
871 if let Some(ref desc) = rule.description {
872 resp["Description"] = json!(desc);
873 }
874 if let Some(ref ep) = rule.event_pattern {
875 resp["EventPattern"] = json!(ep);
876 }
877 if let Some(ref se) = rule.schedule_expression {
878 resp["ScheduleExpression"] = json!(se);
879 }
880 if let Some(ref role) = rule.role_arn {
881 resp["RoleArn"] = json!(role);
882 }
883 if let Some(ref mb) = rule.managed_by {
884 resp["ManagedBy"] = json!(mb);
885 }
886 if let Some(ref cb) = rule.created_by {
887 resp["CreatedBy"] = json!(cb);
888 }
889 if rule.event_bus_name != "default" && rule.created_by.is_none() {
891 resp["CreatedBy"] = json!(state.account_id);
892 }
893
894 Ok(AwsResponse::ok_json(resp))
895 }
896
897 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898 let body = req.json_body();
899 validate_required("Name", &body["Name"])?;
900 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901 validate_string_length("name", name, 1, 64)?;
902 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905 let mut accounts = self.state.write();
906 let state = accounts.get_or_create(&req.account_id);
907 let bus_name = state.resolve_bus_name(event_bus_name);
908 let key = (bus_name, name.to_string());
909
910 let rule = state.rules.get_mut(&key).ok_or_else(|| {
911 AwsServiceError::aws_error(
912 StatusCode::BAD_REQUEST,
913 "ResourceNotFoundException",
914 format!("Rule {name} does not exist."),
915 )
916 })?;
917
918 rule.state = "ENABLED".to_string();
919 Ok(AwsResponse::ok_json(json!({})))
920 }
921
922 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
923 let body = req.json_body();
924 validate_required("Name", &body["Name"])?;
925 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
926 validate_string_length("name", name, 1, 64)?;
927 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
928 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
929
930 let mut accounts = self.state.write();
931 let state = accounts.get_or_create(&req.account_id);
932 let bus_name = state.resolve_bus_name(event_bus_name);
933 let key = (bus_name, name.to_string());
934
935 let rule = state.rules.get_mut(&key).ok_or_else(|| {
936 AwsServiceError::aws_error(
937 StatusCode::BAD_REQUEST,
938 "ResourceNotFoundException",
939 format!("Rule {name} does not exist."),
940 )
941 })?;
942
943 rule.state = "DISABLED".to_string();
944 Ok(AwsResponse::ok_json(json!({})))
945 }
946
947 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
950 let body = req.json_body();
951 validate_required("Rule", &body["Rule"])?;
958 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
959 validate_string_length("Rule", rule_name, 1, 64)?;
960 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
961 validate_required("Targets", &body["Targets"])?;
966 let targets_array = body["Targets"].as_array().ok_or_else(|| {
967 AwsServiceError::aws_error(
968 StatusCode::BAD_REQUEST,
969 "ValidationException",
970 "Targets must be a list",
971 )
972 })?;
973 if targets_array.is_empty() || targets_array.len() > 100 {
974 return Err(AwsServiceError::aws_error(
975 StatusCode::BAD_REQUEST,
976 "ValidationException",
977 "Value at 'Targets' failed to satisfy constraint: \
978 Member must have length between 1 and 100",
979 ));
980 }
981 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
982 let targets: Vec<Value> = targets_array.clone();
983
984 let mut accounts = self.state.write();
985 let state = accounts.get_or_create(&req.account_id);
986 let bus_name = state.resolve_bus_name(event_bus_name);
987 let key = (bus_name.clone(), rule_name.to_string());
988
989 let rule = state.rules.get_mut(&key).ok_or_else(|| {
990 AwsServiceError::aws_error(
991 StatusCode::BAD_REQUEST,
992 "ResourceNotFoundException",
993 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
994 )
995 })?;
996
997 let mut failed_entries: Vec<Value> = Vec::new();
998 for target in &targets {
999 let target_id = target["Id"].as_str().unwrap_or("").to_string();
1000 let target_arn = target["Arn"].as_str().unwrap_or("");
1001
1002 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1003 failed_entries.push(json!({
1004 "TargetId": target_id,
1005 "ErrorCode": "ValidationException",
1006 "ErrorMessage": format!(
1007 "Parameter(s) SqsParameters must be specified for target: {target_id}."
1008 ),
1009 }));
1010 continue;
1011 }
1012 if !target_arn.starts_with("arn:") {
1013 failed_entries.push(json!({
1014 "TargetId": target_id,
1015 "ErrorCode": "ValidationException",
1016 "ErrorMessage": format!(
1017 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1018 ),
1019 }));
1020 continue;
1021 }
1022
1023 let et = parse_target(target);
1024 rule.targets.retain(|t| t.id != et.id);
1025 rule.targets.push(et);
1026 }
1027
1028 Ok(AwsResponse::ok_json(json!({
1029 "FailedEntryCount": failed_entries.len(),
1030 "FailedEntries": failed_entries,
1031 })))
1032 }
1033
1034 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1035 let body = req.json_body();
1036 validate_required("Rule", &body["Rule"])?;
1037 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1038 validate_string_length("rule", rule_name, 1, 64)?;
1039 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1040 validate_required("Ids", &body["Ids"])?;
1041 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1042 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1043
1044 let target_ids: Vec<String> = ids
1045 .iter()
1046 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1047 .collect();
1048
1049 let mut accounts = self.state.write();
1050 let state = accounts.get_or_create(&req.account_id);
1051 let bus_name = state.resolve_bus_name(event_bus_name);
1052 let key = (bus_name.clone(), rule_name.to_string());
1053
1054 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1055 AwsServiceError::aws_error(
1056 StatusCode::BAD_REQUEST,
1057 "ResourceNotFoundException",
1058 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1059 )
1060 })?;
1061
1062 rule.targets.retain(|t| !target_ids.contains(&t.id));
1063
1064 Ok(AwsResponse::ok_json(json!({
1065 "FailedEntryCount": 0,
1066 "FailedEntries": [],
1067 })))
1068 }
1069
1070 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1071 let body = req.json_body();
1072 validate_required("Rule", &body["Rule"])?;
1073 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1074 validate_string_length("rule", rule_name, 1, 64)?;
1075 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1076 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1077 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1078 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1079 let limit = body["Limit"].as_u64().map(|n| n as usize);
1080 let next_token = body["NextToken"].as_str();
1081
1082 let accounts = self.state.read();
1083 let empty = EventBridgeState::new(&req.account_id, &req.region);
1084 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1085 let bus_name = state.resolve_bus_name(event_bus_name);
1086 let key = (bus_name, rule_name.to_string());
1087
1088 let rule = state.rules.get(&key).ok_or_else(|| {
1089 AwsServiceError::aws_error(
1090 StatusCode::BAD_REQUEST,
1091 "ResourceNotFoundException",
1092 format!("Rule {rule_name} does not exist."),
1093 )
1094 })?;
1095
1096 let all_targets = &rule.targets;
1097 let start = next_token
1098 .and_then(|t| t.parse::<usize>().ok())
1099 .unwrap_or(0)
1100 .min(all_targets.len());
1101 let slice = &all_targets[start..];
1102
1103 let (page, new_next_token) = if let Some(lim) = limit {
1104 if slice.len() > lim {
1105 (&slice[..lim], Some((start + lim).to_string()))
1106 } else {
1107 (slice, None)
1108 }
1109 } else {
1110 (slice, None)
1111 };
1112
1113 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1114
1115 let mut resp = json!({ "Targets": targets });
1116 if let Some(token) = new_next_token {
1117 resp["NextToken"] = json!(token);
1118 }
1119
1120 Ok(AwsResponse::ok_json(resp))
1121 }
1122
1123 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1124 let body = req.json_body();
1125 validate_required("TargetArn", &body["TargetArn"])?;
1126 let target_arn = body["TargetArn"]
1127 .as_str()
1128 .ok_or_else(|| missing("TargetArn"))?;
1129 validate_string_length("targetArn", target_arn, 1, 1600)?;
1130 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1131 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1132 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1133 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1134 let limit = body["Limit"].as_u64().map(|n| n as usize);
1135 let next_token = body["NextToken"].as_str();
1136
1137 let accounts = self.state.read();
1138 let empty = EventBridgeState::new(&req.account_id, &req.region);
1139 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1140 let bus_name = state.resolve_bus_name(event_bus_name);
1141
1142 let mut rule_names: Vec<String> = Vec::new();
1144 for rule in state.rules.values() {
1145 if rule.event_bus_name == bus_name
1146 && rule.targets.iter().any(|t| t.arn == target_arn)
1147 && !rule_names.contains(&rule.name)
1148 {
1149 rule_names.push(rule.name.clone());
1150 }
1151 }
1152 rule_names.sort();
1153
1154 let start = next_token
1155 .and_then(|t| t.parse::<usize>().ok())
1156 .unwrap_or(0)
1157 .min(rule_names.len());
1158 let slice = &rule_names[start..];
1159
1160 let (page, new_next_token) = if let Some(lim) = limit {
1161 if slice.len() > lim {
1162 (&slice[..lim], Some((start + lim).to_string()))
1163 } else {
1164 (slice, None)
1165 }
1166 } else {
1167 (slice, None)
1168 };
1169
1170 let mut resp = json!({ "RuleNames": page });
1171 if let Some(token) = new_next_token {
1172 resp["NextToken"] = json!(token);
1173 }
1174
1175 Ok(AwsResponse::ok_json(resp))
1176 }
1177
1178 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1181 let body = req.json_body();
1182 validate_required("EventPattern", &body["EventPattern"])?;
1183 validate_required("Event", &body["Event"])?;
1184 let event_pattern = body["EventPattern"]
1185 .as_str()
1186 .ok_or_else(|| missing("EventPattern"))?;
1187 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1188
1189 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1191 AwsServiceError::aws_error(
1192 StatusCode::BAD_REQUEST,
1193 "InvalidEventPatternException",
1194 "Event is not valid JSON.",
1195 )
1196 })?;
1197
1198 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1200 AwsServiceError::aws_error(
1201 StatusCode::BAD_REQUEST,
1202 "InvalidEventPatternException",
1203 "Event pattern is not valid JSON.",
1204 )
1205 })?;
1206
1207 let source = event["source"].as_str().unwrap_or("");
1208 let detail_type = event["detail-type"].as_str().unwrap_or("");
1209 let detail = event
1210 .get("detail")
1211 .map(|v| serde_json::to_string(v).unwrap_or_default())
1212 .unwrap_or_else(|| "{}".to_string());
1213 let account = event["account"].as_str().unwrap_or("");
1214 let region = event["region"].as_str().unwrap_or("");
1215 let resources: Vec<String> = event["resources"]
1216 .as_array()
1217 .map(|arr| {
1218 arr.iter()
1219 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1220 .collect()
1221 })
1222 .unwrap_or_default();
1223
1224 let result = matches_pattern(
1225 Some(event_pattern),
1226 source,
1227 detail_type,
1228 &detail,
1229 account,
1230 region,
1231 &resources,
1232 );
1233
1234 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1235 }
1236
1237 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1240 let body = req.json_body();
1241 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1242 validate_optional_string_length(
1243 "kmsKeyIdentifier",
1244 body["KmsKeyIdentifier"].as_str(),
1245 0,
1246 2048,
1247 )?;
1248 let name = body["Name"].as_str().unwrap_or("default");
1249
1250 let mut accounts = self.state.write();
1251 let state = accounts.get_or_create(&req.account_id);
1252 let bus = state.buses.get_mut(name).ok_or_else(|| {
1253 AwsServiceError::aws_error(
1254 StatusCode::BAD_REQUEST,
1255 "ResourceNotFoundException",
1256 format!("Event bus {name} does not exist."),
1257 )
1258 })?;
1259
1260 if let Some(desc) = body["Description"].as_str() {
1261 bus.description = Some(desc.to_string());
1262 }
1263 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1264 bus.kms_key_identifier = Some(kms.to_string());
1265 }
1266 if let Some(dlc) = body.get("DeadLetterConfig") {
1267 bus.dead_letter_config = Some(dlc.clone());
1268 }
1269 bus.last_modified_time = Utc::now();
1270
1271 let arn = bus.arn.clone();
1272 let bus_name = bus.name.clone();
1273
1274 Ok(AwsResponse::ok_json(json!({
1275 "Arn": arn,
1276 "Name": bus_name,
1277 })))
1278 }
1279
1280 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1283 let body = req.json_body();
1284 validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1288 validate_required("Entries", &body["Entries"])?;
1289 let entries_array = body["Entries"].as_array().ok_or_else(|| {
1290 AwsServiceError::aws_error(
1291 StatusCode::BAD_REQUEST,
1292 "ValidationException",
1293 "Entries must be a list",
1294 )
1295 })?;
1296 if entries_array.is_empty() || entries_array.len() > 10 {
1297 return Err(AwsServiceError::aws_error(
1298 StatusCode::BAD_REQUEST,
1299 "ValidationException",
1300 "Value at 'Entries' failed to satisfy constraint: \
1301 Member must have length between 1 and 10",
1302 ));
1303 }
1304 let entries: Vec<Value> = entries_array.clone();
1305 let entries = &entries;
1306
1307 let mut accounts = self.state.write();
1308 let state = accounts.get_or_create(&req.account_id);
1309 let mut result_entries = Vec::new();
1310 let mut events_to_deliver = Vec::new();
1311 let mut failed_count = 0;
1312
1313 for entry in entries {
1314 let source = entry["Source"].as_str().unwrap_or("").to_string();
1315 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1316 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1317
1318 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1319 failed_count += 1;
1320 result_entries.push(error);
1321 continue;
1322 }
1323
1324 let event_id = uuid::Uuid::new_v4().to_string();
1325 let raw_bus = entry["EventBusName"]
1326 .as_str()
1327 .unwrap_or("default")
1328 .to_string();
1329 let event_bus_name = state.resolve_bus_name(&raw_bus);
1330
1331 let caller_account = req
1337 .principal
1338 .as_ref()
1339 .map(|p| p.account_id.as_str())
1340 .unwrap_or(req.account_id.as_str());
1341 if caller_account != req.account_id {
1342 let bus_policy_value = state
1343 .buses
1344 .get(&event_bus_name)
1345 .and_then(|b| b.policy.clone());
1346 if let Some(policy_value) = bus_policy_value {
1347 let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1348 let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1349 let bus_arn = state
1350 .buses
1351 .get(&event_bus_name)
1352 .map(|b| b.arn.clone())
1353 .unwrap_or_default();
1354 let principal =
1355 req.principal
1356 .clone()
1357 .unwrap_or_else(|| fakecloud_core::auth::Principal {
1358 arn: Arn::global("iam", caller_account, "root").to_string(),
1359 user_id: caller_account.to_string(),
1360 account_id: caller_account.to_string(),
1361 principal_type: fakecloud_core::auth::PrincipalType::Root,
1362 source_identity: None,
1363 tags: None,
1364 });
1365 let context = fakecloud_iam::evaluator::RequestContext {
1366 aws_principal_arn: Some(principal.arn.clone()),
1367 aws_principal_account: Some(principal.account_id.clone()),
1368 ..Default::default()
1369 };
1370 let eval_req = fakecloud_iam::evaluator::EvalRequest {
1371 principal: &principal,
1372 action: "events:PutEvents".to_string(),
1373 resource: bus_arn,
1374 context,
1375 };
1376 let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1377 &policy_doc,
1378 &eval_req,
1379 );
1380 if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1381 failed_count += 1;
1382 result_entries.push(json!({
1383 "ErrorCode": "AccessDeniedException",
1384 "ErrorMessage": format!(
1385 "User '{}' is not authorized to put events on event bus '{}'",
1386 principal.arn, event_bus_name
1387 ),
1388 }));
1389 continue;
1390 }
1391 }
1392 }
1393
1394 let time = parse_put_events_time(&entry["Time"]);
1395 let resources: Vec<String> = entry["Resources"]
1396 .as_array()
1397 .map(|arr| {
1398 arr.iter()
1399 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1400 .collect()
1401 })
1402 .unwrap_or_default();
1403
1404 let event = PutEvent {
1405 event_id: event_id.clone(),
1406 source: source.clone(),
1407 detail_type: detail_type.clone(),
1408 detail: detail.clone(),
1409 event_bus_name: event_bus_name.clone(),
1410 time,
1411 resources: resources.clone(),
1412 };
1413
1414 archive_matching_event(
1415 state,
1416 &event,
1417 &event_bus_name,
1418 &source,
1419 &detail_type,
1420 &detail,
1421 &req.account_id,
1422 &req.region,
1423 &resources,
1424 );
1425
1426 state.events.push(event);
1427
1428 let matching_targets: Vec<EventTarget> = state
1430 .rules
1431 .values()
1432 .filter(|r| {
1433 r.event_bus_name == event_bus_name
1434 && r.state == "ENABLED"
1435 && matches_pattern(
1436 r.event_pattern.as_deref(),
1437 &source,
1438 &detail_type,
1439 &detail,
1440 &req.account_id,
1441 &req.region,
1442 &resources,
1443 )
1444 })
1445 .flat_map(|r| r.targets.clone())
1446 .collect();
1447
1448 if !matching_targets.is_empty() {
1449 events_to_deliver.push((
1450 event_id.clone(),
1451 source,
1452 detail_type,
1453 detail,
1454 time,
1455 resources,
1456 matching_targets,
1457 ));
1458 }
1459
1460 result_entries.push(json!({ "EventId": event_id }));
1461 }
1462
1463 drop(accounts);
1465
1466 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1471 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1472 let event_json = json!({
1473 "version": "0",
1474 "id": event_id,
1475 "source": source,
1476 "account": req.account_id,
1477 "detail-type": detail_type,
1478 "detail": detail_value,
1479 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1480 "region": req.region,
1481 "resources": resources,
1482 });
1483
1484 let ctx = EventDispatchContext {
1485 state: &self.state,
1486 delivery: &self.delivery,
1487 lambda_state: self.lambda_state.as_ref(),
1488 logs_state: self.logs_state.as_ref(),
1489 container_runtime: &self.container_runtime,
1490 account_id: &req.account_id,
1491 region: &req.region,
1492 };
1493 for target in targets {
1494 dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1495 }
1496 }
1497
1498 let resp = json!({
1499 "FailedEntryCount": failed_count,
1500 "Entries": result_entries,
1501 });
1502
1503 Ok(AwsResponse::ok_json(resp))
1504 }
1505
1506 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1509 let body = req.json_body();
1510 validate_required("ResourceARN", &body["ResourceARN"])?;
1511 let arn = body["ResourceARN"]
1512 .as_str()
1513 .ok_or_else(|| missing("ResourceARN"))?;
1514 validate_string_length("resourceARN", arn, 1, 1600)?;
1515 validate_required("Tags", &body["Tags"])?;
1516
1517 let mut accounts = self.state.write();
1518 let state = accounts.get_or_create(&req.account_id);
1519 let tag_map = find_tags_mut(state, arn)?;
1520
1521 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1522 AwsServiceError::aws_error(
1523 StatusCode::BAD_REQUEST,
1524 "ValidationException",
1525 format!("{f} must be a list"),
1526 )
1527 })?;
1528
1529 Ok(AwsResponse::ok_json(json!({})))
1530 }
1531
1532 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1533 let body = req.json_body();
1534 validate_required("ResourceARN", &body["ResourceARN"])?;
1535 let arn = body["ResourceARN"]
1536 .as_str()
1537 .ok_or_else(|| missing("ResourceARN"))?;
1538 validate_string_length("resourceARN", arn, 1, 1600)?;
1539 validate_required("TagKeys", &body["TagKeys"])?;
1540
1541 let mut accounts = self.state.write();
1542 let state = accounts.get_or_create(&req.account_id);
1543 let tag_map = find_tags_mut(state, arn)?;
1544
1545 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1546 AwsServiceError::aws_error(
1547 StatusCode::BAD_REQUEST,
1548 "ValidationException",
1549 format!("{f} must be a list"),
1550 )
1551 })?;
1552
1553 Ok(AwsResponse::ok_json(json!({})))
1554 }
1555
1556 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1557 let body = req.json_body();
1558 validate_required("ResourceARN", &body["ResourceARN"])?;
1559 let arn = body["ResourceARN"]
1560 .as_str()
1561 .ok_or_else(|| missing("ResourceARN"))?;
1562 validate_string_length("resourceARN", arn, 1, 1600)?;
1563
1564 let accounts = self.state.read();
1565 let empty = EventBridgeState::new(&req.account_id, &req.region);
1566 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1567 let tag_map = find_tags(state, arn)?;
1568
1569 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1570
1571 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1572 }
1573
1574 }
1576
/// Request fields extracted from a JSON body by [`StartReplayInput::from_body`].
struct StartReplayInput {
    /// `ReplayName` from the request; empty when the field is absent or not a string.
    name: String,
    /// `Description` from the request, when it is present as a string.
    description: Option<String>,
    /// `EventSourceArn` from the request; empty when the field is absent or not a string.
    event_source_arn: String,
    /// Verbatim copy of the request's `Destination` value.
    destination: Value,
    /// `Destination.Arn`; `from_body` only succeeds when it contains `:event-bus/`.
    destination_arn: String,
    /// `EventStartTime`, read as epoch seconds; `from_body` substitutes the current time
    /// when the field cannot be read.
    event_start_time: DateTime<Utc>,
    /// `EventEndTime`, read as epoch seconds; `from_body` substitutes the current time
    /// when the field cannot be read.
    event_end_time: DateTime<Utc>,
}
1595
1596impl StartReplayInput {
1597 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1598 let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1606 let description = body["Description"].as_str().map(|s| s.to_string());
1607 let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1608 let destination = body["Destination"].clone();
1609
1610 let event_start_time = body["EventStartTime"]
1611 .as_f64()
1612 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1613 .unwrap_or_else(Utc::now);
1614 let event_end_time = body["EventEndTime"]
1615 .as_f64()
1616 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1617 .unwrap_or_else(Utc::now);
1618
1619 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1620 if !destination_arn.contains(":event-bus/") {
1621 return Err(AwsServiceError::aws_error(
1625 StatusCode::BAD_REQUEST,
1626 "ResourceNotFoundException",
1627 format!("Destination.Arn {destination_arn} does not point to an event bus."),
1628 ));
1629 }
1630
1631 Ok(Self {
1632 name,
1633 description,
1634 event_source_arn,
1635 destination,
1636 destination_arn,
1637 event_start_time,
1638 event_end_time,
1639 })
1640 }
1641}
1642
// Private child modules, each loaded from the file named by its `#[path]` attribute.
#[path = "service_archives_replays.rs"]
mod service_archives_replays;
#[path = "service_connections_apidests.rs"]
mod service_connections_apidests;
#[path = "service_endpoints.rs"]
mod service_endpoints;
#[path = "service_partner_sources.rs"]
mod service_partner_sources;

// `helpers` is visible crate-wide, and everything it exports is also glob re-exported from
// this module.
#[path = "helpers.rs"]
pub(crate) mod helpers;
pub(crate) use helpers::*;

// Test module, compiled only for test builds.
#[cfg(test)]
#[path = "service_tests.rs"]
mod tests;
1659
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` must fail on a non-numeric token and accept a numeric one.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items: Vec<i32> = vec![0, 1, 2, 3, 4];

        let malformed = paginate_checked(&items, Some("bad"), 3);
        assert!(malformed.is_err());

        let numeric = paginate_checked(&items, Some("2"), 3);
        assert!(numeric.is_ok());
    }
}