Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29    state: SharedEventBridgeState,
30    delivery: Arc<DeliveryBus>,
31    lambda_state: Option<SharedLambdaState>,
32    logs_state: Option<SharedLogsState>,
33    container_runtime: Option<Arc<ContainerRuntime>>,
34    snapshot_store: Option<Arc<dyn SnapshotStore>>,
35    snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40        Self {
41            state,
42            delivery,
43            lambda_state: None,
44            logs_state: None,
45            container_runtime: None,
46            snapshot_store: None,
47            snapshot_lock: Arc::new(AsyncMutex::new(())),
48        }
49    }
50
51    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52        self.lambda_state = Some(lambda_state);
53        self
54    }
55
56    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57        self.logs_state = Some(logs_state);
58        self
59    }
60
61    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62        self.container_runtime = Some(runtime);
63        self
64    }
65
66    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67        self.snapshot_store = Some(store);
68        self
69    }
70
71    /// Persist current state as a snapshot. Held across the
72    /// clone-serialize-write sequence to prevent stale-last writes,
73    /// with serde + file I/O offloaded to the blocking pool.
74    async fn save_snapshot(&self) {
75        let Some(store) = self.snapshot_store.clone() else {
76            return;
77        };
78        let _guard = self.snapshot_lock.lock().await;
79        let snapshot = EventBridgeSnapshot {
80            schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
81            accounts: Some(self.state.read().clone()),
82            state: None,
83        };
84        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
85            let bytes = serde_json::to_vec(&snapshot)
86                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
87            store.save(&bytes)
88        })
89        .await;
90        match join {
91            Ok(Ok(())) => {}
92            Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
93            Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
94        }
95    }
96}
97
98#[async_trait]
99impl AwsService for EventBridgeService {
100    fn service_name(&self) -> &str {
101        "events"
102    }
103
104    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
105        let mutates = is_mutating_action(req.action.as_str());
106        let result = match req.action.as_str() {
107            "CreateEventBus" => self.create_event_bus(&req),
108            "DeleteEventBus" => self.delete_event_bus(&req),
109            "ListEventBuses" => self.list_event_buses(&req),
110            "DescribeEventBus" => self.describe_event_bus(&req),
111            "PutRule" => self.put_rule(&req),
112            "DeleteRule" => self.delete_rule(&req),
113            "ListRules" => self.list_rules(&req),
114            "DescribeRule" => self.describe_rule(&req),
115            "EnableRule" => self.enable_rule(&req),
116            "DisableRule" => self.disable_rule(&req),
117            "PutTargets" => self.put_targets(&req),
118            "RemoveTargets" => self.remove_targets(&req),
119            "ListTargetsByRule" => self.list_targets_by_rule(&req),
120            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
121            "PutEvents" => self.put_events(&req),
122            "PutPermission" => self.put_permission(&req),
123            "RemovePermission" => self.remove_permission(&req),
124            "TagResource" => self.tag_resource(&req),
125            "UntagResource" => self.untag_resource(&req),
126            "ListTagsForResource" => self.list_tags_for_resource(&req),
127            "CreateArchive" => self.create_archive(&req),
128            "DescribeArchive" => self.describe_archive(&req),
129            "ListArchives" => self.list_archives(&req),
130            "UpdateArchive" => self.update_archive(&req),
131            "DeleteArchive" => self.delete_archive(&req),
132            "CreateConnection" => self.create_connection(&req),
133            "DescribeConnection" => self.describe_connection(&req),
134            "ListConnections" => self.list_connections(&req),
135            "UpdateConnection" => self.update_connection(&req),
136            "DeleteConnection" => self.delete_connection(&req),
137            "CreateApiDestination" => self.create_api_destination(&req),
138            "DescribeApiDestination" => self.describe_api_destination(&req),
139            "ListApiDestinations" => self.list_api_destinations(&req),
140            "UpdateApiDestination" => self.update_api_destination(&req),
141            "DeleteApiDestination" => self.delete_api_destination(&req),
142            "StartReplay" => self.start_replay(&req),
143            "DescribeReplay" => self.describe_replay(&req),
144            "ListReplays" => self.list_replays(&req),
145            "CancelReplay" => self.cancel_replay(&req),
146            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
147            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
148            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
149            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
150            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
151            "ActivateEventSource" => self.activate_event_source(&req),
152            "DeactivateEventSource" => self.deactivate_event_source(&req),
153            "DescribeEventSource" => self.describe_event_source(&req),
154            "ListEventSources" => self.list_event_sources(&req),
155            "PutPartnerEvents" => self.put_partner_events(&req),
156            "TestEventPattern" => self.test_event_pattern(&req),
157            "UpdateEventBus" => self.update_event_bus(&req),
158            "CreateEndpoint" => self.create_endpoint(&req),
159            "DeleteEndpoint" => self.delete_endpoint(&req),
160            "DescribeEndpoint" => self.describe_endpoint(&req),
161            "ListEndpoints" => self.list_endpoints(&req),
162            "UpdateEndpoint" => self.update_endpoint(&req),
163            "DeauthorizeConnection" => self.deauthorize_connection(&req),
164            _ => Err(AwsServiceError::action_not_implemented(
165                "events",
166                &req.action,
167            )),
168        };
169        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
170            self.save_snapshot().await;
171        }
172        result
173    }
174
175    fn supported_actions(&self) -> &[&str] {
176        &[
177            "CreateEventBus",
178            "DeleteEventBus",
179            "ListEventBuses",
180            "DescribeEventBus",
181            "PutRule",
182            "DeleteRule",
183            "ListRules",
184            "DescribeRule",
185            "EnableRule",
186            "DisableRule",
187            "PutTargets",
188            "RemoveTargets",
189            "ListTargetsByRule",
190            "ListRuleNamesByTarget",
191            "PutEvents",
192            "PutPermission",
193            "RemovePermission",
194            "TagResource",
195            "UntagResource",
196            "ListTagsForResource",
197            "CreateArchive",
198            "DescribeArchive",
199            "ListArchives",
200            "UpdateArchive",
201            "DeleteArchive",
202            "CreateConnection",
203            "DescribeConnection",
204            "ListConnections",
205            "UpdateConnection",
206            "DeleteConnection",
207            "CreateApiDestination",
208            "DescribeApiDestination",
209            "ListApiDestinations",
210            "UpdateApiDestination",
211            "DeleteApiDestination",
212            "StartReplay",
213            "DescribeReplay",
214            "ListReplays",
215            "CancelReplay",
216            "CreatePartnerEventSource",
217            "DeletePartnerEventSource",
218            "DescribePartnerEventSource",
219            "ListPartnerEventSources",
220            "ListPartnerEventSourceAccounts",
221            "ActivateEventSource",
222            "DeactivateEventSource",
223            "DescribeEventSource",
224            "ListEventSources",
225            "PutPartnerEvents",
226            "TestEventPattern",
227            "UpdateEventBus",
228            "CreateEndpoint",
229            "DeleteEndpoint",
230            "DescribeEndpoint",
231            "ListEndpoints",
232            "UpdateEndpoint",
233            "DeauthorizeConnection",
234        ]
235    }
236}
237
238// ─── Event Bus Operations ───────────────────────────────────────────
239impl EventBridgeService {
240    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241        let body = req.json_body();
242        validate_required("Name", &body["Name"])?;
243        let name = body["Name"]
244            .as_str()
245            .ok_or_else(|| missing("Name"))?
246            .to_string();
247        validate_string_length("name", &name, 1, 256)?;
248        validate_optional_string_length(
249            "eventSourceName",
250            body["EventSourceName"].as_str(),
251            1,
252            256,
253        )?;
254        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
255        validate_optional_string_length(
256            "kmsKeyIdentifier",
257            body["KmsKeyIdentifier"].as_str(),
258            0,
259            2048,
260        )?;
261
262        // Validate name doesn't contain '/' (unless partner bus)
263        if name.contains('/') && !name.starts_with("aws.partner/") {
264            return Err(AwsServiceError::aws_error(
265                StatusCode::BAD_REQUEST,
266                "ValidationException",
267                "Event bus name must not contain '/'.",
268            ));
269        }
270
271        // Partner event bus validation
272        if name.starts_with("aws.partner/") {
273            let event_source = body["EventSourceName"].as_str().unwrap_or("");
274            let accounts_r = self.state.read();
275            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
276            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
277            let has_source = state_r.partner_event_sources.contains_key(event_source);
278            drop(accounts_r);
279            if !has_source {
280                return Err(AwsServiceError::aws_error(
281                    StatusCode::BAD_REQUEST,
282                    "ResourceNotFoundException",
283                    format!("Event source {event_source} does not exist."),
284                ));
285            }
286        }
287
288        let mut accounts = self.state.write();
289        let state = accounts.get_or_create(&req.account_id);
290
291        if state.buses.contains_key(&name) {
292            return Err(AwsServiceError::aws_error(
293                StatusCode::BAD_REQUEST,
294                "ResourceAlreadyExistsException",
295                format!("Event bus {name} already exists."),
296            ));
297        }
298
299        let arn = format!(
300            "arn:aws:events:{}:{}:event-bus/{}",
301            req.region, state.account_id, name
302        );
303        let now = Utc::now();
304        let description = body["Description"].as_str().map(|s| s.to_string());
305        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
306        let dead_letter_config = body.get("DeadLetterConfig").cloned();
307
308        let tags = parse_tags(&body);
309
310        let bus = EventBus {
311            name: name.clone(),
312            arn: arn.clone(),
313            tags,
314            policy: None,
315            description,
316            kms_key_identifier,
317            dead_letter_config,
318            creation_time: now,
319            last_modified_time: now,
320        };
321        state.buses.insert(name, bus);
322
323        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
324    }
325
326    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
327        let body = req.json_body();
328        validate_required("Name", &body["Name"])?;
329        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
330        validate_string_length("name", name, 1, 256)?;
331
332        if name == "default" {
333            return Err(AwsServiceError::aws_error(
334                StatusCode::BAD_REQUEST,
335                "ValidationException",
336                format!("Cannot delete event bus {name}."),
337            ));
338        }
339
340        let mut accounts = self.state.write();
341        let state = accounts.get_or_create(&req.account_id);
342        state.buses.remove(name);
343        state.rules.retain(|k, _| k.0 != name);
344
345        Ok(AwsResponse::ok_json(json!({})))
346    }
347
348    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
349        let body = req.json_body();
350        // Smithy ListEventBusesRequest constraints:
351        //   NamePrefix: EventBusName length 1..=256
352        //   NextToken: length 1..=2048
353        //   Limit: LimitMax100 range 1..=100
354        // Unrecognised pagination tokens still fall back to the start of the
355        // list — `InvalidNextTokenException` only fires when the token shape
356        // itself is wrong, not when it points at a vanished cursor.
357        validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
358        validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
359        validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
360        let name_prefix = body["NamePrefix"].as_str();
361        let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
362
363        let accounts = self.state.read();
364        let empty = EventBridgeState::new(&req.account_id, &req.region);
365        let state = accounts.get(&req.account_id).unwrap_or(&empty);
366        let filtered: Vec<&_> = state
367            .buses
368            .values()
369            .filter(|b| match name_prefix {
370                Some(prefix) => b.name.starts_with(prefix),
371                None => true,
372            })
373            .collect();
374
375        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
376        let buses: Vec<Value> = page
377            .iter()
378            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
379            .collect();
380        let mut resp = json!({ "EventBuses": buses });
381        if let Some(token) = next_token {
382            resp["NextToken"] = json!(token);
383        }
384
385        Ok(AwsResponse::ok_json(resp))
386    }
387
388    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        let body = req.json_body();
390        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
391        let name = body["Name"].as_str().unwrap_or("default");
392
393        let accounts = self.state.read();
394        let empty = EventBridgeState::new(&req.account_id, &req.region);
395        let state = accounts.get(&req.account_id).unwrap_or(&empty);
396        let bus = state.buses.get(name).ok_or_else(|| {
397            AwsServiceError::aws_error(
398                StatusCode::BAD_REQUEST,
399                "ResourceNotFoundException",
400                format!("Event bus {name} does not exist."),
401            )
402        })?;
403
404        let mut resp = json!({
405            "Name": bus.name,
406            "Arn": bus.arn,
407            "CreationTime": bus.creation_time.timestamp() as f64,
408            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
409        });
410
411        if let Some(ref policy) = bus.policy {
412            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
413        }
414        if let Some(ref desc) = bus.description {
415            resp["Description"] = json!(desc);
416        }
417        if let Some(ref kms) = bus.kms_key_identifier {
418            resp["KmsKeyIdentifier"] = json!(kms);
419        }
420        if let Some(ref dlc) = bus.dead_letter_config {
421            resp["DeadLetterConfig"] = dlc.clone();
422        }
423
424        Ok(AwsResponse::ok_json(resp))
425    }
426
427    // ─── Permission Operations ──────────────────────────────────────────
428
429    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
430        let body = req.json_body();
431        // Smithy PutPermissionRequest constraints (optional members but each
432        // carries a `@length` trait that must be honoured when present):
433        //   EventBusName: NonPartnerEventBusName length 1..=256
434        //   Action: length 1..=64
435        //   Principal: length 1..=12 (12-digit AWS account or `*`)
436        //   StatementId: length 1..=64
437        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
438        validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
439        validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
440        validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
441        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
442
443        let mut accounts = self.state.write();
444        let state = accounts.get_or_create(&req.account_id);
445
446        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
447            AwsServiceError::aws_error(
448                StatusCode::BAD_REQUEST,
449                "ResourceNotFoundException",
450                format!("Event bus {event_bus_name} does not exist."),
451            )
452        })?;
453
454        // Check if Policy is provided (new-style)
455        if let Some(policy_str) = body["Policy"].as_str() {
456            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
457                bus.policy = Some(policy);
458                return Ok(AwsResponse::ok_json(json!({})));
459            }
460        }
461
462        // Old-style: Action, Principal, StatementId. All are @optional in the
463        // Smithy model; non-string values were already rejected above with
464        // SerializationException, so reaching here means each is either a
465        // valid string or absent. Fall back to "" to preserve current behavior
466        // — recording an empty-statement policy entry is harmless since it can
467        // never match a real action/principal pair.
468        let action = body["Action"].as_str().unwrap_or("");
469        let principal = body["Principal"].as_str().unwrap_or("");
470        let statement_id = body["StatementId"].as_str().unwrap_or("");
471
472        // Note: real AWS does enforce a small allow-list on `Action`, but
473        // PutPermission's Smithy model only declares ResourceNotFoundException,
474        // PolicyLengthExceededException, ConcurrentModificationException,
475        // OperationDisabledException, and InternalException. We accept any
476        // action string and just record the statement.
477        let statement = json!({
478            "Sid": statement_id,
479            "Effect": "Allow",
480            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
481            "Action": action,
482            "Resource": bus.arn,
483        });
484
485        let policy = bus.policy.get_or_insert_with(|| {
486            json!({
487                "Version": "2012-10-17",
488                "Statement": [],
489            })
490        });
491
492        if let Some(stmts) = policy["Statement"].as_array_mut() {
493            stmts.push(statement);
494        }
495
496        Ok(AwsResponse::ok_json(json!({})))
497    }
498
499    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
500        let body = req.json_body();
501        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
502        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
503        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
504        let statement_id = body["StatementId"].as_str().unwrap_or("");
505        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
506
507        let mut accounts = self.state.write();
508        let state = accounts.get_or_create(&req.account_id);
509
510        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
511            AwsServiceError::aws_error(
512                StatusCode::BAD_REQUEST,
513                "ResourceNotFoundException",
514                format!("Event bus {event_bus_name} does not exist."),
515            )
516        })?;
517
518        if remove_all {
519            bus.policy = None;
520            return Ok(AwsResponse::ok_json(json!({})));
521        }
522
523        let policy = bus.policy.as_mut().ok_or_else(|| {
524            AwsServiceError::aws_error(
525                StatusCode::BAD_REQUEST,
526                "ResourceNotFoundException",
527                "EventBus does not have a policy.",
528            )
529        })?;
530
531        if let Some(stmts) = policy["Statement"].as_array_mut() {
532            let before = stmts.len();
533            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
534            if stmts.len() == before {
535                return Err(AwsServiceError::aws_error(
536                    StatusCode::BAD_REQUEST,
537                    "ResourceNotFoundException",
538                    "Statement with the provided id does not exist.",
539                ));
540            }
541            if stmts.is_empty() {
542                bus.policy = None;
543            }
544        }
545
546        Ok(AwsResponse::ok_json(json!({})))
547    }
548
549    // ─── Rule Operations ────────────────────────────────────────────────
550
551    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
552        let body = req.json_body();
553        // Smithy PutRuleRequest constraints:
554        //   Name: RuleName length 1..=64, @required
555        //   ScheduleExpression: length 0..=256
556        //   EventPattern: length 0..=4096 (raw JSON, separate
557        //     InvalidEventPatternException still applies to syntax)
558        //   State: RuleState enum {ENABLED, DISABLED,
559        //          ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS}
560        //   Description: RuleDescription length 0..=512
561        //   RoleArn: length 1..=1600
562        //   EventBusName: EventBusNameOrArn length 1..=1600
563        validate_required("Name", &body["Name"])?;
564        let name = body["Name"]
565            .as_str()
566            .ok_or_else(|| missing("Name"))?
567            .to_string();
568        validate_string_length("Name", &name, 1, 64)?;
569        validate_optional_string_length_value(
570            "ScheduleExpression",
571            &body["ScheduleExpression"],
572            0,
573            256,
574        )?;
575        validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
576        validate_optional_enum_value(
577            "State",
578            &body["State"],
579            &[
580                "ENABLED",
581                "DISABLED",
582                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
583            ],
584        )?;
585        validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
586        validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
587        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
588
589        let raw_bus = body["EventBusName"]
590            .as_str()
591            .unwrap_or("default")
592            .to_string();
593
594        let mut accounts = self.state.write();
595        let state = accounts.get_or_create(&req.account_id);
596        let event_bus_name = state.resolve_bus_name(&raw_bus);
597
598        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
599            if s.is_empty() {
600                None
601            } else {
602                Some(s.to_string())
603            }
604        });
605        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
606            if s.is_empty() {
607                None
608            } else {
609                Some(s.to_string())
610            }
611        });
612        let description = body["Description"].as_str().map(|s| s.to_string());
613        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
614        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
615
616        // Note: real AWS rejects ScheduleExpression on a non-default bus, but
617        // PutRule's Smithy model only declares InvalidEventPatternException
618        // for input-shape problems, not ValidationException. We accept the
619        // value and let the scheduler ignore it on non-default buses.
620
621        if !state.buses.contains_key(&event_bus_name) {
622            return Err(AwsServiceError::aws_error(
623                StatusCode::BAD_REQUEST,
624                "ResourceNotFoundException",
625                format!("Event bus {event_bus_name} does not exist."),
626            ));
627        }
628
629        let arn = if event_bus_name == "default" {
630            format!(
631                "arn:aws:events:{}:{}:rule/{}",
632                req.region, state.account_id, name
633            )
634        } else {
635            format!(
636                "arn:aws:events:{}:{}:rule/{}/{}",
637                req.region, state.account_id, event_bus_name, name
638            )
639        };
640
641        let key = (event_bus_name.clone(), name.clone());
642        let targets = state
643            .rules
644            .get(&key)
645            .map(|r| r.targets.clone())
646            .unwrap_or_default();
647
648        let tags = parse_tags(&body);
649
650        let rule = EventRule {
651            name: name.clone(),
652            arn: arn.clone(),
653            event_bus_name,
654            event_pattern,
655            schedule_expression,
656            state: rule_state,
657            description,
658            role_arn,
659            managed_by: None,
660            created_by: None,
661            targets,
662            tags,
663            last_fired: None,
664        };
665
666        state.rules.insert(key, rule);
667        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
668    }
669
670    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
671        let body = req.json_body();
672        validate_required("Name", &body["Name"])?;
673        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
674        validate_string_length("name", name, 1, 64)?;
675        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
676        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
677
678        let mut accounts = self.state.write();
679        let state = accounts.get_or_create(&req.account_id);
680        let bus_name = state.resolve_bus_name(event_bus_name);
681        let key = (bus_name, name.to_string());
682
683        // Check if rule has targets
684        if let Some(rule) = state.rules.get(&key) {
685            if !rule.targets.is_empty() {
686                return Err(AwsServiceError::aws_error(
687                    StatusCode::BAD_REQUEST,
688                    "ValidationException",
689                    "Rule can't be deleted since it has targets.",
690                ));
691            }
692        }
693
694        state.rules.remove(&key);
695        Ok(AwsResponse::ok_json(json!({})))
696    }
697
698    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
699        let body = req.json_body();
700        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
701        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
703        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
704        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
705        let name_prefix = body["NamePrefix"].as_str();
706        let limit = body["Limit"].as_u64().map(|n| n as usize);
707        let next_token = body["NextToken"].as_str();
708
709        let accounts = self.state.read();
710        let empty = EventBridgeState::new(&req.account_id, &req.region);
711        let state = accounts.get(&req.account_id).unwrap_or(&empty);
712        let bus_name = state.resolve_bus_name(event_bus_name);
713
714        let mut rules: Vec<&EventRule> = state
715            .rules
716            .values()
717            .filter(|r| r.event_bus_name == bus_name)
718            .filter(|r| match name_prefix {
719                Some(prefix) => r.name.starts_with(prefix),
720                None => true,
721            })
722            .collect();
723        rules.sort_by(|a, b| a.name.cmp(&b.name));
724
725        // Pagination
726        let start = next_token
727            .and_then(|t| t.parse::<usize>().ok())
728            .unwrap_or(0)
729            .min(rules.len());
730        let rules_slice = &rules[start..];
731
732        let (page, new_next_token) = if let Some(lim) = limit {
733            if rules_slice.len() > lim {
734                (&rules_slice[..lim], Some((start + lim).to_string()))
735            } else {
736                (rules_slice, None)
737            }
738        } else {
739            (rules_slice, None)
740        };
741
742        let rules_json: Vec<Value> = page
743            .iter()
744            .map(|r| {
745                let mut obj = json!({
746                    "Name": r.name,
747                    "Arn": r.arn,
748                    "EventBusName": r.event_bus_name,
749                    "State": r.state,
750                });
751                if let Some(ref desc) = r.description {
752                    obj["Description"] = json!(desc);
753                }
754                if let Some(ref ep) = r.event_pattern {
755                    obj["EventPattern"] = json!(ep);
756                }
757                if let Some(ref se) = r.schedule_expression {
758                    obj["ScheduleExpression"] = json!(se);
759                }
760                if let Some(ref role) = r.role_arn {
761                    obj["RoleArn"] = json!(role);
762                }
763                if let Some(ref mb) = r.managed_by {
764                    obj["ManagedBy"] = json!(mb);
765                }
766                obj
767            })
768            .collect();
769
770        let mut resp = json!({ "Rules": rules_json });
771        if let Some(token) = new_next_token {
772            resp["NextToken"] = json!(token);
773        }
774
775        Ok(AwsResponse::ok_json(resp))
776    }
777
778    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779        let body = req.json_body();
780        validate_required("Name", &body["Name"])?;
781        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
782        validate_string_length("name", name, 1, 64)?;
783        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
784        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
785
786        let accounts = self.state.read();
787        let empty = EventBridgeState::new(&req.account_id, &req.region);
788        let state = accounts.get(&req.account_id).unwrap_or(&empty);
789        let bus_name = state.resolve_bus_name(event_bus_name);
790        let key = (bus_name.clone(), name.to_string());
791
792        let rule = state.rules.get(&key).ok_or_else(|| {
793            AwsServiceError::aws_error(
794                StatusCode::BAD_REQUEST,
795                "ResourceNotFoundException",
796                format!("Rule {name} does not exist."),
797            )
798        })?;
799
800        let mut resp = json!({
801            "Name": rule.name,
802            "Arn": rule.arn,
803            "EventBusName": rule.event_bus_name,
804            "State": rule.state,
805        });
806
807        if let Some(ref desc) = rule.description {
808            resp["Description"] = json!(desc);
809        }
810        if let Some(ref ep) = rule.event_pattern {
811            resp["EventPattern"] = json!(ep);
812        }
813        if let Some(ref se) = rule.schedule_expression {
814            resp["ScheduleExpression"] = json!(se);
815        }
816        if let Some(ref role) = rule.role_arn {
817            resp["RoleArn"] = json!(role);
818        }
819        if let Some(ref mb) = rule.managed_by {
820            resp["ManagedBy"] = json!(mb);
821        }
822        if let Some(ref cb) = rule.created_by {
823            resp["CreatedBy"] = json!(cb);
824        }
825        // If non-default bus, set CreatedBy to account_id
826        if rule.event_bus_name != "default" && rule.created_by.is_none() {
827            resp["CreatedBy"] = json!(state.account_id);
828        }
829
830        Ok(AwsResponse::ok_json(resp))
831    }
832
833    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
834        let body = req.json_body();
835        validate_required("Name", &body["Name"])?;
836        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
837        validate_string_length("name", name, 1, 64)?;
838        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
839        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
840
841        let mut accounts = self.state.write();
842        let state = accounts.get_or_create(&req.account_id);
843        let bus_name = state.resolve_bus_name(event_bus_name);
844        let key = (bus_name, name.to_string());
845
846        let rule = state.rules.get_mut(&key).ok_or_else(|| {
847            AwsServiceError::aws_error(
848                StatusCode::BAD_REQUEST,
849                "ResourceNotFoundException",
850                format!("Rule {name} does not exist."),
851            )
852        })?;
853
854        rule.state = "ENABLED".to_string();
855        Ok(AwsResponse::ok_json(json!({})))
856    }
857
858    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
859        let body = req.json_body();
860        validate_required("Name", &body["Name"])?;
861        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
862        validate_string_length("name", name, 1, 64)?;
863        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
864        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
865
866        let mut accounts = self.state.write();
867        let state = accounts.get_or_create(&req.account_id);
868        let bus_name = state.resolve_bus_name(event_bus_name);
869        let key = (bus_name, name.to_string());
870
871        let rule = state.rules.get_mut(&key).ok_or_else(|| {
872            AwsServiceError::aws_error(
873                StatusCode::BAD_REQUEST,
874                "ResourceNotFoundException",
875                format!("Rule {name} does not exist."),
876            )
877        })?;
878
879        rule.state = "DISABLED".to_string();
880        Ok(AwsResponse::ok_json(json!({})))
881    }
882
883    // ─── Target Operations ──────────────────────────────────────────────
884
885    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
886        let body = req.json_body();
887        // Smithy PutTargetsRequest constraints (top-level shape):
888        //   Rule: RuleName @required length 1..=64
889        //   EventBusName: EventBusNameOrArn length 1..=1600
890        //   Targets: TargetList @required length 1..=100
891        // Per-target validation still flows through `FailedEntries` (matching
892        // AWS); only the top-level shape produces ValidationException.
893        validate_required("Rule", &body["Rule"])?;
894        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
895        validate_string_length("Rule", rule_name, 1, 64)?;
896        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
897        // Targets is @required with TargetList length 1..=100 in the Smithy
898        // model. Real AWS rejects empty / oversized lists with a
899        // ValidationException; per-target validation continues to flow
900        // through FailedEntries (matching AWS).
901        validate_required("Targets", &body["Targets"])?;
902        let targets_array = body["Targets"].as_array().ok_or_else(|| {
903            AwsServiceError::aws_error(
904                StatusCode::BAD_REQUEST,
905                "ValidationException",
906                "Targets must be a list",
907            )
908        })?;
909        if targets_array.is_empty() || targets_array.len() > 100 {
910            return Err(AwsServiceError::aws_error(
911                StatusCode::BAD_REQUEST,
912                "ValidationException",
913                "Value at 'Targets' failed to satisfy constraint: \
914                 Member must have length between 1 and 100",
915            ));
916        }
917        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
918        let targets: Vec<Value> = targets_array.clone();
919
920        let mut accounts = self.state.write();
921        let state = accounts.get_or_create(&req.account_id);
922        let bus_name = state.resolve_bus_name(event_bus_name);
923        let key = (bus_name.clone(), rule_name.to_string());
924
925        let rule = state.rules.get_mut(&key).ok_or_else(|| {
926            AwsServiceError::aws_error(
927                StatusCode::BAD_REQUEST,
928                "ResourceNotFoundException",
929                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
930            )
931        })?;
932
933        let mut failed_entries: Vec<Value> = Vec::new();
934        for target in &targets {
935            let target_id = target["Id"].as_str().unwrap_or("").to_string();
936            let target_arn = target["Arn"].as_str().unwrap_or("");
937
938            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
939                failed_entries.push(json!({
940                    "TargetId": target_id,
941                    "ErrorCode": "ValidationException",
942                    "ErrorMessage": format!(
943                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
944                    ),
945                }));
946                continue;
947            }
948            if !target_arn.starts_with("arn:") {
949                failed_entries.push(json!({
950                    "TargetId": target_id,
951                    "ErrorCode": "ValidationException",
952                    "ErrorMessage": format!(
953                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
954                    ),
955                }));
956                continue;
957            }
958
959            let et = parse_target(target);
960            rule.targets.retain(|t| t.id != et.id);
961            rule.targets.push(et);
962        }
963
964        Ok(AwsResponse::ok_json(json!({
965            "FailedEntryCount": failed_entries.len(),
966            "FailedEntries": failed_entries,
967        })))
968    }
969
970    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
971        let body = req.json_body();
972        validate_required("Rule", &body["Rule"])?;
973        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
974        validate_string_length("rule", rule_name, 1, 64)?;
975        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
976        validate_required("Ids", &body["Ids"])?;
977        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
978        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
979
980        let target_ids: Vec<String> = ids
981            .iter()
982            .filter_map(|v| v.as_str().map(|s| s.to_string()))
983            .collect();
984
985        let mut accounts = self.state.write();
986        let state = accounts.get_or_create(&req.account_id);
987        let bus_name = state.resolve_bus_name(event_bus_name);
988        let key = (bus_name.clone(), rule_name.to_string());
989
990        let rule = state.rules.get_mut(&key).ok_or_else(|| {
991            AwsServiceError::aws_error(
992                StatusCode::BAD_REQUEST,
993                "ResourceNotFoundException",
994                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
995            )
996        })?;
997
998        rule.targets.retain(|t| !target_ids.contains(&t.id));
999
1000        Ok(AwsResponse::ok_json(json!({
1001            "FailedEntryCount": 0,
1002            "FailedEntries": [],
1003        })))
1004    }
1005
1006    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1007        let body = req.json_body();
1008        validate_required("Rule", &body["Rule"])?;
1009        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1010        validate_string_length("rule", rule_name, 1, 64)?;
1011        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1012        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1013        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1014        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1015        let limit = body["Limit"].as_u64().map(|n| n as usize);
1016        let next_token = body["NextToken"].as_str();
1017
1018        let accounts = self.state.read();
1019        let empty = EventBridgeState::new(&req.account_id, &req.region);
1020        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1021        let bus_name = state.resolve_bus_name(event_bus_name);
1022        let key = (bus_name, rule_name.to_string());
1023
1024        let rule = state.rules.get(&key).ok_or_else(|| {
1025            AwsServiceError::aws_error(
1026                StatusCode::BAD_REQUEST,
1027                "ResourceNotFoundException",
1028                format!("Rule {rule_name} does not exist."),
1029            )
1030        })?;
1031
1032        let all_targets = &rule.targets;
1033        let start = next_token
1034            .and_then(|t| t.parse::<usize>().ok())
1035            .unwrap_or(0)
1036            .min(all_targets.len());
1037        let slice = &all_targets[start..];
1038
1039        let (page, new_next_token) = if let Some(lim) = limit {
1040            if slice.len() > lim {
1041                (&slice[..lim], Some((start + lim).to_string()))
1042            } else {
1043                (slice, None)
1044            }
1045        } else {
1046            (slice, None)
1047        };
1048
1049        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1050
1051        let mut resp = json!({ "Targets": targets });
1052        if let Some(token) = new_next_token {
1053            resp["NextToken"] = json!(token);
1054        }
1055
1056        Ok(AwsResponse::ok_json(resp))
1057    }
1058
1059    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1060        let body = req.json_body();
1061        validate_required("TargetArn", &body["TargetArn"])?;
1062        let target_arn = body["TargetArn"]
1063            .as_str()
1064            .ok_or_else(|| missing("TargetArn"))?;
1065        validate_string_length("targetArn", target_arn, 1, 1600)?;
1066        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1067        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1068        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1069        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1070        let limit = body["Limit"].as_u64().map(|n| n as usize);
1071        let next_token = body["NextToken"].as_str();
1072
1073        let accounts = self.state.read();
1074        let empty = EventBridgeState::new(&req.account_id, &req.region);
1075        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1076        let bus_name = state.resolve_bus_name(event_bus_name);
1077
1078        // Deduplicate rule names
1079        let mut rule_names: Vec<String> = Vec::new();
1080        for rule in state.rules.values() {
1081            if rule.event_bus_name == bus_name
1082                && rule.targets.iter().any(|t| t.arn == target_arn)
1083                && !rule_names.contains(&rule.name)
1084            {
1085                rule_names.push(rule.name.clone());
1086            }
1087        }
1088        rule_names.sort();
1089
1090        let start = next_token
1091            .and_then(|t| t.parse::<usize>().ok())
1092            .unwrap_or(0)
1093            .min(rule_names.len());
1094        let slice = &rule_names[start..];
1095
1096        let (page, new_next_token) = if let Some(lim) = limit {
1097            if slice.len() > lim {
1098                (&slice[..lim], Some((start + lim).to_string()))
1099            } else {
1100                (slice, None)
1101            }
1102        } else {
1103            (slice, None)
1104        };
1105
1106        let mut resp = json!({ "RuleNames": page });
1107        if let Some(token) = new_next_token {
1108            resp["NextToken"] = json!(token);
1109        }
1110
1111        Ok(AwsResponse::ok_json(resp))
1112    }
1113
    // ─── Partner Event Sources ────────────────────────────────────────
1115
1116    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1117        let body = req.json_body();
1118        validate_required("EventPattern", &body["EventPattern"])?;
1119        validate_required("Event", &body["Event"])?;
1120        let event_pattern = body["EventPattern"]
1121            .as_str()
1122            .ok_or_else(|| missing("EventPattern"))?;
1123        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1124
1125        // Parse the event JSON
1126        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1127            AwsServiceError::aws_error(
1128                StatusCode::BAD_REQUEST,
1129                "InvalidEventPatternException",
1130                "Event is not valid JSON.",
1131            )
1132        })?;
1133
1134        // Parse the pattern JSON
1135        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1136            AwsServiceError::aws_error(
1137                StatusCode::BAD_REQUEST,
1138                "InvalidEventPatternException",
1139                "Event pattern is not valid JSON.",
1140            )
1141        })?;
1142
1143        let source = event["source"].as_str().unwrap_or("");
1144        let detail_type = event["detail-type"].as_str().unwrap_or("");
1145        let detail = event
1146            .get("detail")
1147            .map(|v| serde_json::to_string(v).unwrap_or_default())
1148            .unwrap_or_else(|| "{}".to_string());
1149        let account = event["account"].as_str().unwrap_or("");
1150        let region = event["region"].as_str().unwrap_or("");
1151        let resources: Vec<String> = event["resources"]
1152            .as_array()
1153            .map(|arr| {
1154                arr.iter()
1155                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1156                    .collect()
1157            })
1158            .unwrap_or_default();
1159
1160        let result = matches_pattern(
1161            Some(event_pattern),
1162            source,
1163            detail_type,
1164            &detail,
1165            account,
1166            region,
1167            &resources,
1168        );
1169
1170        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1171    }
1172
1173    // ─── UpdateEventBus ─────────────────────────────────────────────────
1174
1175    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1176        let body = req.json_body();
1177        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1178        validate_optional_string_length(
1179            "kmsKeyIdentifier",
1180            body["KmsKeyIdentifier"].as_str(),
1181            0,
1182            2048,
1183        )?;
1184        let name = body["Name"].as_str().unwrap_or("default");
1185
1186        let mut accounts = self.state.write();
1187        let state = accounts.get_or_create(&req.account_id);
1188        let bus = state.buses.get_mut(name).ok_or_else(|| {
1189            AwsServiceError::aws_error(
1190                StatusCode::BAD_REQUEST,
1191                "ResourceNotFoundException",
1192                format!("Event bus {name} does not exist."),
1193            )
1194        })?;
1195
1196        if let Some(desc) = body["Description"].as_str() {
1197            bus.description = Some(desc.to_string());
1198        }
1199        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1200            bus.kms_key_identifier = Some(kms.to_string());
1201        }
1202        if let Some(dlc) = body.get("DeadLetterConfig") {
1203            bus.dead_letter_config = Some(dlc.clone());
1204        }
1205        bus.last_modified_time = Utc::now();
1206
1207        let arn = bus.arn.clone();
1208        let bus_name = bus.name.clone();
1209
1210        Ok(AwsResponse::ok_json(json!({
1211            "Arn": arn,
1212            "Name": bus_name,
1213        })))
1214    }
1215
1216    // ─── Endpoint Operations ────────────────────────────────────────────
1217
1218    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1219        let body = req.json_body();
1220        // Smithy PutEventsRequest constraints:
1221        //   EndpointId: length 1..=50
1222        //   Entries: @required, length 1..=10
1223        validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1224        validate_required("Entries", &body["Entries"])?;
1225        let entries_array = body["Entries"].as_array().ok_or_else(|| {
1226            AwsServiceError::aws_error(
1227                StatusCode::BAD_REQUEST,
1228                "ValidationException",
1229                "Entries must be a list",
1230            )
1231        })?;
1232        if entries_array.is_empty() || entries_array.len() > 10 {
1233            return Err(AwsServiceError::aws_error(
1234                StatusCode::BAD_REQUEST,
1235                "ValidationException",
1236                "Value at 'Entries' failed to satisfy constraint: \
1237                 Member must have length between 1 and 10",
1238            ));
1239        }
1240        let entries: Vec<Value> = entries_array.clone();
1241        let entries = &entries;
1242
1243        let mut accounts = self.state.write();
1244        let state = accounts.get_or_create(&req.account_id);
1245        let mut result_entries = Vec::new();
1246        let mut events_to_deliver = Vec::new();
1247        let mut failed_count = 0;
1248
1249        for entry in entries {
1250            let source = entry["Source"].as_str().unwrap_or("").to_string();
1251            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1252            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1253
1254            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1255                failed_count += 1;
1256                result_entries.push(error);
1257                continue;
1258            }
1259
1260            let event_id = uuid::Uuid::new_v4().to_string();
1261            let raw_bus = entry["EventBusName"]
1262                .as_str()
1263                .unwrap_or("default")
1264                .to_string();
1265            let event_bus_name = state.resolve_bus_name(&raw_bus);
1266
1267            // Bus resource-policy gate. AWS evaluates the bus's
1268            // resource policy against cross-account callers; same-account
1269            // callers always have access. The policy itself is JSON
1270            // stored as serde_json::Value so the IAM evaluator parses
1271            // it the same way it parses an S3 bucket policy.
1272            let caller_account = req
1273                .principal
1274                .as_ref()
1275                .map(|p| p.account_id.as_str())
1276                .unwrap_or(req.account_id.as_str());
1277            if caller_account != req.account_id {
1278                let bus_policy_value = state
1279                    .buses
1280                    .get(&event_bus_name)
1281                    .and_then(|b| b.policy.clone());
1282                if let Some(policy_value) = bus_policy_value {
1283                    let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1284                    let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1285                    let bus_arn = state
1286                        .buses
1287                        .get(&event_bus_name)
1288                        .map(|b| b.arn.clone())
1289                        .unwrap_or_default();
1290                    let principal =
1291                        req.principal
1292                            .clone()
1293                            .unwrap_or_else(|| fakecloud_core::auth::Principal {
1294                                arn: Arn::global("iam", caller_account, "root").to_string(),
1295                                user_id: caller_account.to_string(),
1296                                account_id: caller_account.to_string(),
1297                                principal_type: fakecloud_core::auth::PrincipalType::Root,
1298                                source_identity: None,
1299                                tags: None,
1300                            });
1301                    let context = fakecloud_iam::evaluator::RequestContext {
1302                        aws_principal_arn: Some(principal.arn.clone()),
1303                        aws_principal_account: Some(principal.account_id.clone()),
1304                        ..Default::default()
1305                    };
1306                    let eval_req = fakecloud_iam::evaluator::EvalRequest {
1307                        principal: &principal,
1308                        action: "events:PutEvents".to_string(),
1309                        resource: bus_arn,
1310                        context,
1311                    };
1312                    let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1313                        &policy_doc,
1314                        &eval_req,
1315                    );
1316                    if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1317                        failed_count += 1;
1318                        result_entries.push(json!({
1319                            "ErrorCode": "AccessDeniedException",
1320                            "ErrorMessage": format!(
1321                                "User '{}' is not authorized to put events on event bus '{}'",
1322                                principal.arn, event_bus_name
1323                            ),
1324                        }));
1325                        continue;
1326                    }
1327                }
1328            }
1329
1330            let time = parse_put_events_time(&entry["Time"]);
1331            let resources: Vec<String> = entry["Resources"]
1332                .as_array()
1333                .map(|arr| {
1334                    arr.iter()
1335                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1336                        .collect()
1337                })
1338                .unwrap_or_default();
1339
1340            let event = PutEvent {
1341                event_id: event_id.clone(),
1342                source: source.clone(),
1343                detail_type: detail_type.clone(),
1344                detail: detail.clone(),
1345                event_bus_name: event_bus_name.clone(),
1346                time,
1347                resources: resources.clone(),
1348            };
1349
1350            archive_matching_event(
1351                state,
1352                &event,
1353                &event_bus_name,
1354                &source,
1355                &detail_type,
1356                &detail,
1357                &req.account_id,
1358                &req.region,
1359                &resources,
1360            );
1361
1362            state.events.push(event);
1363
1364            // Find matching rules and their targets
1365            let matching_targets: Vec<EventTarget> = state
1366                .rules
1367                .values()
1368                .filter(|r| {
1369                    r.event_bus_name == event_bus_name
1370                        && r.state == "ENABLED"
1371                        && matches_pattern(
1372                            r.event_pattern.as_deref(),
1373                            &source,
1374                            &detail_type,
1375                            &detail,
1376                            &req.account_id,
1377                            &req.region,
1378                            &resources,
1379                        )
1380                })
1381                .flat_map(|r| r.targets.clone())
1382                .collect();
1383
1384            if !matching_targets.is_empty() {
1385                events_to_deliver.push((
1386                    event_id.clone(),
1387                    source,
1388                    detail_type,
1389                    detail,
1390                    time,
1391                    resources,
1392                    matching_targets,
1393                ));
1394            }
1395
1396            result_entries.push(json!({ "EventId": event_id }));
1397        }
1398
1399        // Drop the lock before delivering
1400        drop(accounts);
1401
1402        // Deliver to targets — single-target dispatch lives in the
1403        // shared helper so cross-service callers (delivery.rs) honor the
1404        // same target shape (SQS/SNS/Lambda/Logs/Kinesis/StepFunctions/
1405        // ApiDestination/HTTP) and the same InputTransformer rules.
1406        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1407            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1408            let event_json = json!({
1409                "version": "0",
1410                "id": event_id,
1411                "source": source,
1412                "account": req.account_id,
1413                "detail-type": detail_type,
1414                "detail": detail_value,
1415                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1416                "region": req.region,
1417                "resources": resources,
1418            });
1419
1420            let ctx = EventDispatchContext {
1421                state: &self.state,
1422                delivery: &self.delivery,
1423                lambda_state: self.lambda_state.as_ref(),
1424                logs_state: self.logs_state.as_ref(),
1425                container_runtime: &self.container_runtime,
1426                account_id: &req.account_id,
1427                region: &req.region,
1428            };
1429            for target in targets {
1430                dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1431            }
1432        }
1433
1434        let resp = json!({
1435            "FailedEntryCount": failed_count,
1436            "Entries": result_entries,
1437        });
1438
1439        Ok(AwsResponse::ok_json(resp))
1440    }
1441
1442    // ─── Tagging ────────────────────────────────────────────────────────
1443
1444    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1445        let body = req.json_body();
1446        validate_required("ResourceARN", &body["ResourceARN"])?;
1447        let arn = body["ResourceARN"]
1448            .as_str()
1449            .ok_or_else(|| missing("ResourceARN"))?;
1450        validate_string_length("resourceARN", arn, 1, 1600)?;
1451        validate_required("Tags", &body["Tags"])?;
1452
1453        let mut accounts = self.state.write();
1454        let state = accounts.get_or_create(&req.account_id);
1455        let tag_map = find_tags_mut(state, arn)?;
1456
1457        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1458            AwsServiceError::aws_error(
1459                StatusCode::BAD_REQUEST,
1460                "ValidationException",
1461                format!("{f} must be a list"),
1462            )
1463        })?;
1464
1465        Ok(AwsResponse::ok_json(json!({})))
1466    }
1467
1468    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1469        let body = req.json_body();
1470        validate_required("ResourceARN", &body["ResourceARN"])?;
1471        let arn = body["ResourceARN"]
1472            .as_str()
1473            .ok_or_else(|| missing("ResourceARN"))?;
1474        validate_string_length("resourceARN", arn, 1, 1600)?;
1475        validate_required("TagKeys", &body["TagKeys"])?;
1476
1477        let mut accounts = self.state.write();
1478        let state = accounts.get_or_create(&req.account_id);
1479        let tag_map = find_tags_mut(state, arn)?;
1480
1481        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1482            AwsServiceError::aws_error(
1483                StatusCode::BAD_REQUEST,
1484                "ValidationException",
1485                format!("{f} must be a list"),
1486            )
1487        })?;
1488
1489        Ok(AwsResponse::ok_json(json!({})))
1490    }
1491
1492    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1493        let body = req.json_body();
1494        validate_required("ResourceARN", &body["ResourceARN"])?;
1495        let arn = body["ResourceARN"]
1496            .as_str()
1497            .ok_or_else(|| missing("ResourceARN"))?;
1498        validate_string_length("resourceARN", arn, 1, 1600)?;
1499
1500        let accounts = self.state.read();
1501        let empty = EventBridgeState::new(&req.account_id, &req.region);
1502        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1503        let tag_map = find_tags(state, arn)?;
1504
1505        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1506
1507        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1508    }
1509
1510    // ─── Archive Operations ─────────────────────────────────────────────
1511}
1512
1513// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
1514
1515// ─── Event Pattern Validation ────────────────────────────────────────
1516
1517// ─── Connection Auth Params Response Builder ────────────────────────
1518
1519// ─── Event Pattern Matching ─────────────────────────────────────────
1520
/// Parsed + validated inputs for `StartReplay`.
///
/// Populated by `StartReplayInput::from_body` from the JSON request body.
struct StartReplayInput {
    /// Value of `ReplayName`; an empty string when the field is absent or not a string.
    name: String,
    /// Value of `Description`, or `None` when the field is absent or not a string.
    description: Option<String>,
    /// Value of `EventSourceArn`; an empty string when the field is absent or not a string.
    event_source_arn: String,
    /// Verbatim copy of the request's `Destination` JSON value.
    destination: Value,
    /// `Destination.Arn` as a string; construction only succeeds when it
    /// contains `:event-bus/`.
    destination_arn: String,
    /// `EventStartTime` read as a number of epoch seconds (fraction discarded);
    /// falls back to the current time when absent or unrepresentable.
    event_start_time: DateTime<Utc>,
    /// `EventEndTime` read as a number of epoch seconds (fraction discarded);
    /// falls back to the current time when absent or unrepresentable.
    event_end_time: DateTime<Utc>,
}
1531
1532impl StartReplayInput {
1533    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1534        // StartReplay's Smithy model declares ResourceNotFound,
1535        // ResourceAlreadyExists, InvalidEventPattern, LimitExceeded, and
1536        // Internal — but not ValidationException. Per-field constraints are
1537        // enforced client-side by the SDK; we surface only declared errors
1538        // here. Missing required inputs default to empty strings and the
1539        // downstream bus/archive lookups produce ResourceNotFound for the
1540        // ones that matter.
1541        let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1542        let description = body["Description"].as_str().map(|s| s.to_string());
1543        let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1544        let destination = body["Destination"].clone();
1545
1546        let event_start_time = body["EventStartTime"]
1547            .as_f64()
1548            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1549            .unwrap_or_else(Utc::now);
1550        let event_end_time = body["EventEndTime"]
1551            .as_f64()
1552            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1553            .unwrap_or_else(Utc::now);
1554
1555        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1556        if !destination_arn.contains(":event-bus/") {
1557            // Missing/invalid destination cannot be resolved to a bus —
1558            // surface as ResourceNotFound rather than the undeclared
1559            // ValidationException.
1560            return Err(AwsServiceError::aws_error(
1561                StatusCode::BAD_REQUEST,
1562                "ResourceNotFoundException",
1563                format!("Destination.Arn {destination_arn} does not point to an event bus."),
1564            ));
1565        }
1566
1567        Ok(Self {
1568            name,
1569            description,
1570            event_source_arn,
1571            destination,
1572            destination_arn,
1573            event_start_time,
1574            event_end_time,
1575        })
1576    }
1577}
1578
1579#[path = "service_archives_replays.rs"]
1580mod service_archives_replays;
1581#[path = "service_connections_apidests.rs"]
1582mod service_connections_apidests;
1583#[path = "service_endpoints.rs"]
1584mod service_endpoints;
1585#[path = "service_partner_sources.rs"]
1586mod service_partner_sources;
1587
1588#[path = "helpers.rs"]
1589mod helpers;
1590pub(crate) use helpers::*;
1591
1592#[cfg(test)]
1593#[path = "service_tests.rs"]
1594mod tests;