1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
/// Request handler for the EventBridge (`"events"`) service.
///
/// Built with [`EventBridgeService::new`]; the optional collaborators start
/// out as `None` and are attached through the `with_*` builder methods.
pub struct EventBridgeService {
    /// Shared EventBridge state that the request handlers read and mutate.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction.
    // NOTE(review): presumably used to deliver matched events to targets —
    // not exercised in this part of the file; confirm against the callers.
    delivery: Arc<DeliveryBus>,
    /// Lambda state, attached via `with_lambda`.
    lambda_state: Option<SharedLambdaState>,
    /// CloudWatch Logs state, attached via `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Container runtime, attached via `with_runtime`.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Snapshot store, attached via `with_snapshot_store`; when `None`,
    /// snapshot saves are no-ops.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save so that
    /// concurrent saves are serialized.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
37
38impl EventBridgeService {
39 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40 Self {
41 state,
42 delivery,
43 lambda_state: None,
44 logs_state: None,
45 container_runtime: None,
46 snapshot_store: None,
47 snapshot_lock: Arc::new(AsyncMutex::new(())),
48 }
49 }
50
51 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52 self.lambda_state = Some(lambda_state);
53 self
54 }
55
56 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57 self.logs_state = Some(logs_state);
58 self
59 }
60
61 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62 self.container_runtime = Some(runtime);
63 self
64 }
65
66 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67 self.snapshot_store = Some(store);
68 self
69 }
70
71 async fn save_snapshot(&self) {
75 save_eventbridge_snapshot(
76 &self.state,
77 self.snapshot_store.clone(),
78 &self.snapshot_lock,
79 )
80 .await;
81 }
82
83 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88 let store = self.snapshot_store.clone()?;
89 let state = self.state.clone();
90 let lock = self.snapshot_lock.clone();
91 Some(Arc::new(move || {
92 let state = state.clone();
93 let store = store.clone();
94 let lock = lock.clone();
95 Box::pin(async move {
96 save_eventbridge_snapshot(&state, Some(store), &lock).await;
97 })
98 }))
99 }
100}
101
102pub async fn save_eventbridge_snapshot(
108 state: &SharedEventBridgeState,
109 store: Option<Arc<dyn SnapshotStore>>,
110 lock: &AsyncMutex<()>,
111) {
112 let Some(store) = store else {
113 return;
114 };
115 let _guard = lock.lock().await;
116 let snapshot = EventBridgeSnapshot {
117 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118 accounts: Some(state.read().clone()),
119 state: None,
120 };
121 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122 let bytes = serde_json::to_vec(&snapshot)
123 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124 store.save(&bytes)
125 })
126 .await;
127 match join {
128 Ok(Ok(())) => {}
129 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131 }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136 fn service_name(&self) -> &str {
137 "events"
138 }
139
140 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141 let mutates = is_mutating_action(req.action.as_str());
142 let result = match req.action.as_str() {
143 "CreateEventBus" => self.create_event_bus(&req),
144 "DeleteEventBus" => self.delete_event_bus(&req),
145 "ListEventBuses" => self.list_event_buses(&req),
146 "DescribeEventBus" => self.describe_event_bus(&req),
147 "PutRule" => self.put_rule(&req),
148 "DeleteRule" => self.delete_rule(&req),
149 "ListRules" => self.list_rules(&req),
150 "DescribeRule" => self.describe_rule(&req),
151 "EnableRule" => self.enable_rule(&req),
152 "DisableRule" => self.disable_rule(&req),
153 "PutTargets" => self.put_targets(&req),
154 "RemoveTargets" => self.remove_targets(&req),
155 "ListTargetsByRule" => self.list_targets_by_rule(&req),
156 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157 "PutEvents" => self.put_events(&req),
158 "PutPermission" => self.put_permission(&req),
159 "RemovePermission" => self.remove_permission(&req),
160 "TagResource" => self.tag_resource(&req),
161 "UntagResource" => self.untag_resource(&req),
162 "ListTagsForResource" => self.list_tags_for_resource(&req),
163 "CreateArchive" => self.create_archive(&req),
164 "DescribeArchive" => self.describe_archive(&req),
165 "ListArchives" => self.list_archives(&req),
166 "UpdateArchive" => self.update_archive(&req),
167 "DeleteArchive" => self.delete_archive(&req),
168 "CreateConnection" => self.create_connection(&req),
169 "DescribeConnection" => self.describe_connection(&req),
170 "ListConnections" => self.list_connections(&req),
171 "UpdateConnection" => self.update_connection(&req),
172 "DeleteConnection" => self.delete_connection(&req),
173 "CreateApiDestination" => self.create_api_destination(&req),
174 "DescribeApiDestination" => self.describe_api_destination(&req),
175 "ListApiDestinations" => self.list_api_destinations(&req),
176 "UpdateApiDestination" => self.update_api_destination(&req),
177 "DeleteApiDestination" => self.delete_api_destination(&req),
178 "StartReplay" => self.start_replay(&req),
179 "DescribeReplay" => self.describe_replay(&req),
180 "ListReplays" => self.list_replays(&req),
181 "CancelReplay" => self.cancel_replay(&req),
182 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187 "ActivateEventSource" => self.activate_event_source(&req),
188 "DeactivateEventSource" => self.deactivate_event_source(&req),
189 "DescribeEventSource" => self.describe_event_source(&req),
190 "ListEventSources" => self.list_event_sources(&req),
191 "PutPartnerEvents" => self.put_partner_events(&req),
192 "TestEventPattern" => self.test_event_pattern(&req),
193 "UpdateEventBus" => self.update_event_bus(&req),
194 "CreateEndpoint" => self.create_endpoint(&req),
195 "DeleteEndpoint" => self.delete_endpoint(&req),
196 "DescribeEndpoint" => self.describe_endpoint(&req),
197 "ListEndpoints" => self.list_endpoints(&req),
198 "UpdateEndpoint" => self.update_endpoint(&req),
199 "DeauthorizeConnection" => self.deauthorize_connection(&req),
200 _ => Err(AwsServiceError::action_not_implemented(
201 "events",
202 &req.action,
203 )),
204 };
205 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206 self.save_snapshot().await;
207 }
208 result
209 }
210
211 fn supported_actions(&self) -> &[&str] {
212 &[
213 "CreateEventBus",
214 "DeleteEventBus",
215 "ListEventBuses",
216 "DescribeEventBus",
217 "PutRule",
218 "DeleteRule",
219 "ListRules",
220 "DescribeRule",
221 "EnableRule",
222 "DisableRule",
223 "PutTargets",
224 "RemoveTargets",
225 "ListTargetsByRule",
226 "ListRuleNamesByTarget",
227 "PutEvents",
228 "PutPermission",
229 "RemovePermission",
230 "TagResource",
231 "UntagResource",
232 "ListTagsForResource",
233 "CreateArchive",
234 "DescribeArchive",
235 "ListArchives",
236 "UpdateArchive",
237 "DeleteArchive",
238 "CreateConnection",
239 "DescribeConnection",
240 "ListConnections",
241 "UpdateConnection",
242 "DeleteConnection",
243 "CreateApiDestination",
244 "DescribeApiDestination",
245 "ListApiDestinations",
246 "UpdateApiDestination",
247 "DeleteApiDestination",
248 "StartReplay",
249 "DescribeReplay",
250 "ListReplays",
251 "CancelReplay",
252 "CreatePartnerEventSource",
253 "DeletePartnerEventSource",
254 "DescribePartnerEventSource",
255 "ListPartnerEventSources",
256 "ListPartnerEventSourceAccounts",
257 "ActivateEventSource",
258 "DeactivateEventSource",
259 "DescribeEventSource",
260 "ListEventSources",
261 "PutPartnerEvents",
262 "TestEventPattern",
263 "UpdateEventBus",
264 "CreateEndpoint",
265 "DeleteEndpoint",
266 "DescribeEndpoint",
267 "ListEndpoints",
268 "UpdateEndpoint",
269 "DeauthorizeConnection",
270 ]
271 }
272}
273
274impl EventBridgeService {
276 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277 let body = req.json_body();
278 validate_required("Name", &body["Name"])?;
279 let name = body["Name"]
280 .as_str()
281 .ok_or_else(|| missing("Name"))?
282 .to_string();
283 validate_string_length("name", &name, 1, 256)?;
284 validate_optional_string_length(
285 "eventSourceName",
286 body["EventSourceName"].as_str(),
287 1,
288 256,
289 )?;
290 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291 validate_optional_string_length(
292 "kmsKeyIdentifier",
293 body["KmsKeyIdentifier"].as_str(),
294 0,
295 2048,
296 )?;
297
298 if name.contains('/') && !name.starts_with("aws.partner/") {
300 return Err(AwsServiceError::aws_error(
301 StatusCode::BAD_REQUEST,
302 "ValidationException",
303 "Event bus name must not contain '/'.",
304 ));
305 }
306
307 if name.starts_with("aws.partner/") {
309 let event_source = body["EventSourceName"].as_str().unwrap_or("");
310 let accounts_r = self.state.read();
311 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313 let has_source = state_r.partner_event_sources.contains_key(event_source);
314 drop(accounts_r);
315 if !has_source {
316 return Err(AwsServiceError::aws_error(
317 StatusCode::BAD_REQUEST,
318 "ResourceNotFoundException",
319 format!("Event source {event_source} does not exist."),
320 ));
321 }
322 }
323
324 let mut accounts = self.state.write();
325 let state = accounts.get_or_create(&req.account_id);
326
327 if state.buses.contains_key(&name) {
328 return Err(AwsServiceError::aws_error(
329 StatusCode::BAD_REQUEST,
330 "ResourceAlreadyExistsException",
331 format!("Event bus {name} already exists."),
332 ));
333 }
334
335 let arn = format!(
336 "arn:aws:events:{}:{}:event-bus/{}",
337 req.region, state.account_id, name
338 );
339 let now = Utc::now();
340 let description = body["Description"].as_str().map(|s| s.to_string());
341 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342 let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344 let tags = parse_tags(&body);
345
346 let bus = EventBus {
347 name: name.clone(),
348 arn: arn.clone(),
349 tags,
350 policy: None,
351 description,
352 kms_key_identifier,
353 dead_letter_config,
354 creation_time: now,
355 last_modified_time: now,
356 };
357 state.buses.insert(name, bus);
358
359 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360 }
361
362 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363 let body = req.json_body();
364 validate_required("Name", &body["Name"])?;
365 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366 validate_string_length("name", name, 1, 256)?;
367
368 if name == "default" {
369 return Err(AwsServiceError::aws_error(
370 StatusCode::BAD_REQUEST,
371 "ValidationException",
372 format!("Cannot delete event bus {name}."),
373 ));
374 }
375
376 let mut accounts = self.state.write();
377 let state = accounts.get_or_create(&req.account_id);
378 state.buses.remove(name);
379 state.rules.retain(|k, _| k.0 != name);
380
381 Ok(AwsResponse::ok_json(json!({})))
382 }
383
384 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385 let body = req.json_body();
386 validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394 validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395 validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396 let name_prefix = body["NamePrefix"].as_str();
397 let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399 let accounts = self.state.read();
400 let empty = EventBridgeState::new(&req.account_id, &req.region);
401 let state = accounts.get(&req.account_id).unwrap_or(&empty);
402 let filtered: Vec<&_> = state
403 .buses
404 .values()
405 .filter(|b| match name_prefix {
406 Some(prefix) => b.name.starts_with(prefix),
407 None => true,
408 })
409 .collect();
410
411 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412 let buses: Vec<Value> = page
413 .iter()
414 .map(|b| {
415 json!({
419 "Name": b.name,
420 "Arn": b.arn,
421 "CreationTime": b.creation_time.timestamp() as f64,
422 "LastModifiedTime": b.last_modified_time.timestamp() as f64,
423 })
424 })
425 .collect();
426 let mut resp = json!({ "EventBuses": buses });
427 if let Some(token) = next_token {
428 resp["NextToken"] = json!(token);
429 }
430
431 Ok(AwsResponse::ok_json(resp))
432 }
433
434 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
435 let body = req.json_body();
436 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
437 let name = body["Name"].as_str().unwrap_or("default");
438
439 let accounts = self.state.read();
440 let empty = EventBridgeState::new(&req.account_id, &req.region);
441 let state = accounts.get(&req.account_id).unwrap_or(&empty);
442 let bus = state.buses.get(name).ok_or_else(|| {
443 AwsServiceError::aws_error(
444 StatusCode::BAD_REQUEST,
445 "ResourceNotFoundException",
446 format!("Event bus {name} does not exist."),
447 )
448 })?;
449
450 let mut resp = json!({
451 "Name": bus.name,
452 "Arn": bus.arn,
453 "CreationTime": bus.creation_time.timestamp() as f64,
454 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
455 });
456
457 if let Some(ref policy) = bus.policy {
458 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
459 }
460 if let Some(ref desc) = bus.description {
461 resp["Description"] = json!(desc);
462 }
463 if let Some(ref kms) = bus.kms_key_identifier {
464 resp["KmsKeyIdentifier"] = json!(kms);
465 }
466 if let Some(ref dlc) = bus.dead_letter_config {
467 resp["DeadLetterConfig"] = dlc.clone();
468 }
469
470 Ok(AwsResponse::ok_json(resp))
471 }
472
473 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
476 let body = req.json_body();
477 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
484 validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
485 validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
486 validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
487 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
488
489 let mut accounts = self.state.write();
490 let state = accounts.get_or_create(&req.account_id);
491
492 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
493 AwsServiceError::aws_error(
494 StatusCode::BAD_REQUEST,
495 "ResourceNotFoundException",
496 format!("Event bus {event_bus_name} does not exist."),
497 )
498 })?;
499
500 if let Some(policy_str) = body["Policy"].as_str() {
502 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
503 bus.policy = Some(policy);
504 return Ok(AwsResponse::ok_json(json!({})));
505 }
506 }
507
508 let action = body["Action"].as_str().unwrap_or("");
515 let principal = body["Principal"].as_str().unwrap_or("");
516 let statement_id = body["StatementId"].as_str().unwrap_or("");
517
518 let principal_value = if principal == "*" {
527 json!("*")
528 } else {
529 json!({ "AWS": Arn::global("iam", principal, "root").to_string() })
530 };
531 let statement = json!({
532 "Sid": statement_id,
533 "Effect": "Allow",
534 "Principal": principal_value,
535 "Action": action,
536 "Resource": bus.arn,
537 });
538
539 let policy = bus.policy.get_or_insert_with(|| {
540 json!({
541 "Version": "2012-10-17",
542 "Statement": [],
543 })
544 });
545
546 if let Some(stmts) = policy["Statement"].as_array_mut() {
547 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
551 stmts.push(statement);
552 }
553
554 Ok(AwsResponse::ok_json(json!({})))
555 }
556
557 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558 let body = req.json_body();
559 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
560 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
561 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
562 let statement_id = body["StatementId"].as_str().unwrap_or("");
563 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
564
565 let mut accounts = self.state.write();
566 let state = accounts.get_or_create(&req.account_id);
567
568 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
569 AwsServiceError::aws_error(
570 StatusCode::BAD_REQUEST,
571 "ResourceNotFoundException",
572 format!("Event bus {event_bus_name} does not exist."),
573 )
574 })?;
575
576 if remove_all {
577 bus.policy = None;
578 return Ok(AwsResponse::ok_json(json!({})));
579 }
580
581 let policy = bus.policy.as_mut().ok_or_else(|| {
582 AwsServiceError::aws_error(
583 StatusCode::BAD_REQUEST,
584 "ResourceNotFoundException",
585 "EventBus does not have a policy.",
586 )
587 })?;
588
589 if let Some(stmts) = policy["Statement"].as_array_mut() {
590 let before = stmts.len();
591 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
592 if stmts.len() == before {
593 return Err(AwsServiceError::aws_error(
594 StatusCode::BAD_REQUEST,
595 "ResourceNotFoundException",
596 "Statement with the provided id does not exist.",
597 ));
598 }
599 if stmts.is_empty() {
600 bus.policy = None;
601 }
602 }
603
604 Ok(AwsResponse::ok_json(json!({})))
605 }
606
607 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
610 let body = req.json_body();
611 validate_required("Name", &body["Name"])?;
622 let name = body["Name"]
623 .as_str()
624 .ok_or_else(|| missing("Name"))?
625 .to_string();
626 validate_string_length("Name", &name, 1, 64)?;
627 validate_optional_string_length_value(
628 "ScheduleExpression",
629 &body["ScheduleExpression"],
630 0,
631 256,
632 )?;
633 validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
634 validate_optional_enum_value(
635 "State",
636 &body["State"],
637 &[
638 "ENABLED",
639 "DISABLED",
640 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
641 ],
642 )?;
643 validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
644 validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
645 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
646
647 let raw_bus = body["EventBusName"]
648 .as_str()
649 .unwrap_or("default")
650 .to_string();
651
652 let mut accounts = self.state.write();
653 let state = accounts.get_or_create(&req.account_id);
654 let event_bus_name = state.resolve_bus_name(&raw_bus);
655
656 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
657 if s.is_empty() {
658 None
659 } else {
660 Some(s.to_string())
661 }
662 });
663 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
664 if s.is_empty() {
665 None
666 } else {
667 Some(s.to_string())
668 }
669 });
670 let description = body["Description"].as_str().map(|s| s.to_string());
671 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
672 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
673
674 if !state.buses.contains_key(&event_bus_name) {
680 return Err(AwsServiceError::aws_error(
681 StatusCode::BAD_REQUEST,
682 "ResourceNotFoundException",
683 format!("Event bus {event_bus_name} does not exist."),
684 ));
685 }
686
687 let arn = if event_bus_name == "default" {
688 format!(
689 "arn:aws:events:{}:{}:rule/{}",
690 req.region, state.account_id, name
691 )
692 } else {
693 format!(
694 "arn:aws:events:{}:{}:rule/{}/{}",
695 req.region, state.account_id, event_bus_name, name
696 )
697 };
698
699 let key = (event_bus_name.clone(), name.clone());
700 let targets = state
701 .rules
702 .get(&key)
703 .map(|r| r.targets.clone())
704 .unwrap_or_default();
705
706 let tags = parse_tags(&body);
707
708 let rule = EventRule {
709 name: name.clone(),
710 arn: arn.clone(),
711 event_bus_name,
712 event_pattern,
713 schedule_expression,
714 state: rule_state,
715 description,
716 role_arn,
717 managed_by: None,
718 created_by: None,
719 targets,
720 tags,
721 last_fired: None,
722 };
723
724 state.rules.insert(key, rule);
725 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
726 }
727
728 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
729 let body = req.json_body();
730 validate_required("Name", &body["Name"])?;
731 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
732 validate_string_length("name", name, 1, 64)?;
733 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
734 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
735
736 let mut accounts = self.state.write();
737 let state = accounts.get_or_create(&req.account_id);
738 let bus_name = state.resolve_bus_name(event_bus_name);
739 let key = (bus_name, name.to_string());
740
741 if let Some(rule) = state.rules.get(&key) {
743 if !rule.targets.is_empty() {
744 return Err(AwsServiceError::aws_error(
745 StatusCode::BAD_REQUEST,
746 "ValidationException",
747 "Rule can't be deleted since it has targets.",
748 ));
749 }
750 }
751
752 state.rules.remove(&key);
753 Ok(AwsResponse::ok_json(json!({})))
754 }
755
756 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757 let body = req.json_body();
758 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
759 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
760 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
761 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
762 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
763 let name_prefix = body["NamePrefix"].as_str();
764 let limit = body["Limit"].as_u64().map(|n| n as usize);
765 let next_token = body["NextToken"].as_str();
766
767 let accounts = self.state.read();
768 let empty = EventBridgeState::new(&req.account_id, &req.region);
769 let state = accounts.get(&req.account_id).unwrap_or(&empty);
770 let bus_name = state.resolve_bus_name(event_bus_name);
771
772 let mut rules: Vec<&EventRule> = state
773 .rules
774 .values()
775 .filter(|r| r.event_bus_name == bus_name)
776 .filter(|r| match name_prefix {
777 Some(prefix) => r.name.starts_with(prefix),
778 None => true,
779 })
780 .collect();
781 rules.sort_by(|a, b| a.name.cmp(&b.name));
782
783 let start = next_token
785 .and_then(|t| t.parse::<usize>().ok())
786 .unwrap_or(0)
787 .min(rules.len());
788 let rules_slice = &rules[start..];
789
790 let (page, new_next_token) = if let Some(lim) = limit {
791 if rules_slice.len() > lim {
792 (&rules_slice[..lim], Some((start + lim).to_string()))
793 } else {
794 (rules_slice, None)
795 }
796 } else {
797 (rules_slice, None)
798 };
799
800 let rules_json: Vec<Value> = page
801 .iter()
802 .map(|r| {
803 let mut obj = json!({
804 "Name": r.name,
805 "Arn": r.arn,
806 "EventBusName": r.event_bus_name,
807 "State": r.state,
808 });
809 if let Some(ref desc) = r.description {
810 obj["Description"] = json!(desc);
811 }
812 if let Some(ref ep) = r.event_pattern {
813 obj["EventPattern"] = json!(ep);
814 }
815 if let Some(ref se) = r.schedule_expression {
816 obj["ScheduleExpression"] = json!(se);
817 }
818 if let Some(ref role) = r.role_arn {
819 obj["RoleArn"] = json!(role);
820 }
821 if let Some(ref mb) = r.managed_by {
822 obj["ManagedBy"] = json!(mb);
823 }
824 obj
825 })
826 .collect();
827
828 let mut resp = json!({ "Rules": rules_json });
829 if let Some(token) = new_next_token {
830 resp["NextToken"] = json!(token);
831 }
832
833 Ok(AwsResponse::ok_json(resp))
834 }
835
836 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
837 let body = req.json_body();
838 validate_required("Name", &body["Name"])?;
839 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
840 validate_string_length("name", name, 1, 64)?;
841 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
842 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
843
844 let accounts = self.state.read();
845 let empty = EventBridgeState::new(&req.account_id, &req.region);
846 let state = accounts.get(&req.account_id).unwrap_or(&empty);
847 let bus_name = state.resolve_bus_name(event_bus_name);
848 let key = (bus_name.clone(), name.to_string());
849
850 let rule = state.rules.get(&key).ok_or_else(|| {
851 AwsServiceError::aws_error(
852 StatusCode::BAD_REQUEST,
853 "ResourceNotFoundException",
854 format!("Rule {name} does not exist."),
855 )
856 })?;
857
858 let mut resp = json!({
859 "Name": rule.name,
860 "Arn": rule.arn,
861 "EventBusName": rule.event_bus_name,
862 "State": rule.state,
863 });
864
865 if let Some(ref desc) = rule.description {
866 resp["Description"] = json!(desc);
867 }
868 if let Some(ref ep) = rule.event_pattern {
869 resp["EventPattern"] = json!(ep);
870 }
871 if let Some(ref se) = rule.schedule_expression {
872 resp["ScheduleExpression"] = json!(se);
873 }
874 if let Some(ref role) = rule.role_arn {
875 resp["RoleArn"] = json!(role);
876 }
877 if let Some(ref mb) = rule.managed_by {
878 resp["ManagedBy"] = json!(mb);
879 }
880 if let Some(ref cb) = rule.created_by {
881 resp["CreatedBy"] = json!(cb);
882 }
883 if rule.event_bus_name != "default" && rule.created_by.is_none() {
885 resp["CreatedBy"] = json!(state.account_id);
886 }
887
888 Ok(AwsResponse::ok_json(resp))
889 }
890
891 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892 let body = req.json_body();
893 validate_required("Name", &body["Name"])?;
894 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
895 validate_string_length("name", name, 1, 64)?;
896 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
897 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
898
899 let mut accounts = self.state.write();
900 let state = accounts.get_or_create(&req.account_id);
901 let bus_name = state.resolve_bus_name(event_bus_name);
902 let key = (bus_name, name.to_string());
903
904 let rule = state.rules.get_mut(&key).ok_or_else(|| {
905 AwsServiceError::aws_error(
906 StatusCode::BAD_REQUEST,
907 "ResourceNotFoundException",
908 format!("Rule {name} does not exist."),
909 )
910 })?;
911
912 rule.state = "ENABLED".to_string();
913 Ok(AwsResponse::ok_json(json!({})))
914 }
915
916 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
917 let body = req.json_body();
918 validate_required("Name", &body["Name"])?;
919 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
920 validate_string_length("name", name, 1, 64)?;
921 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
922 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
923
924 let mut accounts = self.state.write();
925 let state = accounts.get_or_create(&req.account_id);
926 let bus_name = state.resolve_bus_name(event_bus_name);
927 let key = (bus_name, name.to_string());
928
929 let rule = state.rules.get_mut(&key).ok_or_else(|| {
930 AwsServiceError::aws_error(
931 StatusCode::BAD_REQUEST,
932 "ResourceNotFoundException",
933 format!("Rule {name} does not exist."),
934 )
935 })?;
936
937 rule.state = "DISABLED".to_string();
938 Ok(AwsResponse::ok_json(json!({})))
939 }
940
941 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
944 let body = req.json_body();
945 validate_required("Rule", &body["Rule"])?;
952 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
953 validate_string_length("Rule", rule_name, 1, 64)?;
954 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
955 validate_required("Targets", &body["Targets"])?;
960 let targets_array = body["Targets"].as_array().ok_or_else(|| {
961 AwsServiceError::aws_error(
962 StatusCode::BAD_REQUEST,
963 "ValidationException",
964 "Targets must be a list",
965 )
966 })?;
967 if targets_array.is_empty() || targets_array.len() > 100 {
968 return Err(AwsServiceError::aws_error(
969 StatusCode::BAD_REQUEST,
970 "ValidationException",
971 "Value at 'Targets' failed to satisfy constraint: \
972 Member must have length between 1 and 100",
973 ));
974 }
975 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
976 let targets: Vec<Value> = targets_array.clone();
977
978 let mut accounts = self.state.write();
979 let state = accounts.get_or_create(&req.account_id);
980 let bus_name = state.resolve_bus_name(event_bus_name);
981 let key = (bus_name.clone(), rule_name.to_string());
982
983 let rule = state.rules.get_mut(&key).ok_or_else(|| {
984 AwsServiceError::aws_error(
985 StatusCode::BAD_REQUEST,
986 "ResourceNotFoundException",
987 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
988 )
989 })?;
990
991 let mut failed_entries: Vec<Value> = Vec::new();
992 for target in &targets {
993 let target_id = target["Id"].as_str().unwrap_or("").to_string();
994 let target_arn = target["Arn"].as_str().unwrap_or("");
995
996 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
997 failed_entries.push(json!({
998 "TargetId": target_id,
999 "ErrorCode": "ValidationException",
1000 "ErrorMessage": format!(
1001 "Parameter(s) SqsParameters must be specified for target: {target_id}."
1002 ),
1003 }));
1004 continue;
1005 }
1006 if !target_arn.starts_with("arn:") {
1007 failed_entries.push(json!({
1008 "TargetId": target_id,
1009 "ErrorCode": "ValidationException",
1010 "ErrorMessage": format!(
1011 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1012 ),
1013 }));
1014 continue;
1015 }
1016
1017 let et = parse_target(target);
1018 rule.targets.retain(|t| t.id != et.id);
1019 rule.targets.push(et);
1020 }
1021
1022 Ok(AwsResponse::ok_json(json!({
1023 "FailedEntryCount": failed_entries.len(),
1024 "FailedEntries": failed_entries,
1025 })))
1026 }
1027
1028 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1029 let body = req.json_body();
1030 validate_required("Rule", &body["Rule"])?;
1031 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1032 validate_string_length("rule", rule_name, 1, 64)?;
1033 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1034 validate_required("Ids", &body["Ids"])?;
1035 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1036 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1037
1038 let target_ids: Vec<String> = ids
1039 .iter()
1040 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1041 .collect();
1042
1043 let mut accounts = self.state.write();
1044 let state = accounts.get_or_create(&req.account_id);
1045 let bus_name = state.resolve_bus_name(event_bus_name);
1046 let key = (bus_name.clone(), rule_name.to_string());
1047
1048 let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049 AwsServiceError::aws_error(
1050 StatusCode::BAD_REQUEST,
1051 "ResourceNotFoundException",
1052 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053 )
1054 })?;
1055
1056 rule.targets.retain(|t| !target_ids.contains(&t.id));
1057
1058 Ok(AwsResponse::ok_json(json!({
1059 "FailedEntryCount": 0,
1060 "FailedEntries": [],
1061 })))
1062 }
1063
1064 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1065 let body = req.json_body();
1066 validate_required("Rule", &body["Rule"])?;
1067 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1068 validate_string_length("rule", rule_name, 1, 64)?;
1069 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1070 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1071 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1072 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1073 let limit = body["Limit"].as_u64().map(|n| n as usize);
1074 let next_token = body["NextToken"].as_str();
1075
1076 let accounts = self.state.read();
1077 let empty = EventBridgeState::new(&req.account_id, &req.region);
1078 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1079 let bus_name = state.resolve_bus_name(event_bus_name);
1080 let key = (bus_name, rule_name.to_string());
1081
1082 let rule = state.rules.get(&key).ok_or_else(|| {
1083 AwsServiceError::aws_error(
1084 StatusCode::BAD_REQUEST,
1085 "ResourceNotFoundException",
1086 format!("Rule {rule_name} does not exist."),
1087 )
1088 })?;
1089
1090 let all_targets = &rule.targets;
1091 let start = next_token
1092 .and_then(|t| t.parse::<usize>().ok())
1093 .unwrap_or(0)
1094 .min(all_targets.len());
1095 let slice = &all_targets[start..];
1096
1097 let (page, new_next_token) = if let Some(lim) = limit {
1098 if slice.len() > lim {
1099 (&slice[..lim], Some((start + lim).to_string()))
1100 } else {
1101 (slice, None)
1102 }
1103 } else {
1104 (slice, None)
1105 };
1106
1107 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1108
1109 let mut resp = json!({ "Targets": targets });
1110 if let Some(token) = new_next_token {
1111 resp["NextToken"] = json!(token);
1112 }
1113
1114 Ok(AwsResponse::ok_json(resp))
1115 }
1116
1117 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1118 let body = req.json_body();
1119 validate_required("TargetArn", &body["TargetArn"])?;
1120 let target_arn = body["TargetArn"]
1121 .as_str()
1122 .ok_or_else(|| missing("TargetArn"))?;
1123 validate_string_length("targetArn", target_arn, 1, 1600)?;
1124 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1125 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1126 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1127 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1128 let limit = body["Limit"].as_u64().map(|n| n as usize);
1129 let next_token = body["NextToken"].as_str();
1130
1131 let accounts = self.state.read();
1132 let empty = EventBridgeState::new(&req.account_id, &req.region);
1133 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1134 let bus_name = state.resolve_bus_name(event_bus_name);
1135
1136 let mut rule_names: Vec<String> = Vec::new();
1138 for rule in state.rules.values() {
1139 if rule.event_bus_name == bus_name
1140 && rule.targets.iter().any(|t| t.arn == target_arn)
1141 && !rule_names.contains(&rule.name)
1142 {
1143 rule_names.push(rule.name.clone());
1144 }
1145 }
1146 rule_names.sort();
1147
1148 let start = next_token
1149 .and_then(|t| t.parse::<usize>().ok())
1150 .unwrap_or(0)
1151 .min(rule_names.len());
1152 let slice = &rule_names[start..];
1153
1154 let (page, new_next_token) = if let Some(lim) = limit {
1155 if slice.len() > lim {
1156 (&slice[..lim], Some((start + lim).to_string()))
1157 } else {
1158 (slice, None)
1159 }
1160 } else {
1161 (slice, None)
1162 };
1163
1164 let mut resp = json!({ "RuleNames": page });
1165 if let Some(token) = new_next_token {
1166 resp["NextToken"] = json!(token);
1167 }
1168
1169 Ok(AwsResponse::ok_json(resp))
1170 }
1171
1172 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1175 let body = req.json_body();
1176 validate_required("EventPattern", &body["EventPattern"])?;
1177 validate_required("Event", &body["Event"])?;
1178 let event_pattern = body["EventPattern"]
1179 .as_str()
1180 .ok_or_else(|| missing("EventPattern"))?;
1181 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1182
1183 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1185 AwsServiceError::aws_error(
1186 StatusCode::BAD_REQUEST,
1187 "InvalidEventPatternException",
1188 "Event is not valid JSON.",
1189 )
1190 })?;
1191
1192 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1194 AwsServiceError::aws_error(
1195 StatusCode::BAD_REQUEST,
1196 "InvalidEventPatternException",
1197 "Event pattern is not valid JSON.",
1198 )
1199 })?;
1200
1201 let source = event["source"].as_str().unwrap_or("");
1202 let detail_type = event["detail-type"].as_str().unwrap_or("");
1203 let detail = event
1204 .get("detail")
1205 .map(|v| serde_json::to_string(v).unwrap_or_default())
1206 .unwrap_or_else(|| "{}".to_string());
1207 let account = event["account"].as_str().unwrap_or("");
1208 let region = event["region"].as_str().unwrap_or("");
1209 let resources: Vec<String> = event["resources"]
1210 .as_array()
1211 .map(|arr| {
1212 arr.iter()
1213 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1214 .collect()
1215 })
1216 .unwrap_or_default();
1217
1218 let result = matches_pattern(
1219 Some(event_pattern),
1220 source,
1221 detail_type,
1222 &detail,
1223 account,
1224 region,
1225 &resources,
1226 );
1227
1228 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1229 }
1230
1231 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1234 let body = req.json_body();
1235 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1236 validate_optional_string_length(
1237 "kmsKeyIdentifier",
1238 body["KmsKeyIdentifier"].as_str(),
1239 0,
1240 2048,
1241 )?;
1242 let name = body["Name"].as_str().unwrap_or("default");
1243
1244 let mut accounts = self.state.write();
1245 let state = accounts.get_or_create(&req.account_id);
1246 let bus = state.buses.get_mut(name).ok_or_else(|| {
1247 AwsServiceError::aws_error(
1248 StatusCode::BAD_REQUEST,
1249 "ResourceNotFoundException",
1250 format!("Event bus {name} does not exist."),
1251 )
1252 })?;
1253
1254 if let Some(desc) = body["Description"].as_str() {
1255 bus.description = Some(desc.to_string());
1256 }
1257 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1258 bus.kms_key_identifier = Some(kms.to_string());
1259 }
1260 if let Some(dlc) = body.get("DeadLetterConfig") {
1261 bus.dead_letter_config = Some(dlc.clone());
1262 }
1263 bus.last_modified_time = Utc::now();
1264
1265 let arn = bus.arn.clone();
1266 let bus_name = bus.name.clone();
1267
1268 Ok(AwsResponse::ok_json(json!({
1269 "Arn": arn,
1270 "Name": bus_name,
1271 })))
1272 }
1273
1274 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277 let body = req.json_body();
1278 validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1282 validate_required("Entries", &body["Entries"])?;
1283 let entries_array = body["Entries"].as_array().ok_or_else(|| {
1284 AwsServiceError::aws_error(
1285 StatusCode::BAD_REQUEST,
1286 "ValidationException",
1287 "Entries must be a list",
1288 )
1289 })?;
1290 if entries_array.is_empty() || entries_array.len() > 10 {
1291 return Err(AwsServiceError::aws_error(
1292 StatusCode::BAD_REQUEST,
1293 "ValidationException",
1294 "Value at 'Entries' failed to satisfy constraint: \
1295 Member must have length between 1 and 10",
1296 ));
1297 }
1298 let entries: Vec<Value> = entries_array.clone();
1299 let entries = &entries;
1300
1301 let mut accounts = self.state.write();
1302 let state = accounts.get_or_create(&req.account_id);
1303 let mut result_entries = Vec::new();
1304 let mut events_to_deliver = Vec::new();
1305 let mut failed_count = 0;
1306
1307 for entry in entries {
1308 let source = entry["Source"].as_str().unwrap_or("").to_string();
1309 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1310 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1311
1312 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1313 failed_count += 1;
1314 result_entries.push(error);
1315 continue;
1316 }
1317
1318 let event_id = uuid::Uuid::new_v4().to_string();
1319 let raw_bus = entry["EventBusName"]
1320 .as_str()
1321 .unwrap_or("default")
1322 .to_string();
1323 let event_bus_name = state.resolve_bus_name(&raw_bus);
1324
1325 let caller_account = req
1331 .principal
1332 .as_ref()
1333 .map(|p| p.account_id.as_str())
1334 .unwrap_or(req.account_id.as_str());
1335 if caller_account != req.account_id {
1336 let bus_policy_value = state
1337 .buses
1338 .get(&event_bus_name)
1339 .and_then(|b| b.policy.clone());
1340 if let Some(policy_value) = bus_policy_value {
1341 let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1342 let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1343 let bus_arn = state
1344 .buses
1345 .get(&event_bus_name)
1346 .map(|b| b.arn.clone())
1347 .unwrap_or_default();
1348 let principal =
1349 req.principal
1350 .clone()
1351 .unwrap_or_else(|| fakecloud_core::auth::Principal {
1352 arn: Arn::global("iam", caller_account, "root").to_string(),
1353 user_id: caller_account.to_string(),
1354 account_id: caller_account.to_string(),
1355 principal_type: fakecloud_core::auth::PrincipalType::Root,
1356 source_identity: None,
1357 tags: None,
1358 });
1359 let context = fakecloud_iam::evaluator::RequestContext {
1360 aws_principal_arn: Some(principal.arn.clone()),
1361 aws_principal_account: Some(principal.account_id.clone()),
1362 ..Default::default()
1363 };
1364 let eval_req = fakecloud_iam::evaluator::EvalRequest {
1365 principal: &principal,
1366 action: "events:PutEvents".to_string(),
1367 resource: bus_arn,
1368 context,
1369 };
1370 let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1371 &policy_doc,
1372 &eval_req,
1373 );
1374 if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1375 failed_count += 1;
1376 result_entries.push(json!({
1377 "ErrorCode": "AccessDeniedException",
1378 "ErrorMessage": format!(
1379 "User '{}' is not authorized to put events on event bus '{}'",
1380 principal.arn, event_bus_name
1381 ),
1382 }));
1383 continue;
1384 }
1385 }
1386 }
1387
1388 let time = parse_put_events_time(&entry["Time"]);
1389 let resources: Vec<String> = entry["Resources"]
1390 .as_array()
1391 .map(|arr| {
1392 arr.iter()
1393 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1394 .collect()
1395 })
1396 .unwrap_or_default();
1397
1398 let event = PutEvent {
1399 event_id: event_id.clone(),
1400 source: source.clone(),
1401 detail_type: detail_type.clone(),
1402 detail: detail.clone(),
1403 event_bus_name: event_bus_name.clone(),
1404 time,
1405 resources: resources.clone(),
1406 };
1407
1408 archive_matching_event(
1409 state,
1410 &event,
1411 &event_bus_name,
1412 &source,
1413 &detail_type,
1414 &detail,
1415 &req.account_id,
1416 &req.region,
1417 &resources,
1418 );
1419
1420 state.events.push(event);
1421
1422 let matching_targets: Vec<EventTarget> = state
1424 .rules
1425 .values()
1426 .filter(|r| {
1427 r.event_bus_name == event_bus_name
1428 && r.state == "ENABLED"
1429 && matches_pattern(
1430 r.event_pattern.as_deref(),
1431 &source,
1432 &detail_type,
1433 &detail,
1434 &req.account_id,
1435 &req.region,
1436 &resources,
1437 )
1438 })
1439 .flat_map(|r| r.targets.clone())
1440 .collect();
1441
1442 if !matching_targets.is_empty() {
1443 events_to_deliver.push((
1444 event_id.clone(),
1445 source,
1446 detail_type,
1447 detail,
1448 time,
1449 resources,
1450 matching_targets,
1451 ));
1452 }
1453
1454 result_entries.push(json!({ "EventId": event_id }));
1455 }
1456
1457 drop(accounts);
1459
1460 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1465 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1466 let event_json = json!({
1467 "version": "0",
1468 "id": event_id,
1469 "source": source,
1470 "account": req.account_id,
1471 "detail-type": detail_type,
1472 "detail": detail_value,
1473 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1474 "region": req.region,
1475 "resources": resources,
1476 });
1477
1478 let ctx = EventDispatchContext {
1479 state: &self.state,
1480 delivery: &self.delivery,
1481 lambda_state: self.lambda_state.as_ref(),
1482 logs_state: self.logs_state.as_ref(),
1483 container_runtime: &self.container_runtime,
1484 account_id: &req.account_id,
1485 region: &req.region,
1486 };
1487 for target in targets {
1488 dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1489 }
1490 }
1491
1492 let resp = json!({
1493 "FailedEntryCount": failed_count,
1494 "Entries": result_entries,
1495 });
1496
1497 Ok(AwsResponse::ok_json(resp))
1498 }
1499
1500 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503 let body = req.json_body();
1504 validate_required("ResourceARN", &body["ResourceARN"])?;
1505 let arn = body["ResourceARN"]
1506 .as_str()
1507 .ok_or_else(|| missing("ResourceARN"))?;
1508 validate_string_length("resourceARN", arn, 1, 1600)?;
1509 validate_required("Tags", &body["Tags"])?;
1510
1511 let mut accounts = self.state.write();
1512 let state = accounts.get_or_create(&req.account_id);
1513 let tag_map = find_tags_mut(state, arn)?;
1514
1515 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1516 AwsServiceError::aws_error(
1517 StatusCode::BAD_REQUEST,
1518 "ValidationException",
1519 format!("{f} must be a list"),
1520 )
1521 })?;
1522
1523 Ok(AwsResponse::ok_json(json!({})))
1524 }
1525
1526 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1527 let body = req.json_body();
1528 validate_required("ResourceARN", &body["ResourceARN"])?;
1529 let arn = body["ResourceARN"]
1530 .as_str()
1531 .ok_or_else(|| missing("ResourceARN"))?;
1532 validate_string_length("resourceARN", arn, 1, 1600)?;
1533 validate_required("TagKeys", &body["TagKeys"])?;
1534
1535 let mut accounts = self.state.write();
1536 let state = accounts.get_or_create(&req.account_id);
1537 let tag_map = find_tags_mut(state, arn)?;
1538
1539 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1540 AwsServiceError::aws_error(
1541 StatusCode::BAD_REQUEST,
1542 "ValidationException",
1543 format!("{f} must be a list"),
1544 )
1545 })?;
1546
1547 Ok(AwsResponse::ok_json(json!({})))
1548 }
1549
1550 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1551 let body = req.json_body();
1552 validate_required("ResourceARN", &body["ResourceARN"])?;
1553 let arn = body["ResourceARN"]
1554 .as_str()
1555 .ok_or_else(|| missing("ResourceARN"))?;
1556 validate_string_length("resourceARN", arn, 1, 1600)?;
1557
1558 let accounts = self.state.read();
1559 let empty = EventBridgeState::new(&req.account_id, &req.region);
1560 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1561 let tag_map = find_tags(state, arn)?;
1562
1563 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1564
1565 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1566 }
1567
1568 }
1570
/// Request fields extracted from a JSON body by [`StartReplayInput::from_body`].
struct StartReplayInput {
    /// Value of `ReplayName`; the empty string when the key is absent or not a string.
    name: String,
    /// Value of `Description`, if it was supplied as a string.
    description: Option<String>,
    /// Value of `EventSourceArn`; the empty string when absent or not a string.
    event_source_arn: String,
    /// Unmodified copy of the `Destination` JSON value.
    destination: Value,
    /// `Destination.Arn`; `from_body` only accepts values containing `:event-bus/`.
    destination_arn: String,
    /// `EventStartTime` read as whole epoch seconds; the current time when absent or invalid.
    event_start_time: DateTime<Utc>,
    /// `EventEndTime` read as whole epoch seconds; the current time when absent or invalid.
    event_end_time: DateTime<Utc>,
}
1589
1590impl StartReplayInput {
1591 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1592 let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1600 let description = body["Description"].as_str().map(|s| s.to_string());
1601 let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1602 let destination = body["Destination"].clone();
1603
1604 let event_start_time = body["EventStartTime"]
1605 .as_f64()
1606 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1607 .unwrap_or_else(Utc::now);
1608 let event_end_time = body["EventEndTime"]
1609 .as_f64()
1610 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1611 .unwrap_or_else(Utc::now);
1612
1613 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1614 if !destination_arn.contains(":event-bus/") {
1615 return Err(AwsServiceError::aws_error(
1619 StatusCode::BAD_REQUEST,
1620 "ResourceNotFoundException",
1621 format!("Destination.Arn {destination_arn} does not point to an event bus."),
1622 ));
1623 }
1624
1625 Ok(Self {
1626 name,
1627 description,
1628 event_source_arn,
1629 destination,
1630 destination_arn,
1631 event_start_time,
1632 event_end_time,
1633 })
1634 }
1635}
1636
// Child modules loaded from explicitly named source files via `#[path]`.
// NOTE(review): presumably each file holds the handlers its name suggests — confirm.
#[path = "service_archives_replays.rs"]
mod service_archives_replays;
#[path = "service_connections_apidests.rs"]
mod service_connections_apidests;
#[path = "service_endpoints.rs"]
mod service_endpoints;
#[path = "service_partner_sources.rs"]
mod service_partner_sources;

// Helper module; its items are glob re-exported with crate visibility.
#[path = "helpers.rs"]
pub(crate) mod helpers;
pub(crate) use helpers::*;

// Test module, compiled only for test builds.
#[cfg(test)]
#[path = "service_tests.rs"]
mod tests;
1653
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` must return an error for a non-numeric token and succeed
    /// for a numeric one.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items = (0..5).collect::<Vec<i32>>();

        let malformed = paginate_checked(&items, Some("bad"), 3);
        assert!(malformed.is_err());

        let numeric = paginate_checked(&items, Some("2"), 3);
        assert!(numeric.is_ok());
    }
}