1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23 ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24 EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25 EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
/// Request handler for the EventBridge (`events`) service.
///
/// Bundles the shared EventBridge state with the delivery bus and the
/// optional collaborators attached through the `with_*` builder methods.
pub struct EventBridgeService {
    /// Shared EventBridge state; handlers take `read()` / `write()` guards on it.
    state: SharedEventBridgeState,
    /// Delivery bus supplied at construction time.
    delivery: Arc<DeliveryBus>,
    /// Lambda state, attached via `with_lambda`; `None` until then.
    lambda_state: Option<SharedLambdaState>,
    /// Logs state, attached via `with_logs`; `None` until then.
    logs_state: Option<SharedLogsState>,
    /// Container runtime, attached via `with_runtime`; `None` until then.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Snapshot sink, attached via `with_snapshot_store`; when `None`,
    /// `save_snapshot` returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot`.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
37
38impl EventBridgeService {
39 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40 Self {
41 state,
42 delivery,
43 lambda_state: None,
44 logs_state: None,
45 container_runtime: None,
46 snapshot_store: None,
47 snapshot_lock: Arc::new(AsyncMutex::new(())),
48 }
49 }
50
51 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52 self.lambda_state = Some(lambda_state);
53 self
54 }
55
56 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57 self.logs_state = Some(logs_state);
58 self
59 }
60
61 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62 self.container_runtime = Some(runtime);
63 self
64 }
65
66 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67 self.snapshot_store = Some(store);
68 self
69 }
70
71 async fn save_snapshot(&self) {
75 let Some(store) = self.snapshot_store.clone() else {
76 return;
77 };
78 let _guard = self.snapshot_lock.lock().await;
79 let snapshot = EventBridgeSnapshot {
80 schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
81 accounts: Some(self.state.read().clone()),
82 state: None,
83 };
84 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
85 let bytes = serde_json::to_vec(&snapshot)
86 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
87 store.save(&bytes)
88 })
89 .await;
90 match join {
91 Ok(Ok(())) => {}
92 Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
93 Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
94 }
95 }
96}
97
98#[async_trait]
99impl AwsService for EventBridgeService {
100 fn service_name(&self) -> &str {
101 "events"
102 }
103
104 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
105 let mutates = is_mutating_action(req.action.as_str());
106 let result = match req.action.as_str() {
107 "CreateEventBus" => self.create_event_bus(&req),
108 "DeleteEventBus" => self.delete_event_bus(&req),
109 "ListEventBuses" => self.list_event_buses(&req),
110 "DescribeEventBus" => self.describe_event_bus(&req),
111 "PutRule" => self.put_rule(&req),
112 "DeleteRule" => self.delete_rule(&req),
113 "ListRules" => self.list_rules(&req),
114 "DescribeRule" => self.describe_rule(&req),
115 "EnableRule" => self.enable_rule(&req),
116 "DisableRule" => self.disable_rule(&req),
117 "PutTargets" => self.put_targets(&req),
118 "RemoveTargets" => self.remove_targets(&req),
119 "ListTargetsByRule" => self.list_targets_by_rule(&req),
120 "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
121 "PutEvents" => self.put_events(&req),
122 "PutPermission" => self.put_permission(&req),
123 "RemovePermission" => self.remove_permission(&req),
124 "TagResource" => self.tag_resource(&req),
125 "UntagResource" => self.untag_resource(&req),
126 "ListTagsForResource" => self.list_tags_for_resource(&req),
127 "CreateArchive" => self.create_archive(&req),
128 "DescribeArchive" => self.describe_archive(&req),
129 "ListArchives" => self.list_archives(&req),
130 "UpdateArchive" => self.update_archive(&req),
131 "DeleteArchive" => self.delete_archive(&req),
132 "CreateConnection" => self.create_connection(&req),
133 "DescribeConnection" => self.describe_connection(&req),
134 "ListConnections" => self.list_connections(&req),
135 "UpdateConnection" => self.update_connection(&req),
136 "DeleteConnection" => self.delete_connection(&req),
137 "CreateApiDestination" => self.create_api_destination(&req),
138 "DescribeApiDestination" => self.describe_api_destination(&req),
139 "ListApiDestinations" => self.list_api_destinations(&req),
140 "UpdateApiDestination" => self.update_api_destination(&req),
141 "DeleteApiDestination" => self.delete_api_destination(&req),
142 "StartReplay" => self.start_replay(&req),
143 "DescribeReplay" => self.describe_replay(&req),
144 "ListReplays" => self.list_replays(&req),
145 "CancelReplay" => self.cancel_replay(&req),
146 "CreatePartnerEventSource" => self.create_partner_event_source(&req),
147 "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
148 "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
149 "ListPartnerEventSources" => self.list_partner_event_sources(&req),
150 "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
151 "ActivateEventSource" => self.activate_event_source(&req),
152 "DeactivateEventSource" => self.deactivate_event_source(&req),
153 "DescribeEventSource" => self.describe_event_source(&req),
154 "ListEventSources" => self.list_event_sources(&req),
155 "PutPartnerEvents" => self.put_partner_events(&req),
156 "TestEventPattern" => self.test_event_pattern(&req),
157 "UpdateEventBus" => self.update_event_bus(&req),
158 "CreateEndpoint" => self.create_endpoint(&req),
159 "DeleteEndpoint" => self.delete_endpoint(&req),
160 "DescribeEndpoint" => self.describe_endpoint(&req),
161 "ListEndpoints" => self.list_endpoints(&req),
162 "UpdateEndpoint" => self.update_endpoint(&req),
163 "DeauthorizeConnection" => self.deauthorize_connection(&req),
164 _ => Err(AwsServiceError::action_not_implemented(
165 "events",
166 &req.action,
167 )),
168 };
169 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
170 self.save_snapshot().await;
171 }
172 result
173 }
174
175 fn supported_actions(&self) -> &[&str] {
176 &[
177 "CreateEventBus",
178 "DeleteEventBus",
179 "ListEventBuses",
180 "DescribeEventBus",
181 "PutRule",
182 "DeleteRule",
183 "ListRules",
184 "DescribeRule",
185 "EnableRule",
186 "DisableRule",
187 "PutTargets",
188 "RemoveTargets",
189 "ListTargetsByRule",
190 "ListRuleNamesByTarget",
191 "PutEvents",
192 "PutPermission",
193 "RemovePermission",
194 "TagResource",
195 "UntagResource",
196 "ListTagsForResource",
197 "CreateArchive",
198 "DescribeArchive",
199 "ListArchives",
200 "UpdateArchive",
201 "DeleteArchive",
202 "CreateConnection",
203 "DescribeConnection",
204 "ListConnections",
205 "UpdateConnection",
206 "DeleteConnection",
207 "CreateApiDestination",
208 "DescribeApiDestination",
209 "ListApiDestinations",
210 "UpdateApiDestination",
211 "DeleteApiDestination",
212 "StartReplay",
213 "DescribeReplay",
214 "ListReplays",
215 "CancelReplay",
216 "CreatePartnerEventSource",
217 "DeletePartnerEventSource",
218 "DescribePartnerEventSource",
219 "ListPartnerEventSources",
220 "ListPartnerEventSourceAccounts",
221 "ActivateEventSource",
222 "DeactivateEventSource",
223 "DescribeEventSource",
224 "ListEventSources",
225 "PutPartnerEvents",
226 "TestEventPattern",
227 "UpdateEventBus",
228 "CreateEndpoint",
229 "DeleteEndpoint",
230 "DescribeEndpoint",
231 "ListEndpoints",
232 "UpdateEndpoint",
233 "DeauthorizeConnection",
234 ]
235 }
236}
237
238impl EventBridgeService {
240 fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241 let body = req.json_body();
242 validate_required("Name", &body["Name"])?;
243 let name = body["Name"]
244 .as_str()
245 .ok_or_else(|| missing("Name"))?
246 .to_string();
247 validate_string_length("name", &name, 1, 256)?;
248 validate_optional_string_length(
249 "eventSourceName",
250 body["EventSourceName"].as_str(),
251 1,
252 256,
253 )?;
254 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
255 validate_optional_string_length(
256 "kmsKeyIdentifier",
257 body["KmsKeyIdentifier"].as_str(),
258 0,
259 2048,
260 )?;
261
262 if name.contains('/') && !name.starts_with("aws.partner/") {
264 return Err(AwsServiceError::aws_error(
265 StatusCode::BAD_REQUEST,
266 "ValidationException",
267 "Event bus name must not contain '/'.",
268 ));
269 }
270
271 if name.starts_with("aws.partner/") {
273 let event_source = body["EventSourceName"].as_str().unwrap_or("");
274 let accounts_r = self.state.read();
275 let empty_r = EventBridgeState::new(&req.account_id, &req.region);
276 let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
277 let has_source = state_r.partner_event_sources.contains_key(event_source);
278 drop(accounts_r);
279 if !has_source {
280 return Err(AwsServiceError::aws_error(
281 StatusCode::BAD_REQUEST,
282 "ResourceNotFoundException",
283 format!("Event source {event_source} does not exist."),
284 ));
285 }
286 }
287
288 let mut accounts = self.state.write();
289 let state = accounts.get_or_create(&req.account_id);
290
291 if state.buses.contains_key(&name) {
292 return Err(AwsServiceError::aws_error(
293 StatusCode::BAD_REQUEST,
294 "ResourceAlreadyExistsException",
295 format!("Event bus {name} already exists."),
296 ));
297 }
298
299 let arn = format!(
300 "arn:aws:events:{}:{}:event-bus/{}",
301 req.region, state.account_id, name
302 );
303 let now = Utc::now();
304 let description = body["Description"].as_str().map(|s| s.to_string());
305 let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
306 let dead_letter_config = body.get("DeadLetterConfig").cloned();
307
308 let tags = parse_tags(&body);
309
310 let bus = EventBus {
311 name: name.clone(),
312 arn: arn.clone(),
313 tags,
314 policy: None,
315 description,
316 kms_key_identifier,
317 dead_letter_config,
318 creation_time: now,
319 last_modified_time: now,
320 };
321 state.buses.insert(name, bus);
322
323 Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
324 }
325
326 fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
327 let body = req.json_body();
328 validate_required("Name", &body["Name"])?;
329 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
330 validate_string_length("name", name, 1, 256)?;
331
332 if name == "default" {
333 return Err(AwsServiceError::aws_error(
334 StatusCode::BAD_REQUEST,
335 "ValidationException",
336 format!("Cannot delete event bus {name}."),
337 ));
338 }
339
340 let mut accounts = self.state.write();
341 let state = accounts.get_or_create(&req.account_id);
342 state.buses.remove(name);
343 state.rules.retain(|k, _| k.0 != name);
344
345 Ok(AwsResponse::ok_json(json!({})))
346 }
347
348 fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
349 let body = req.json_body();
350 validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
358 validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
359 validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
360 let name_prefix = body["NamePrefix"].as_str();
361 let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
362
363 let accounts = self.state.read();
364 let empty = EventBridgeState::new(&req.account_id, &req.region);
365 let state = accounts.get(&req.account_id).unwrap_or(&empty);
366 let filtered: Vec<&_> = state
367 .buses
368 .values()
369 .filter(|b| match name_prefix {
370 Some(prefix) => b.name.starts_with(prefix),
371 None => true,
372 })
373 .collect();
374
375 let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
376 let buses: Vec<Value> = page
377 .iter()
378 .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
379 .collect();
380 let mut resp = json!({ "EventBuses": buses });
381 if let Some(token) = next_token {
382 resp["NextToken"] = json!(token);
383 }
384
385 Ok(AwsResponse::ok_json(resp))
386 }
387
388 fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389 let body = req.json_body();
390 validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
391 let name = body["Name"].as_str().unwrap_or("default");
392
393 let accounts = self.state.read();
394 let empty = EventBridgeState::new(&req.account_id, &req.region);
395 let state = accounts.get(&req.account_id).unwrap_or(&empty);
396 let bus = state.buses.get(name).ok_or_else(|| {
397 AwsServiceError::aws_error(
398 StatusCode::BAD_REQUEST,
399 "ResourceNotFoundException",
400 format!("Event bus {name} does not exist."),
401 )
402 })?;
403
404 let mut resp = json!({
405 "Name": bus.name,
406 "Arn": bus.arn,
407 "CreationTime": bus.creation_time.timestamp() as f64,
408 "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
409 });
410
411 if let Some(ref policy) = bus.policy {
412 resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
413 }
414 if let Some(ref desc) = bus.description {
415 resp["Description"] = json!(desc);
416 }
417 if let Some(ref kms) = bus.kms_key_identifier {
418 resp["KmsKeyIdentifier"] = json!(kms);
419 }
420 if let Some(ref dlc) = bus.dead_letter_config {
421 resp["DeadLetterConfig"] = dlc.clone();
422 }
423
424 Ok(AwsResponse::ok_json(resp))
425 }
426
427 fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
430 let body = req.json_body();
431 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
438 validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
439 validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
440 validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
441 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
442
443 let mut accounts = self.state.write();
444 let state = accounts.get_or_create(&req.account_id);
445
446 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
447 AwsServiceError::aws_error(
448 StatusCode::BAD_REQUEST,
449 "ResourceNotFoundException",
450 format!("Event bus {event_bus_name} does not exist."),
451 )
452 })?;
453
454 if let Some(policy_str) = body["Policy"].as_str() {
456 if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
457 bus.policy = Some(policy);
458 return Ok(AwsResponse::ok_json(json!({})));
459 }
460 }
461
462 let action = body["Action"].as_str().unwrap_or("");
469 let principal = body["Principal"].as_str().unwrap_or("");
470 let statement_id = body["StatementId"].as_str().unwrap_or("");
471
472 let statement = json!({
478 "Sid": statement_id,
479 "Effect": "Allow",
480 "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
481 "Action": action,
482 "Resource": bus.arn,
483 });
484
485 let policy = bus.policy.get_or_insert_with(|| {
486 json!({
487 "Version": "2012-10-17",
488 "Statement": [],
489 })
490 });
491
492 if let Some(stmts) = policy["Statement"].as_array_mut() {
493 stmts.push(statement);
494 }
495
496 Ok(AwsResponse::ok_json(json!({})))
497 }
498
499 fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
500 let body = req.json_body();
501 validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
502 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
503 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
504 let statement_id = body["StatementId"].as_str().unwrap_or("");
505 let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
506
507 let mut accounts = self.state.write();
508 let state = accounts.get_or_create(&req.account_id);
509
510 let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
511 AwsServiceError::aws_error(
512 StatusCode::BAD_REQUEST,
513 "ResourceNotFoundException",
514 format!("Event bus {event_bus_name} does not exist."),
515 )
516 })?;
517
518 if remove_all {
519 bus.policy = None;
520 return Ok(AwsResponse::ok_json(json!({})));
521 }
522
523 let policy = bus.policy.as_mut().ok_or_else(|| {
524 AwsServiceError::aws_error(
525 StatusCode::BAD_REQUEST,
526 "ResourceNotFoundException",
527 "EventBus does not have a policy.",
528 )
529 })?;
530
531 if let Some(stmts) = policy["Statement"].as_array_mut() {
532 let before = stmts.len();
533 stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
534 if stmts.len() == before {
535 return Err(AwsServiceError::aws_error(
536 StatusCode::BAD_REQUEST,
537 "ResourceNotFoundException",
538 "Statement with the provided id does not exist.",
539 ));
540 }
541 if stmts.is_empty() {
542 bus.policy = None;
543 }
544 }
545
546 Ok(AwsResponse::ok_json(json!({})))
547 }
548
549 fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
552 let body = req.json_body();
553 validate_required("Name", &body["Name"])?;
564 let name = body["Name"]
565 .as_str()
566 .ok_or_else(|| missing("Name"))?
567 .to_string();
568 validate_string_length("Name", &name, 1, 64)?;
569 validate_optional_string_length_value(
570 "ScheduleExpression",
571 &body["ScheduleExpression"],
572 0,
573 256,
574 )?;
575 validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
576 validate_optional_enum_value(
577 "State",
578 &body["State"],
579 &[
580 "ENABLED",
581 "DISABLED",
582 "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
583 ],
584 )?;
585 validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
586 validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
587 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
588
589 let raw_bus = body["EventBusName"]
590 .as_str()
591 .unwrap_or("default")
592 .to_string();
593
594 let mut accounts = self.state.write();
595 let state = accounts.get_or_create(&req.account_id);
596 let event_bus_name = state.resolve_bus_name(&raw_bus);
597
598 let event_pattern = body["EventPattern"].as_str().and_then(|s| {
599 if s.is_empty() {
600 None
601 } else {
602 Some(s.to_string())
603 }
604 });
605 let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
606 if s.is_empty() {
607 None
608 } else {
609 Some(s.to_string())
610 }
611 });
612 let description = body["Description"].as_str().map(|s| s.to_string());
613 let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
614 let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
615
616 if !state.buses.contains_key(&event_bus_name) {
622 return Err(AwsServiceError::aws_error(
623 StatusCode::BAD_REQUEST,
624 "ResourceNotFoundException",
625 format!("Event bus {event_bus_name} does not exist."),
626 ));
627 }
628
629 let arn = if event_bus_name == "default" {
630 format!(
631 "arn:aws:events:{}:{}:rule/{}",
632 req.region, state.account_id, name
633 )
634 } else {
635 format!(
636 "arn:aws:events:{}:{}:rule/{}/{}",
637 req.region, state.account_id, event_bus_name, name
638 )
639 };
640
641 let key = (event_bus_name.clone(), name.clone());
642 let targets = state
643 .rules
644 .get(&key)
645 .map(|r| r.targets.clone())
646 .unwrap_or_default();
647
648 let tags = parse_tags(&body);
649
650 let rule = EventRule {
651 name: name.clone(),
652 arn: arn.clone(),
653 event_bus_name,
654 event_pattern,
655 schedule_expression,
656 state: rule_state,
657 description,
658 role_arn,
659 managed_by: None,
660 created_by: None,
661 targets,
662 tags,
663 last_fired: None,
664 };
665
666 state.rules.insert(key, rule);
667 Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
668 }
669
670 fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
671 let body = req.json_body();
672 validate_required("Name", &body["Name"])?;
673 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
674 validate_string_length("name", name, 1, 64)?;
675 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
676 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
677
678 let mut accounts = self.state.write();
679 let state = accounts.get_or_create(&req.account_id);
680 let bus_name = state.resolve_bus_name(event_bus_name);
681 let key = (bus_name, name.to_string());
682
683 if let Some(rule) = state.rules.get(&key) {
685 if !rule.targets.is_empty() {
686 return Err(AwsServiceError::aws_error(
687 StatusCode::BAD_REQUEST,
688 "ValidationException",
689 "Rule can't be deleted since it has targets.",
690 ));
691 }
692 }
693
694 state.rules.remove(&key);
695 Ok(AwsResponse::ok_json(json!({})))
696 }
697
698 fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
699 let body = req.json_body();
700 validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
701 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
703 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
704 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
705 let name_prefix = body["NamePrefix"].as_str();
706 let limit = body["Limit"].as_u64().map(|n| n as usize);
707 let next_token = body["NextToken"].as_str();
708
709 let accounts = self.state.read();
710 let empty = EventBridgeState::new(&req.account_id, &req.region);
711 let state = accounts.get(&req.account_id).unwrap_or(&empty);
712 let bus_name = state.resolve_bus_name(event_bus_name);
713
714 let mut rules: Vec<&EventRule> = state
715 .rules
716 .values()
717 .filter(|r| r.event_bus_name == bus_name)
718 .filter(|r| match name_prefix {
719 Some(prefix) => r.name.starts_with(prefix),
720 None => true,
721 })
722 .collect();
723 rules.sort_by(|a, b| a.name.cmp(&b.name));
724
725 let start = next_token
727 .and_then(|t| t.parse::<usize>().ok())
728 .unwrap_or(0)
729 .min(rules.len());
730 let rules_slice = &rules[start..];
731
732 let (page, new_next_token) = if let Some(lim) = limit {
733 if rules_slice.len() > lim {
734 (&rules_slice[..lim], Some((start + lim).to_string()))
735 } else {
736 (rules_slice, None)
737 }
738 } else {
739 (rules_slice, None)
740 };
741
742 let rules_json: Vec<Value> = page
743 .iter()
744 .map(|r| {
745 let mut obj = json!({
746 "Name": r.name,
747 "Arn": r.arn,
748 "EventBusName": r.event_bus_name,
749 "State": r.state,
750 });
751 if let Some(ref desc) = r.description {
752 obj["Description"] = json!(desc);
753 }
754 if let Some(ref ep) = r.event_pattern {
755 obj["EventPattern"] = json!(ep);
756 }
757 if let Some(ref se) = r.schedule_expression {
758 obj["ScheduleExpression"] = json!(se);
759 }
760 if let Some(ref role) = r.role_arn {
761 obj["RoleArn"] = json!(role);
762 }
763 if let Some(ref mb) = r.managed_by {
764 obj["ManagedBy"] = json!(mb);
765 }
766 obj
767 })
768 .collect();
769
770 let mut resp = json!({ "Rules": rules_json });
771 if let Some(token) = new_next_token {
772 resp["NextToken"] = json!(token);
773 }
774
775 Ok(AwsResponse::ok_json(resp))
776 }
777
778 fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779 let body = req.json_body();
780 validate_required("Name", &body["Name"])?;
781 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
782 validate_string_length("name", name, 1, 64)?;
783 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
784 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
785
786 let accounts = self.state.read();
787 let empty = EventBridgeState::new(&req.account_id, &req.region);
788 let state = accounts.get(&req.account_id).unwrap_or(&empty);
789 let bus_name = state.resolve_bus_name(event_bus_name);
790 let key = (bus_name.clone(), name.to_string());
791
792 let rule = state.rules.get(&key).ok_or_else(|| {
793 AwsServiceError::aws_error(
794 StatusCode::BAD_REQUEST,
795 "ResourceNotFoundException",
796 format!("Rule {name} does not exist."),
797 )
798 })?;
799
800 let mut resp = json!({
801 "Name": rule.name,
802 "Arn": rule.arn,
803 "EventBusName": rule.event_bus_name,
804 "State": rule.state,
805 });
806
807 if let Some(ref desc) = rule.description {
808 resp["Description"] = json!(desc);
809 }
810 if let Some(ref ep) = rule.event_pattern {
811 resp["EventPattern"] = json!(ep);
812 }
813 if let Some(ref se) = rule.schedule_expression {
814 resp["ScheduleExpression"] = json!(se);
815 }
816 if let Some(ref role) = rule.role_arn {
817 resp["RoleArn"] = json!(role);
818 }
819 if let Some(ref mb) = rule.managed_by {
820 resp["ManagedBy"] = json!(mb);
821 }
822 if let Some(ref cb) = rule.created_by {
823 resp["CreatedBy"] = json!(cb);
824 }
825 if rule.event_bus_name != "default" && rule.created_by.is_none() {
827 resp["CreatedBy"] = json!(state.account_id);
828 }
829
830 Ok(AwsResponse::ok_json(resp))
831 }
832
833 fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
834 let body = req.json_body();
835 validate_required("Name", &body["Name"])?;
836 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
837 validate_string_length("name", name, 1, 64)?;
838 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
839 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
840
841 let mut accounts = self.state.write();
842 let state = accounts.get_or_create(&req.account_id);
843 let bus_name = state.resolve_bus_name(event_bus_name);
844 let key = (bus_name, name.to_string());
845
846 let rule = state.rules.get_mut(&key).ok_or_else(|| {
847 AwsServiceError::aws_error(
848 StatusCode::BAD_REQUEST,
849 "ResourceNotFoundException",
850 format!("Rule {name} does not exist."),
851 )
852 })?;
853
854 rule.state = "ENABLED".to_string();
855 Ok(AwsResponse::ok_json(json!({})))
856 }
857
858 fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
859 let body = req.json_body();
860 validate_required("Name", &body["Name"])?;
861 let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
862 validate_string_length("name", name, 1, 64)?;
863 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
864 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
865
866 let mut accounts = self.state.write();
867 let state = accounts.get_or_create(&req.account_id);
868 let bus_name = state.resolve_bus_name(event_bus_name);
869 let key = (bus_name, name.to_string());
870
871 let rule = state.rules.get_mut(&key).ok_or_else(|| {
872 AwsServiceError::aws_error(
873 StatusCode::BAD_REQUEST,
874 "ResourceNotFoundException",
875 format!("Rule {name} does not exist."),
876 )
877 })?;
878
879 rule.state = "DISABLED".to_string();
880 Ok(AwsResponse::ok_json(json!({})))
881 }
882
883 fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
886 let body = req.json_body();
887 validate_required("Rule", &body["Rule"])?;
894 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
895 validate_string_length("Rule", rule_name, 1, 64)?;
896 validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
897 validate_required("Targets", &body["Targets"])?;
902 let targets_array = body["Targets"].as_array().ok_or_else(|| {
903 AwsServiceError::aws_error(
904 StatusCode::BAD_REQUEST,
905 "ValidationException",
906 "Targets must be a list",
907 )
908 })?;
909 if targets_array.is_empty() || targets_array.len() > 100 {
910 return Err(AwsServiceError::aws_error(
911 StatusCode::BAD_REQUEST,
912 "ValidationException",
913 "Value at 'Targets' failed to satisfy constraint: \
914 Member must have length between 1 and 100",
915 ));
916 }
917 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
918 let targets: Vec<Value> = targets_array.clone();
919
920 let mut accounts = self.state.write();
921 let state = accounts.get_or_create(&req.account_id);
922 let bus_name = state.resolve_bus_name(event_bus_name);
923 let key = (bus_name.clone(), rule_name.to_string());
924
925 let rule = state.rules.get_mut(&key).ok_or_else(|| {
926 AwsServiceError::aws_error(
927 StatusCode::BAD_REQUEST,
928 "ResourceNotFoundException",
929 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
930 )
931 })?;
932
933 let mut failed_entries: Vec<Value> = Vec::new();
934 for target in &targets {
935 let target_id = target["Id"].as_str().unwrap_or("").to_string();
936 let target_arn = target["Arn"].as_str().unwrap_or("");
937
938 if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
939 failed_entries.push(json!({
940 "TargetId": target_id,
941 "ErrorCode": "ValidationException",
942 "ErrorMessage": format!(
943 "Parameter(s) SqsParameters must be specified for target: {target_id}."
944 ),
945 }));
946 continue;
947 }
948 if !target_arn.starts_with("arn:") {
949 failed_entries.push(json!({
950 "TargetId": target_id,
951 "ErrorCode": "ValidationException",
952 "ErrorMessage": format!(
953 "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
954 ),
955 }));
956 continue;
957 }
958
959 let et = parse_target(target);
960 rule.targets.retain(|t| t.id != et.id);
961 rule.targets.push(et);
962 }
963
964 Ok(AwsResponse::ok_json(json!({
965 "FailedEntryCount": failed_entries.len(),
966 "FailedEntries": failed_entries,
967 })))
968 }
969
970 fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
971 let body = req.json_body();
972 validate_required("Rule", &body["Rule"])?;
973 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
974 validate_string_length("rule", rule_name, 1, 64)?;
975 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
976 validate_required("Ids", &body["Ids"])?;
977 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
978 let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
979
980 let target_ids: Vec<String> = ids
981 .iter()
982 .filter_map(|v| v.as_str().map(|s| s.to_string()))
983 .collect();
984
985 let mut accounts = self.state.write();
986 let state = accounts.get_or_create(&req.account_id);
987 let bus_name = state.resolve_bus_name(event_bus_name);
988 let key = (bus_name.clone(), rule_name.to_string());
989
990 let rule = state.rules.get_mut(&key).ok_or_else(|| {
991 AwsServiceError::aws_error(
992 StatusCode::BAD_REQUEST,
993 "ResourceNotFoundException",
994 format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
995 )
996 })?;
997
998 rule.targets.retain(|t| !target_ids.contains(&t.id));
999
1000 Ok(AwsResponse::ok_json(json!({
1001 "FailedEntryCount": 0,
1002 "FailedEntries": [],
1003 })))
1004 }
1005
1006 fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1007 let body = req.json_body();
1008 validate_required("Rule", &body["Rule"])?;
1009 let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1010 validate_string_length("rule", rule_name, 1, 64)?;
1011 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1012 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1013 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1014 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1015 let limit = body["Limit"].as_u64().map(|n| n as usize);
1016 let next_token = body["NextToken"].as_str();
1017
1018 let accounts = self.state.read();
1019 let empty = EventBridgeState::new(&req.account_id, &req.region);
1020 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1021 let bus_name = state.resolve_bus_name(event_bus_name);
1022 let key = (bus_name, rule_name.to_string());
1023
1024 let rule = state.rules.get(&key).ok_or_else(|| {
1025 AwsServiceError::aws_error(
1026 StatusCode::BAD_REQUEST,
1027 "ResourceNotFoundException",
1028 format!("Rule {rule_name} does not exist."),
1029 )
1030 })?;
1031
1032 let all_targets = &rule.targets;
1033 let start = next_token
1034 .and_then(|t| t.parse::<usize>().ok())
1035 .unwrap_or(0)
1036 .min(all_targets.len());
1037 let slice = &all_targets[start..];
1038
1039 let (page, new_next_token) = if let Some(lim) = limit {
1040 if slice.len() > lim {
1041 (&slice[..lim], Some((start + lim).to_string()))
1042 } else {
1043 (slice, None)
1044 }
1045 } else {
1046 (slice, None)
1047 };
1048
1049 let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1050
1051 let mut resp = json!({ "Targets": targets });
1052 if let Some(token) = new_next_token {
1053 resp["NextToken"] = json!(token);
1054 }
1055
1056 Ok(AwsResponse::ok_json(resp))
1057 }
1058
1059 fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1060 let body = req.json_body();
1061 validate_required("TargetArn", &body["TargetArn"])?;
1062 let target_arn = body["TargetArn"]
1063 .as_str()
1064 .ok_or_else(|| missing("TargetArn"))?;
1065 validate_string_length("targetArn", target_arn, 1, 1600)?;
1066 validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1067 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1068 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1069 let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1070 let limit = body["Limit"].as_u64().map(|n| n as usize);
1071 let next_token = body["NextToken"].as_str();
1072
1073 let accounts = self.state.read();
1074 let empty = EventBridgeState::new(&req.account_id, &req.region);
1075 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1076 let bus_name = state.resolve_bus_name(event_bus_name);
1077
1078 let mut rule_names: Vec<String> = Vec::new();
1080 for rule in state.rules.values() {
1081 if rule.event_bus_name == bus_name
1082 && rule.targets.iter().any(|t| t.arn == target_arn)
1083 && !rule_names.contains(&rule.name)
1084 {
1085 rule_names.push(rule.name.clone());
1086 }
1087 }
1088 rule_names.sort();
1089
1090 let start = next_token
1091 .and_then(|t| t.parse::<usize>().ok())
1092 .unwrap_or(0)
1093 .min(rule_names.len());
1094 let slice = &rule_names[start..];
1095
1096 let (page, new_next_token) = if let Some(lim) = limit {
1097 if slice.len() > lim {
1098 (&slice[..lim], Some((start + lim).to_string()))
1099 } else {
1100 (slice, None)
1101 }
1102 } else {
1103 (slice, None)
1104 };
1105
1106 let mut resp = json!({ "RuleNames": page });
1107 if let Some(token) = new_next_token {
1108 resp["NextToken"] = json!(token);
1109 }
1110
1111 Ok(AwsResponse::ok_json(resp))
1112 }
1113
1114 fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1117 let body = req.json_body();
1118 validate_required("EventPattern", &body["EventPattern"])?;
1119 validate_required("Event", &body["Event"])?;
1120 let event_pattern = body["EventPattern"]
1121 .as_str()
1122 .ok_or_else(|| missing("EventPattern"))?;
1123 let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1124
1125 let event: Value = serde_json::from_str(event_str).map_err(|_| {
1127 AwsServiceError::aws_error(
1128 StatusCode::BAD_REQUEST,
1129 "InvalidEventPatternException",
1130 "Event is not valid JSON.",
1131 )
1132 })?;
1133
1134 let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1136 AwsServiceError::aws_error(
1137 StatusCode::BAD_REQUEST,
1138 "InvalidEventPatternException",
1139 "Event pattern is not valid JSON.",
1140 )
1141 })?;
1142
1143 let source = event["source"].as_str().unwrap_or("");
1144 let detail_type = event["detail-type"].as_str().unwrap_or("");
1145 let detail = event
1146 .get("detail")
1147 .map(|v| serde_json::to_string(v).unwrap_or_default())
1148 .unwrap_or_else(|| "{}".to_string());
1149 let account = event["account"].as_str().unwrap_or("");
1150 let region = event["region"].as_str().unwrap_or("");
1151 let resources: Vec<String> = event["resources"]
1152 .as_array()
1153 .map(|arr| {
1154 arr.iter()
1155 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1156 .collect()
1157 })
1158 .unwrap_or_default();
1159
1160 let result = matches_pattern(
1161 Some(event_pattern),
1162 source,
1163 detail_type,
1164 &detail,
1165 account,
1166 region,
1167 &resources,
1168 );
1169
1170 Ok(AwsResponse::ok_json(json!({ "Result": result })))
1171 }
1172
1173 fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1176 let body = req.json_body();
1177 validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1178 validate_optional_string_length(
1179 "kmsKeyIdentifier",
1180 body["KmsKeyIdentifier"].as_str(),
1181 0,
1182 2048,
1183 )?;
1184 let name = body["Name"].as_str().unwrap_or("default");
1185
1186 let mut accounts = self.state.write();
1187 let state = accounts.get_or_create(&req.account_id);
1188 let bus = state.buses.get_mut(name).ok_or_else(|| {
1189 AwsServiceError::aws_error(
1190 StatusCode::BAD_REQUEST,
1191 "ResourceNotFoundException",
1192 format!("Event bus {name} does not exist."),
1193 )
1194 })?;
1195
1196 if let Some(desc) = body["Description"].as_str() {
1197 bus.description = Some(desc.to_string());
1198 }
1199 if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1200 bus.kms_key_identifier = Some(kms.to_string());
1201 }
1202 if let Some(dlc) = body.get("DeadLetterConfig") {
1203 bus.dead_letter_config = Some(dlc.clone());
1204 }
1205 bus.last_modified_time = Utc::now();
1206
1207 let arn = bus.arn.clone();
1208 let bus_name = bus.name.clone();
1209
1210 Ok(AwsResponse::ok_json(json!({
1211 "Arn": arn,
1212 "Name": bus_name,
1213 })))
1214 }
1215
1216 fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1219 let body = req.json_body();
1220 validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1224 validate_required("Entries", &body["Entries"])?;
1225 let entries_array = body["Entries"].as_array().ok_or_else(|| {
1226 AwsServiceError::aws_error(
1227 StatusCode::BAD_REQUEST,
1228 "ValidationException",
1229 "Entries must be a list",
1230 )
1231 })?;
1232 if entries_array.is_empty() || entries_array.len() > 10 {
1233 return Err(AwsServiceError::aws_error(
1234 StatusCode::BAD_REQUEST,
1235 "ValidationException",
1236 "Value at 'Entries' failed to satisfy constraint: \
1237 Member must have length between 1 and 10",
1238 ));
1239 }
1240 let entries: Vec<Value> = entries_array.clone();
1241 let entries = &entries;
1242
1243 let mut accounts = self.state.write();
1244 let state = accounts.get_or_create(&req.account_id);
1245 let mut result_entries = Vec::new();
1246 let mut events_to_deliver = Vec::new();
1247 let mut failed_count = 0;
1248
1249 for entry in entries {
1250 let source = entry["Source"].as_str().unwrap_or("").to_string();
1251 let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1252 let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1253
1254 if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1255 failed_count += 1;
1256 result_entries.push(error);
1257 continue;
1258 }
1259
1260 let event_id = uuid::Uuid::new_v4().to_string();
1261 let raw_bus = entry["EventBusName"]
1262 .as_str()
1263 .unwrap_or("default")
1264 .to_string();
1265 let event_bus_name = state.resolve_bus_name(&raw_bus);
1266
1267 let caller_account = req
1273 .principal
1274 .as_ref()
1275 .map(|p| p.account_id.as_str())
1276 .unwrap_or(req.account_id.as_str());
1277 if caller_account != req.account_id {
1278 let bus_policy_value = state
1279 .buses
1280 .get(&event_bus_name)
1281 .and_then(|b| b.policy.clone());
1282 if let Some(policy_value) = bus_policy_value {
1283 let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1284 let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1285 let bus_arn = state
1286 .buses
1287 .get(&event_bus_name)
1288 .map(|b| b.arn.clone())
1289 .unwrap_or_default();
1290 let principal =
1291 req.principal
1292 .clone()
1293 .unwrap_or_else(|| fakecloud_core::auth::Principal {
1294 arn: Arn::global("iam", caller_account, "root").to_string(),
1295 user_id: caller_account.to_string(),
1296 account_id: caller_account.to_string(),
1297 principal_type: fakecloud_core::auth::PrincipalType::Root,
1298 source_identity: None,
1299 tags: None,
1300 });
1301 let context = fakecloud_iam::evaluator::RequestContext {
1302 aws_principal_arn: Some(principal.arn.clone()),
1303 aws_principal_account: Some(principal.account_id.clone()),
1304 ..Default::default()
1305 };
1306 let eval_req = fakecloud_iam::evaluator::EvalRequest {
1307 principal: &principal,
1308 action: "events:PutEvents".to_string(),
1309 resource: bus_arn,
1310 context,
1311 };
1312 let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1313 &policy_doc,
1314 &eval_req,
1315 );
1316 if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1317 failed_count += 1;
1318 result_entries.push(json!({
1319 "ErrorCode": "AccessDeniedException",
1320 "ErrorMessage": format!(
1321 "User '{}' is not authorized to put events on event bus '{}'",
1322 principal.arn, event_bus_name
1323 ),
1324 }));
1325 continue;
1326 }
1327 }
1328 }
1329
1330 let time = parse_put_events_time(&entry["Time"]);
1331 let resources: Vec<String> = entry["Resources"]
1332 .as_array()
1333 .map(|arr| {
1334 arr.iter()
1335 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1336 .collect()
1337 })
1338 .unwrap_or_default();
1339
1340 let event = PutEvent {
1341 event_id: event_id.clone(),
1342 source: source.clone(),
1343 detail_type: detail_type.clone(),
1344 detail: detail.clone(),
1345 event_bus_name: event_bus_name.clone(),
1346 time,
1347 resources: resources.clone(),
1348 };
1349
1350 archive_matching_event(
1351 state,
1352 &event,
1353 &event_bus_name,
1354 &source,
1355 &detail_type,
1356 &detail,
1357 &req.account_id,
1358 &req.region,
1359 &resources,
1360 );
1361
1362 state.events.push(event);
1363
1364 let matching_targets: Vec<EventTarget> = state
1366 .rules
1367 .values()
1368 .filter(|r| {
1369 r.event_bus_name == event_bus_name
1370 && r.state == "ENABLED"
1371 && matches_pattern(
1372 r.event_pattern.as_deref(),
1373 &source,
1374 &detail_type,
1375 &detail,
1376 &req.account_id,
1377 &req.region,
1378 &resources,
1379 )
1380 })
1381 .flat_map(|r| r.targets.clone())
1382 .collect();
1383
1384 if !matching_targets.is_empty() {
1385 events_to_deliver.push((
1386 event_id.clone(),
1387 source,
1388 detail_type,
1389 detail,
1390 time,
1391 resources,
1392 matching_targets,
1393 ));
1394 }
1395
1396 result_entries.push(json!({ "EventId": event_id }));
1397 }
1398
1399 drop(accounts);
1401
1402 for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1407 let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1408 let event_json = json!({
1409 "version": "0",
1410 "id": event_id,
1411 "source": source,
1412 "account": req.account_id,
1413 "detail-type": detail_type,
1414 "detail": detail_value,
1415 "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1416 "region": req.region,
1417 "resources": resources,
1418 });
1419
1420 let ctx = EventDispatchContext {
1421 state: &self.state,
1422 delivery: &self.delivery,
1423 lambda_state: self.lambda_state.as_ref(),
1424 logs_state: self.logs_state.as_ref(),
1425 container_runtime: &self.container_runtime,
1426 account_id: &req.account_id,
1427 region: &req.region,
1428 };
1429 for target in targets {
1430 dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1431 }
1432 }
1433
1434 let resp = json!({
1435 "FailedEntryCount": failed_count,
1436 "Entries": result_entries,
1437 });
1438
1439 Ok(AwsResponse::ok_json(resp))
1440 }
1441
1442 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1445 let body = req.json_body();
1446 validate_required("ResourceARN", &body["ResourceARN"])?;
1447 let arn = body["ResourceARN"]
1448 .as_str()
1449 .ok_or_else(|| missing("ResourceARN"))?;
1450 validate_string_length("resourceARN", arn, 1, 1600)?;
1451 validate_required("Tags", &body["Tags"])?;
1452
1453 let mut accounts = self.state.write();
1454 let state = accounts.get_or_create(&req.account_id);
1455 let tag_map = find_tags_mut(state, arn)?;
1456
1457 fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1458 AwsServiceError::aws_error(
1459 StatusCode::BAD_REQUEST,
1460 "ValidationException",
1461 format!("{f} must be a list"),
1462 )
1463 })?;
1464
1465 Ok(AwsResponse::ok_json(json!({})))
1466 }
1467
1468 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1469 let body = req.json_body();
1470 validate_required("ResourceARN", &body["ResourceARN"])?;
1471 let arn = body["ResourceARN"]
1472 .as_str()
1473 .ok_or_else(|| missing("ResourceARN"))?;
1474 validate_string_length("resourceARN", arn, 1, 1600)?;
1475 validate_required("TagKeys", &body["TagKeys"])?;
1476
1477 let mut accounts = self.state.write();
1478 let state = accounts.get_or_create(&req.account_id);
1479 let tag_map = find_tags_mut(state, arn)?;
1480
1481 fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1482 AwsServiceError::aws_error(
1483 StatusCode::BAD_REQUEST,
1484 "ValidationException",
1485 format!("{f} must be a list"),
1486 )
1487 })?;
1488
1489 Ok(AwsResponse::ok_json(json!({})))
1490 }
1491
1492 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1493 let body = req.json_body();
1494 validate_required("ResourceARN", &body["ResourceARN"])?;
1495 let arn = body["ResourceARN"]
1496 .as_str()
1497 .ok_or_else(|| missing("ResourceARN"))?;
1498 validate_string_length("resourceARN", arn, 1, 1600)?;
1499
1500 let accounts = self.state.read();
1501 let empty = EventBridgeState::new(&req.account_id, &req.region);
1502 let state = accounts.get(&req.account_id).unwrap_or(&empty);
1503 let tag_map = find_tags(state, arn)?;
1504
1505 let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1506
1507 Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1508 }
1509
1510 }
1512
/// Fields of a replay-start request body, as extracted by `StartReplayInput::from_body`.
struct StartReplayInput {
    /// Value of `ReplayName`; empty when the field is absent or not a string.
    name: String,
    /// Value of `Description`, when present as a string.
    description: Option<String>,
    /// Value of `EventSourceArn`; empty when the field is absent or not a string.
    event_source_arn: String,
    /// Copy of the request's `Destination` value.
    destination: Value,
    /// Value of `Destination.Arn`; `from_body` only succeeds when it contains `:event-bus/`.
    destination_arn: String,
    /// `EventStartTime` read as epoch seconds (fraction discarded); the time of parsing when
    /// the field is absent or cannot be converted.
    event_start_time: DateTime<Utc>,
    /// `EventEndTime` read as epoch seconds (fraction discarded); the time of parsing when
    /// the field is absent or cannot be converted.
    event_end_time: DateTime<Utc>,
}
1531
1532impl StartReplayInput {
1533 fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1534 let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1542 let description = body["Description"].as_str().map(|s| s.to_string());
1543 let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1544 let destination = body["Destination"].clone();
1545
1546 let event_start_time = body["EventStartTime"]
1547 .as_f64()
1548 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1549 .unwrap_or_else(Utc::now);
1550 let event_end_time = body["EventEndTime"]
1551 .as_f64()
1552 .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1553 .unwrap_or_else(Utc::now);
1554
1555 let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1556 if !destination_arn.contains(":event-bus/") {
1557 return Err(AwsServiceError::aws_error(
1561 StatusCode::BAD_REQUEST,
1562 "ResourceNotFoundException",
1563 format!("Destination.Arn {destination_arn} does not point to an event bus."),
1564 ));
1565 }
1566
1567 Ok(Self {
1568 name,
1569 description,
1570 event_source_arn,
1571 destination,
1572 destination_arn,
1573 event_start_time,
1574 event_end_time,
1575 })
1576 }
1577}
1578
1579#[path = "service_archives_replays.rs"]
1580mod service_archives_replays;
1581#[path = "service_connections_apidests.rs"]
1582mod service_connections_apidests;
1583#[path = "service_endpoints.rs"]
1584mod service_endpoints;
1585#[path = "service_partner_sources.rs"]
1586mod service_partner_sources;
1587
1588#[path = "helpers.rs"]
1589mod helpers;
1590pub(crate) use helpers::*;
1591
1592#[cfg(test)]
1593#[path = "service_tests.rs"]
1594mod tests;