Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29    state: SharedEventBridgeState,
30    delivery: Arc<DeliveryBus>,
31    lambda_state: Option<SharedLambdaState>,
32    logs_state: Option<SharedLogsState>,
33    container_runtime: Option<Arc<ContainerRuntime>>,
34    snapshot_store: Option<Arc<dyn SnapshotStore>>,
35    snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40        Self {
41            state,
42            delivery,
43            lambda_state: None,
44            logs_state: None,
45            container_runtime: None,
46            snapshot_store: None,
47            snapshot_lock: Arc::new(AsyncMutex::new(())),
48        }
49    }
50
51    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52        self.lambda_state = Some(lambda_state);
53        self
54    }
55
56    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57        self.logs_state = Some(logs_state);
58        self
59    }
60
61    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62        self.container_runtime = Some(runtime);
63        self
64    }
65
66    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67        self.snapshot_store = Some(store);
68        self
69    }
70
71    /// Persist current state as a snapshot. Held across the
72    /// clone-serialize-write sequence to prevent stale-last writes,
73    /// with serde + file I/O offloaded to the blocking pool.
74    async fn save_snapshot(&self) {
75        save_eventbridge_snapshot(
76            &self.state,
77            self.snapshot_store.clone(),
78            &self.snapshot_lock,
79        )
80        .await;
81    }
82
83    /// Build a hook that persists the current EventBridge state when invoked, or
84    /// `None` in memory mode (no snapshot store). The CloudFormation provisioner
85    /// mutates `state` directly and uses this to write a CFN-provisioned
86    /// resource through to disk, the same way a direct mutating API call would.
87    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88        let store = self.snapshot_store.clone()?;
89        let state = self.state.clone();
90        let lock = self.snapshot_lock.clone();
91        Some(Arc::new(move || {
92            let state = state.clone();
93            let store = store.clone();
94            let lock = lock.clone();
95            Box::pin(async move {
96                save_eventbridge_snapshot(&state, Some(store), &lock).await;
97            })
98        }))
99    }
100}
101
102/// Persist the current EventBridge state as a snapshot. Offloads the serde +
103/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
104/// (memory mode). Shared by `EventBridgeService::save_snapshot` and the
105/// CloudFormation provisioner's post-provision persist hook so both route
106/// through the same serialize-and-write path.
107pub async fn save_eventbridge_snapshot(
108    state: &SharedEventBridgeState,
109    store: Option<Arc<dyn SnapshotStore>>,
110    lock: &AsyncMutex<()>,
111) {
112    let Some(store) = store else {
113        return;
114    };
115    let _guard = lock.lock().await;
116    let snapshot = EventBridgeSnapshot {
117        schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118        accounts: Some(state.read().clone()),
119        state: None,
120    };
121    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122        let bytes = serde_json::to_vec(&snapshot)
123            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124        store.save(&bytes)
125    })
126    .await;
127    match join {
128        Ok(Ok(())) => {}
129        Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130        Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131    }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136    fn service_name(&self) -> &str {
137        "events"
138    }
139
140    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141        let mutates = is_mutating_action(req.action.as_str());
142        let result = match req.action.as_str() {
143            "CreateEventBus" => self.create_event_bus(&req),
144            "DeleteEventBus" => self.delete_event_bus(&req),
145            "ListEventBuses" => self.list_event_buses(&req),
146            "DescribeEventBus" => self.describe_event_bus(&req),
147            "PutRule" => self.put_rule(&req),
148            "DeleteRule" => self.delete_rule(&req),
149            "ListRules" => self.list_rules(&req),
150            "DescribeRule" => self.describe_rule(&req),
151            "EnableRule" => self.enable_rule(&req),
152            "DisableRule" => self.disable_rule(&req),
153            "PutTargets" => self.put_targets(&req),
154            "RemoveTargets" => self.remove_targets(&req),
155            "ListTargetsByRule" => self.list_targets_by_rule(&req),
156            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157            "PutEvents" => self.put_events(&req),
158            "PutPermission" => self.put_permission(&req),
159            "RemovePermission" => self.remove_permission(&req),
160            "TagResource" => self.tag_resource(&req),
161            "UntagResource" => self.untag_resource(&req),
162            "ListTagsForResource" => self.list_tags_for_resource(&req),
163            "CreateArchive" => self.create_archive(&req),
164            "DescribeArchive" => self.describe_archive(&req),
165            "ListArchives" => self.list_archives(&req),
166            "UpdateArchive" => self.update_archive(&req),
167            "DeleteArchive" => self.delete_archive(&req),
168            "CreateConnection" => self.create_connection(&req),
169            "DescribeConnection" => self.describe_connection(&req),
170            "ListConnections" => self.list_connections(&req),
171            "UpdateConnection" => self.update_connection(&req),
172            "DeleteConnection" => self.delete_connection(&req),
173            "CreateApiDestination" => self.create_api_destination(&req),
174            "DescribeApiDestination" => self.describe_api_destination(&req),
175            "ListApiDestinations" => self.list_api_destinations(&req),
176            "UpdateApiDestination" => self.update_api_destination(&req),
177            "DeleteApiDestination" => self.delete_api_destination(&req),
178            "StartReplay" => self.start_replay(&req),
179            "DescribeReplay" => self.describe_replay(&req),
180            "ListReplays" => self.list_replays(&req),
181            "CancelReplay" => self.cancel_replay(&req),
182            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187            "ActivateEventSource" => self.activate_event_source(&req),
188            "DeactivateEventSource" => self.deactivate_event_source(&req),
189            "DescribeEventSource" => self.describe_event_source(&req),
190            "ListEventSources" => self.list_event_sources(&req),
191            "PutPartnerEvents" => self.put_partner_events(&req),
192            "TestEventPattern" => self.test_event_pattern(&req),
193            "UpdateEventBus" => self.update_event_bus(&req),
194            "CreateEndpoint" => self.create_endpoint(&req),
195            "DeleteEndpoint" => self.delete_endpoint(&req),
196            "DescribeEndpoint" => self.describe_endpoint(&req),
197            "ListEndpoints" => self.list_endpoints(&req),
198            "UpdateEndpoint" => self.update_endpoint(&req),
199            "DeauthorizeConnection" => self.deauthorize_connection(&req),
200            _ => Err(AwsServiceError::action_not_implemented(
201                "events",
202                &req.action,
203            )),
204        };
205        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206            self.save_snapshot().await;
207        }
208        result
209    }
210
211    fn supported_actions(&self) -> &[&str] {
212        &[
213            "CreateEventBus",
214            "DeleteEventBus",
215            "ListEventBuses",
216            "DescribeEventBus",
217            "PutRule",
218            "DeleteRule",
219            "ListRules",
220            "DescribeRule",
221            "EnableRule",
222            "DisableRule",
223            "PutTargets",
224            "RemoveTargets",
225            "ListTargetsByRule",
226            "ListRuleNamesByTarget",
227            "PutEvents",
228            "PutPermission",
229            "RemovePermission",
230            "TagResource",
231            "UntagResource",
232            "ListTagsForResource",
233            "CreateArchive",
234            "DescribeArchive",
235            "ListArchives",
236            "UpdateArchive",
237            "DeleteArchive",
238            "CreateConnection",
239            "DescribeConnection",
240            "ListConnections",
241            "UpdateConnection",
242            "DeleteConnection",
243            "CreateApiDestination",
244            "DescribeApiDestination",
245            "ListApiDestinations",
246            "UpdateApiDestination",
247            "DeleteApiDestination",
248            "StartReplay",
249            "DescribeReplay",
250            "ListReplays",
251            "CancelReplay",
252            "CreatePartnerEventSource",
253            "DeletePartnerEventSource",
254            "DescribePartnerEventSource",
255            "ListPartnerEventSources",
256            "ListPartnerEventSourceAccounts",
257            "ActivateEventSource",
258            "DeactivateEventSource",
259            "DescribeEventSource",
260            "ListEventSources",
261            "PutPartnerEvents",
262            "TestEventPattern",
263            "UpdateEventBus",
264            "CreateEndpoint",
265            "DeleteEndpoint",
266            "DescribeEndpoint",
267            "ListEndpoints",
268            "UpdateEndpoint",
269            "DeauthorizeConnection",
270        ]
271    }
272}
273
274// ─── Event Bus Operations ───────────────────────────────────────────
275impl EventBridgeService {
276    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277        let body = req.json_body();
278        validate_required("Name", &body["Name"])?;
279        let name = body["Name"]
280            .as_str()
281            .ok_or_else(|| missing("Name"))?
282            .to_string();
283        validate_string_length("name", &name, 1, 256)?;
284        validate_optional_string_length(
285            "eventSourceName",
286            body["EventSourceName"].as_str(),
287            1,
288            256,
289        )?;
290        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291        validate_optional_string_length(
292            "kmsKeyIdentifier",
293            body["KmsKeyIdentifier"].as_str(),
294            0,
295            2048,
296        )?;
297
298        // Validate name doesn't contain '/' (unless partner bus)
299        if name.contains('/') && !name.starts_with("aws.partner/") {
300            return Err(AwsServiceError::aws_error(
301                StatusCode::BAD_REQUEST,
302                "ValidationException",
303                "Event bus name must not contain '/'.",
304            ));
305        }
306
307        // Partner event bus validation
308        if name.starts_with("aws.partner/") {
309            let event_source = body["EventSourceName"].as_str().unwrap_or("");
310            let accounts_r = self.state.read();
311            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313            let has_source = state_r.partner_event_sources.contains_key(event_source);
314            drop(accounts_r);
315            if !has_source {
316                return Err(AwsServiceError::aws_error(
317                    StatusCode::BAD_REQUEST,
318                    "ResourceNotFoundException",
319                    format!("Event source {event_source} does not exist."),
320                ));
321            }
322        }
323
324        let mut accounts = self.state.write();
325        let state = accounts.get_or_create(&req.account_id);
326
327        if state.buses.contains_key(&name) {
328            return Err(AwsServiceError::aws_error(
329                StatusCode::BAD_REQUEST,
330                "ResourceAlreadyExistsException",
331                format!("Event bus {name} already exists."),
332            ));
333        }
334
335        let arn = format!(
336            "arn:aws:events:{}:{}:event-bus/{}",
337            req.region, state.account_id, name
338        );
339        let now = Utc::now();
340        let description = body["Description"].as_str().map(|s| s.to_string());
341        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342        let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344        let tags = parse_tags(&body);
345
346        let bus = EventBus {
347            name: name.clone(),
348            arn: arn.clone(),
349            tags,
350            policy: None,
351            description,
352            kms_key_identifier,
353            dead_letter_config,
354            creation_time: now,
355            last_modified_time: now,
356        };
357        state.buses.insert(name, bus);
358
359        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360    }
361
362    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363        let body = req.json_body();
364        validate_required("Name", &body["Name"])?;
365        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366        validate_string_length("name", name, 1, 256)?;
367
368        if name == "default" {
369            return Err(AwsServiceError::aws_error(
370                StatusCode::BAD_REQUEST,
371                "ValidationException",
372                format!("Cannot delete event bus {name}."),
373            ));
374        }
375
376        let mut accounts = self.state.write();
377        let state = accounts.get_or_create(&req.account_id);
378        state.buses.remove(name);
379        state.rules.retain(|k, _| k.0 != name);
380
381        Ok(AwsResponse::ok_json(json!({})))
382    }
383
384    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let body = req.json_body();
386        // Smithy ListEventBusesRequest constraints:
387        //   NamePrefix: EventBusName length 1..=256
388        //   NextToken: length 1..=2048
389        //   Limit: LimitMax100 range 1..=100
390        // Unrecognised pagination tokens still fall back to the start of the
391        // list — `InvalidNextTokenException` only fires when the token shape
392        // itself is wrong, not when it points at a vanished cursor.
393        validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394        validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395        validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396        let name_prefix = body["NamePrefix"].as_str();
397        let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399        let accounts = self.state.read();
400        let empty = EventBridgeState::new(&req.account_id, &req.region);
401        let state = accounts.get(&req.account_id).unwrap_or(&empty);
402        let filtered: Vec<&_> = state
403            .buses
404            .values()
405            .filter(|b| match name_prefix {
406                Some(prefix) => b.name.starts_with(prefix),
407                None => true,
408            })
409            .collect();
410
411        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412        let buses: Vec<Value> = page
413            .iter()
414            .map(|b| {
415                // Mirror DescribeEventBus: ListEventBuses also reports the
416                // bus creation/last-modified times, which the Terraform
417                // aws_cloudwatch_event_buses data source expects to be set.
418                json!({
419                    "Name": b.name,
420                    "Arn": b.arn,
421                    "CreationTime": b.creation_time.timestamp() as f64,
422                    "LastModifiedTime": b.last_modified_time.timestamp() as f64,
423                })
424            })
425            .collect();
426        let mut resp = json!({ "EventBuses": buses });
427        if let Some(token) = next_token {
428            resp["NextToken"] = json!(token);
429        }
430
431        Ok(AwsResponse::ok_json(resp))
432    }
433
434    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
435        let body = req.json_body();
436        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
437        let name = body["Name"].as_str().unwrap_or("default");
438
439        let accounts = self.state.read();
440        let empty = EventBridgeState::new(&req.account_id, &req.region);
441        let state = accounts.get(&req.account_id).unwrap_or(&empty);
442        let bus = state.buses.get(name).ok_or_else(|| {
443            AwsServiceError::aws_error(
444                StatusCode::BAD_REQUEST,
445                "ResourceNotFoundException",
446                format!("Event bus {name} does not exist."),
447            )
448        })?;
449
450        let mut resp = json!({
451            "Name": bus.name,
452            "Arn": bus.arn,
453            "CreationTime": bus.creation_time.timestamp() as f64,
454            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
455        });
456
457        if let Some(ref policy) = bus.policy {
458            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
459        }
460        if let Some(ref desc) = bus.description {
461            resp["Description"] = json!(desc);
462        }
463        if let Some(ref kms) = bus.kms_key_identifier {
464            resp["KmsKeyIdentifier"] = json!(kms);
465        }
466        if let Some(ref dlc) = bus.dead_letter_config {
467            resp["DeadLetterConfig"] = dlc.clone();
468        }
469
470        Ok(AwsResponse::ok_json(resp))
471    }
472
473    // ─── Permission Operations ──────────────────────────────────────────
474
475    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
476        let body = req.json_body();
477        // Smithy PutPermissionRequest constraints (optional members but each
478        // carries a `@length` trait that must be honoured when present):
479        //   EventBusName: NonPartnerEventBusName length 1..=256
480        //   Action: length 1..=64
481        //   Principal: length 1..=12 (12-digit AWS account or `*`)
482        //   StatementId: length 1..=64
483        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
484        validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
485        validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
486        validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
487        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
488
489        let mut accounts = self.state.write();
490        let state = accounts.get_or_create(&req.account_id);
491
492        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
493            AwsServiceError::aws_error(
494                StatusCode::BAD_REQUEST,
495                "ResourceNotFoundException",
496                format!("Event bus {event_bus_name} does not exist."),
497            )
498        })?;
499
500        // Check if Policy is provided (new-style)
501        if let Some(policy_str) = body["Policy"].as_str() {
502            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
503                bus.policy = Some(policy);
504                return Ok(AwsResponse::ok_json(json!({})));
505            }
506        }
507
508        // Old-style: Action, Principal, StatementId. All are @optional in the
509        // Smithy model; non-string values were already rejected above with
510        // SerializationException, so reaching here means each is either a
511        // valid string or absent. Fall back to "" to preserve current behavior
512        // — recording an empty-statement policy entry is harmless since it can
513        // never match a real action/principal pair.
514        let action = body["Action"].as_str().unwrap_or("");
515        let principal = body["Principal"].as_str().unwrap_or("");
516        let statement_id = body["StatementId"].as_str().unwrap_or("");
517
518        // Note: real AWS does enforce a small allow-list on `Action`, but
519        // PutPermission's Smithy model only declares ResourceNotFoundException,
520        // PolicyLengthExceededException, ConcurrentModificationException,
521        // OperationDisabledException, and InternalException. We accept any
522        // action string and just record the statement.
523        // A `*` principal means "any account" and is stored verbatim, not as
524        // an account-root ARN. The Terraform aws_cloudwatch_event_permission
525        // resource reads the principal back and asserts it is exactly "*".
526        let principal_value = if principal == "*" {
527            json!("*")
528        } else {
529            json!({ "AWS": Arn::global("iam", principal, "root").to_string() })
530        };
531        let statement = json!({
532            "Sid": statement_id,
533            "Effect": "Allow",
534            "Principal": principal_value,
535            "Action": action,
536            "Resource": bus.arn,
537        });
538
539        let policy = bus.policy.get_or_insert_with(|| {
540            json!({
541                "Version": "2012-10-17",
542                "Statement": [],
543            })
544        });
545
546        if let Some(stmts) = policy["Statement"].as_array_mut() {
547            // PutPermission with an existing StatementId replaces that
548            // statement (e.g. changing its principal), it does not stack a
549            // second one. Drop any prior statement with the same Sid first.
550            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
551            stmts.push(statement);
552        }
553
554        Ok(AwsResponse::ok_json(json!({})))
555    }
556
557    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558        let body = req.json_body();
559        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
560        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
561        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
562        let statement_id = body["StatementId"].as_str().unwrap_or("");
563        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
564
565        let mut accounts = self.state.write();
566        let state = accounts.get_or_create(&req.account_id);
567
568        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
569            AwsServiceError::aws_error(
570                StatusCode::BAD_REQUEST,
571                "ResourceNotFoundException",
572                format!("Event bus {event_bus_name} does not exist."),
573            )
574        })?;
575
576        if remove_all {
577            bus.policy = None;
578            return Ok(AwsResponse::ok_json(json!({})));
579        }
580
581        let policy = bus.policy.as_mut().ok_or_else(|| {
582            AwsServiceError::aws_error(
583                StatusCode::BAD_REQUEST,
584                "ResourceNotFoundException",
585                "EventBus does not have a policy.",
586            )
587        })?;
588
589        if let Some(stmts) = policy["Statement"].as_array_mut() {
590            let before = stmts.len();
591            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
592            if stmts.len() == before {
593                return Err(AwsServiceError::aws_error(
594                    StatusCode::BAD_REQUEST,
595                    "ResourceNotFoundException",
596                    "Statement with the provided id does not exist.",
597                ));
598            }
599            if stmts.is_empty() {
600                bus.policy = None;
601            }
602        }
603
604        Ok(AwsResponse::ok_json(json!({})))
605    }
606
607    // ─── Rule Operations ────────────────────────────────────────────────
608
609    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
610        let body = req.json_body();
611        // Smithy PutRuleRequest constraints:
612        //   Name: RuleName length 1..=64, @required
613        //   ScheduleExpression: length 0..=256
614        //   EventPattern: length 0..=4096 (raw JSON, separate
615        //     InvalidEventPatternException still applies to syntax)
616        //   State: RuleState enum {ENABLED, DISABLED,
617        //          ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS}
618        //   Description: RuleDescription length 0..=512
619        //   RoleArn: length 1..=1600
620        //   EventBusName: EventBusNameOrArn length 1..=1600
621        validate_required("Name", &body["Name"])?;
622        let name = body["Name"]
623            .as_str()
624            .ok_or_else(|| missing("Name"))?
625            .to_string();
626        validate_string_length("Name", &name, 1, 64)?;
627        validate_optional_string_length_value(
628            "ScheduleExpression",
629            &body["ScheduleExpression"],
630            0,
631            256,
632        )?;
633        validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
634        validate_optional_enum_value(
635            "State",
636            &body["State"],
637            &[
638                "ENABLED",
639                "DISABLED",
640                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
641            ],
642        )?;
643        validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
644        validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
645        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
646
647        let raw_bus = body["EventBusName"]
648            .as_str()
649            .unwrap_or("default")
650            .to_string();
651
652        let mut accounts = self.state.write();
653        let state = accounts.get_or_create(&req.account_id);
654        let event_bus_name = state.resolve_bus_name(&raw_bus);
655
656        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
657            if s.is_empty() {
658                None
659            } else {
660                Some(s.to_string())
661            }
662        });
663        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
664            if s.is_empty() {
665                None
666            } else {
667                Some(s.to_string())
668            }
669        });
670        let description = body["Description"].as_str().map(|s| s.to_string());
671        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
672        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
673
674        // Note: real AWS rejects ScheduleExpression on a non-default bus, but
675        // PutRule's Smithy model only declares InvalidEventPatternException
676        // for input-shape problems, not ValidationException. We accept the
677        // value and let the scheduler ignore it on non-default buses.
678
679        if !state.buses.contains_key(&event_bus_name) {
680            return Err(AwsServiceError::aws_error(
681                StatusCode::BAD_REQUEST,
682                "ResourceNotFoundException",
683                format!("Event bus {event_bus_name} does not exist."),
684            ));
685        }
686
687        let arn = if event_bus_name == "default" {
688            format!(
689                "arn:aws:events:{}:{}:rule/{}",
690                req.region, state.account_id, name
691            )
692        } else {
693            format!(
694                "arn:aws:events:{}:{}:rule/{}/{}",
695                req.region, state.account_id, event_bus_name, name
696            )
697        };
698
699        let key = (event_bus_name.clone(), name.clone());
700        let targets = state
701            .rules
702            .get(&key)
703            .map(|r| r.targets.clone())
704            .unwrap_or_default();
705
706        let tags = parse_tags(&body);
707
708        let rule = EventRule {
709            name: name.clone(),
710            arn: arn.clone(),
711            event_bus_name,
712            event_pattern,
713            schedule_expression,
714            state: rule_state,
715            description,
716            role_arn,
717            managed_by: None,
718            created_by: None,
719            targets,
720            tags,
721            last_fired: None,
722        };
723
724        state.rules.insert(key, rule);
725        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
726    }
727
728    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
729        let body = req.json_body();
730        validate_required("Name", &body["Name"])?;
731        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
732        validate_string_length("name", name, 1, 64)?;
733        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
734        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
735
736        let mut accounts = self.state.write();
737        let state = accounts.get_or_create(&req.account_id);
738        let bus_name = state.resolve_bus_name(event_bus_name);
739        let key = (bus_name, name.to_string());
740
741        // Check if rule has targets
742        if let Some(rule) = state.rules.get(&key) {
743            if !rule.targets.is_empty() {
744                return Err(AwsServiceError::aws_error(
745                    StatusCode::BAD_REQUEST,
746                    "ValidationException",
747                    "Rule can't be deleted since it has targets.",
748                ));
749            }
750        }
751
752        state.rules.remove(&key);
753        Ok(AwsResponse::ok_json(json!({})))
754    }
755
756    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757        let body = req.json_body();
758        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
759        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
760        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
761        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
762        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
763        let name_prefix = body["NamePrefix"].as_str();
764        let limit = body["Limit"].as_u64().map(|n| n as usize);
765        let next_token = body["NextToken"].as_str();
766
767        let accounts = self.state.read();
768        let empty = EventBridgeState::new(&req.account_id, &req.region);
769        let state = accounts.get(&req.account_id).unwrap_or(&empty);
770        let bus_name = state.resolve_bus_name(event_bus_name);
771
772        let mut rules: Vec<&EventRule> = state
773            .rules
774            .values()
775            .filter(|r| r.event_bus_name == bus_name)
776            .filter(|r| match name_prefix {
777                Some(prefix) => r.name.starts_with(prefix),
778                None => true,
779            })
780            .collect();
781        rules.sort_by(|a, b| a.name.cmp(&b.name));
782
783        // Pagination
784        let start = next_token
785            .and_then(|t| t.parse::<usize>().ok())
786            .unwrap_or(0)
787            .min(rules.len());
788        let rules_slice = &rules[start..];
789
790        let (page, new_next_token) = if let Some(lim) = limit {
791            if rules_slice.len() > lim {
792                (&rules_slice[..lim], Some((start + lim).to_string()))
793            } else {
794                (rules_slice, None)
795            }
796        } else {
797            (rules_slice, None)
798        };
799
800        let rules_json: Vec<Value> = page
801            .iter()
802            .map(|r| {
803                let mut obj = json!({
804                    "Name": r.name,
805                    "Arn": r.arn,
806                    "EventBusName": r.event_bus_name,
807                    "State": r.state,
808                });
809                if let Some(ref desc) = r.description {
810                    obj["Description"] = json!(desc);
811                }
812                if let Some(ref ep) = r.event_pattern {
813                    obj["EventPattern"] = json!(ep);
814                }
815                if let Some(ref se) = r.schedule_expression {
816                    obj["ScheduleExpression"] = json!(se);
817                }
818                if let Some(ref role) = r.role_arn {
819                    obj["RoleArn"] = json!(role);
820                }
821                if let Some(ref mb) = r.managed_by {
822                    obj["ManagedBy"] = json!(mb);
823                }
824                obj
825            })
826            .collect();
827
828        let mut resp = json!({ "Rules": rules_json });
829        if let Some(token) = new_next_token {
830            resp["NextToken"] = json!(token);
831        }
832
833        Ok(AwsResponse::ok_json(resp))
834    }
835
836    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
837        let body = req.json_body();
838        validate_required("Name", &body["Name"])?;
839        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
840        validate_string_length("name", name, 1, 64)?;
841        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
842        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
843
844        let accounts = self.state.read();
845        let empty = EventBridgeState::new(&req.account_id, &req.region);
846        let state = accounts.get(&req.account_id).unwrap_or(&empty);
847        let bus_name = state.resolve_bus_name(event_bus_name);
848        let key = (bus_name.clone(), name.to_string());
849
850        let rule = state.rules.get(&key).ok_or_else(|| {
851            AwsServiceError::aws_error(
852                StatusCode::BAD_REQUEST,
853                "ResourceNotFoundException",
854                format!("Rule {name} does not exist."),
855            )
856        })?;
857
858        let mut resp = json!({
859            "Name": rule.name,
860            "Arn": rule.arn,
861            "EventBusName": rule.event_bus_name,
862            "State": rule.state,
863        });
864
865        if let Some(ref desc) = rule.description {
866            resp["Description"] = json!(desc);
867        }
868        if let Some(ref ep) = rule.event_pattern {
869            resp["EventPattern"] = json!(ep);
870        }
871        if let Some(ref se) = rule.schedule_expression {
872            resp["ScheduleExpression"] = json!(se);
873        }
874        if let Some(ref role) = rule.role_arn {
875            resp["RoleArn"] = json!(role);
876        }
877        if let Some(ref mb) = rule.managed_by {
878            resp["ManagedBy"] = json!(mb);
879        }
880        if let Some(ref cb) = rule.created_by {
881            resp["CreatedBy"] = json!(cb);
882        }
883        // If non-default bus, set CreatedBy to account_id
884        if rule.event_bus_name != "default" && rule.created_by.is_none() {
885            resp["CreatedBy"] = json!(state.account_id);
886        }
887
888        Ok(AwsResponse::ok_json(resp))
889    }
890
891    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892        let body = req.json_body();
893        validate_required("Name", &body["Name"])?;
894        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
895        validate_string_length("name", name, 1, 64)?;
896        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
897        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
898
899        let mut accounts = self.state.write();
900        let state = accounts.get_or_create(&req.account_id);
901        let bus_name = state.resolve_bus_name(event_bus_name);
902        let key = (bus_name, name.to_string());
903
904        let rule = state.rules.get_mut(&key).ok_or_else(|| {
905            AwsServiceError::aws_error(
906                StatusCode::BAD_REQUEST,
907                "ResourceNotFoundException",
908                format!("Rule {name} does not exist."),
909            )
910        })?;
911
912        rule.state = "ENABLED".to_string();
913        Ok(AwsResponse::ok_json(json!({})))
914    }
915
916    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
917        let body = req.json_body();
918        validate_required("Name", &body["Name"])?;
919        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
920        validate_string_length("name", name, 1, 64)?;
921        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
922        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
923
924        let mut accounts = self.state.write();
925        let state = accounts.get_or_create(&req.account_id);
926        let bus_name = state.resolve_bus_name(event_bus_name);
927        let key = (bus_name, name.to_string());
928
929        let rule = state.rules.get_mut(&key).ok_or_else(|| {
930            AwsServiceError::aws_error(
931                StatusCode::BAD_REQUEST,
932                "ResourceNotFoundException",
933                format!("Rule {name} does not exist."),
934            )
935        })?;
936
937        rule.state = "DISABLED".to_string();
938        Ok(AwsResponse::ok_json(json!({})))
939    }
940
941    // ─── Target Operations ──────────────────────────────────────────────
942
943    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
944        let body = req.json_body();
945        // Smithy PutTargetsRequest constraints (top-level shape):
946        //   Rule: RuleName @required length 1..=64
947        //   EventBusName: EventBusNameOrArn length 1..=1600
948        //   Targets: TargetList @required length 1..=100
949        // Per-target validation still flows through `FailedEntries` (matching
950        // AWS); only the top-level shape produces ValidationException.
951        validate_required("Rule", &body["Rule"])?;
952        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
953        validate_string_length("Rule", rule_name, 1, 64)?;
954        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
955        // Targets is @required with TargetList length 1..=100 in the Smithy
956        // model. Real AWS rejects empty / oversized lists with a
957        // ValidationException; per-target validation continues to flow
958        // through FailedEntries (matching AWS).
959        validate_required("Targets", &body["Targets"])?;
960        let targets_array = body["Targets"].as_array().ok_or_else(|| {
961            AwsServiceError::aws_error(
962                StatusCode::BAD_REQUEST,
963                "ValidationException",
964                "Targets must be a list",
965            )
966        })?;
967        if targets_array.is_empty() || targets_array.len() > 100 {
968            return Err(AwsServiceError::aws_error(
969                StatusCode::BAD_REQUEST,
970                "ValidationException",
971                "Value at 'Targets' failed to satisfy constraint: \
972                 Member must have length between 1 and 100",
973            ));
974        }
975        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
976        let targets: Vec<Value> = targets_array.clone();
977
978        let mut accounts = self.state.write();
979        let state = accounts.get_or_create(&req.account_id);
980        let bus_name = state.resolve_bus_name(event_bus_name);
981        let key = (bus_name.clone(), rule_name.to_string());
982
983        let rule = state.rules.get_mut(&key).ok_or_else(|| {
984            AwsServiceError::aws_error(
985                StatusCode::BAD_REQUEST,
986                "ResourceNotFoundException",
987                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
988            )
989        })?;
990
991        let mut failed_entries: Vec<Value> = Vec::new();
992        for target in &targets {
993            let target_id = target["Id"].as_str().unwrap_or("").to_string();
994            let target_arn = target["Arn"].as_str().unwrap_or("");
995
996            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
997                failed_entries.push(json!({
998                    "TargetId": target_id,
999                    "ErrorCode": "ValidationException",
1000                    "ErrorMessage": format!(
1001                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
1002                    ),
1003                }));
1004                continue;
1005            }
1006            if !target_arn.starts_with("arn:") {
1007                failed_entries.push(json!({
1008                    "TargetId": target_id,
1009                    "ErrorCode": "ValidationException",
1010                    "ErrorMessage": format!(
1011                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1012                    ),
1013                }));
1014                continue;
1015            }
1016
1017            let et = parse_target(target);
1018            rule.targets.retain(|t| t.id != et.id);
1019            rule.targets.push(et);
1020        }
1021
1022        Ok(AwsResponse::ok_json(json!({
1023            "FailedEntryCount": failed_entries.len(),
1024            "FailedEntries": failed_entries,
1025        })))
1026    }
1027
1028    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1029        let body = req.json_body();
1030        validate_required("Rule", &body["Rule"])?;
1031        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1032        validate_string_length("rule", rule_name, 1, 64)?;
1033        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1034        validate_required("Ids", &body["Ids"])?;
1035        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1036        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1037
1038        let target_ids: Vec<String> = ids
1039            .iter()
1040            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1041            .collect();
1042
1043        let mut accounts = self.state.write();
1044        let state = accounts.get_or_create(&req.account_id);
1045        let bus_name = state.resolve_bus_name(event_bus_name);
1046        let key = (bus_name.clone(), rule_name.to_string());
1047
1048        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049            AwsServiceError::aws_error(
1050                StatusCode::BAD_REQUEST,
1051                "ResourceNotFoundException",
1052                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053            )
1054        })?;
1055
1056        rule.targets.retain(|t| !target_ids.contains(&t.id));
1057
1058        Ok(AwsResponse::ok_json(json!({
1059            "FailedEntryCount": 0,
1060            "FailedEntries": [],
1061        })))
1062    }
1063
1064    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1065        let body = req.json_body();
1066        validate_required("Rule", &body["Rule"])?;
1067        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1068        validate_string_length("rule", rule_name, 1, 64)?;
1069        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1070        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1071        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1072        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1073        let limit = body["Limit"].as_u64().map(|n| n as usize);
1074        let next_token = body["NextToken"].as_str();
1075
1076        let accounts = self.state.read();
1077        let empty = EventBridgeState::new(&req.account_id, &req.region);
1078        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1079        let bus_name = state.resolve_bus_name(event_bus_name);
1080        let key = (bus_name, rule_name.to_string());
1081
1082        let rule = state.rules.get(&key).ok_or_else(|| {
1083            AwsServiceError::aws_error(
1084                StatusCode::BAD_REQUEST,
1085                "ResourceNotFoundException",
1086                format!("Rule {rule_name} does not exist."),
1087            )
1088        })?;
1089
1090        let all_targets = &rule.targets;
1091        let start = next_token
1092            .and_then(|t| t.parse::<usize>().ok())
1093            .unwrap_or(0)
1094            .min(all_targets.len());
1095        let slice = &all_targets[start..];
1096
1097        let (page, new_next_token) = if let Some(lim) = limit {
1098            if slice.len() > lim {
1099                (&slice[..lim], Some((start + lim).to_string()))
1100            } else {
1101                (slice, None)
1102            }
1103        } else {
1104            (slice, None)
1105        };
1106
1107        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1108
1109        let mut resp = json!({ "Targets": targets });
1110        if let Some(token) = new_next_token {
1111            resp["NextToken"] = json!(token);
1112        }
1113
1114        Ok(AwsResponse::ok_json(resp))
1115    }
1116
1117    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1118        let body = req.json_body();
1119        validate_required("TargetArn", &body["TargetArn"])?;
1120        let target_arn = body["TargetArn"]
1121            .as_str()
1122            .ok_or_else(|| missing("TargetArn"))?;
1123        validate_string_length("targetArn", target_arn, 1, 1600)?;
1124        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1125        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1126        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1127        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1128        let limit = body["Limit"].as_u64().map(|n| n as usize);
1129        let next_token = body["NextToken"].as_str();
1130
1131        let accounts = self.state.read();
1132        let empty = EventBridgeState::new(&req.account_id, &req.region);
1133        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1134        let bus_name = state.resolve_bus_name(event_bus_name);
1135
1136        // Deduplicate rule names
1137        let mut rule_names: Vec<String> = Vec::new();
1138        for rule in state.rules.values() {
1139            if rule.event_bus_name == bus_name
1140                && rule.targets.iter().any(|t| t.arn == target_arn)
1141                && !rule_names.contains(&rule.name)
1142            {
1143                rule_names.push(rule.name.clone());
1144            }
1145        }
1146        rule_names.sort();
1147
1148        let start = next_token
1149            .and_then(|t| t.parse::<usize>().ok())
1150            .unwrap_or(0)
1151            .min(rule_names.len());
1152        let slice = &rule_names[start..];
1153
1154        let (page, new_next_token) = if let Some(lim) = limit {
1155            if slice.len() > lim {
1156                (&slice[..lim], Some((start + lim).to_string()))
1157            } else {
1158                (slice, None)
1159            }
1160        } else {
1161            (slice, None)
1162        };
1163
1164        let mut resp = json!({ "RuleNames": page });
1165        if let Some(token) = new_next_token {
1166            resp["NextToken"] = json!(token);
1167        }
1168
1169        Ok(AwsResponse::ok_json(resp))
1170    }
1171
1172    // ─── Partner Event Sources ────────────���───────────────────────────
1173
1174    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1175        let body = req.json_body();
1176        validate_required("EventPattern", &body["EventPattern"])?;
1177        validate_required("Event", &body["Event"])?;
1178        let event_pattern = body["EventPattern"]
1179            .as_str()
1180            .ok_or_else(|| missing("EventPattern"))?;
1181        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1182
1183        // Parse the event JSON
1184        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1185            AwsServiceError::aws_error(
1186                StatusCode::BAD_REQUEST,
1187                "InvalidEventPatternException",
1188                "Event is not valid JSON.",
1189            )
1190        })?;
1191
1192        // Parse the pattern JSON
1193        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1194            AwsServiceError::aws_error(
1195                StatusCode::BAD_REQUEST,
1196                "InvalidEventPatternException",
1197                "Event pattern is not valid JSON.",
1198            )
1199        })?;
1200
1201        let source = event["source"].as_str().unwrap_or("");
1202        let detail_type = event["detail-type"].as_str().unwrap_or("");
1203        let detail = event
1204            .get("detail")
1205            .map(|v| serde_json::to_string(v).unwrap_or_default())
1206            .unwrap_or_else(|| "{}".to_string());
1207        let account = event["account"].as_str().unwrap_or("");
1208        let region = event["region"].as_str().unwrap_or("");
1209        let resources: Vec<String> = event["resources"]
1210            .as_array()
1211            .map(|arr| {
1212                arr.iter()
1213                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1214                    .collect()
1215            })
1216            .unwrap_or_default();
1217
1218        let result = matches_pattern(
1219            Some(event_pattern),
1220            source,
1221            detail_type,
1222            &detail,
1223            account,
1224            region,
1225            &resources,
1226        );
1227
1228        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1229    }
1230
1231    // ─── UpdateEventBus ─────────────────────────────────────────────────
1232
1233    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1234        let body = req.json_body();
1235        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1236        validate_optional_string_length(
1237            "kmsKeyIdentifier",
1238            body["KmsKeyIdentifier"].as_str(),
1239            0,
1240            2048,
1241        )?;
1242        let name = body["Name"].as_str().unwrap_or("default");
1243
1244        let mut accounts = self.state.write();
1245        let state = accounts.get_or_create(&req.account_id);
1246        let bus = state.buses.get_mut(name).ok_or_else(|| {
1247            AwsServiceError::aws_error(
1248                StatusCode::BAD_REQUEST,
1249                "ResourceNotFoundException",
1250                format!("Event bus {name} does not exist."),
1251            )
1252        })?;
1253
1254        if let Some(desc) = body["Description"].as_str() {
1255            bus.description = Some(desc.to_string());
1256        }
1257        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1258            bus.kms_key_identifier = Some(kms.to_string());
1259        }
1260        if let Some(dlc) = body.get("DeadLetterConfig") {
1261            bus.dead_letter_config = Some(dlc.clone());
1262        }
1263        bus.last_modified_time = Utc::now();
1264
1265        let arn = bus.arn.clone();
1266        let bus_name = bus.name.clone();
1267
1268        Ok(AwsResponse::ok_json(json!({
1269            "Arn": arn,
1270            "Name": bus_name,
1271        })))
1272    }
1273
1274    // ─── Endpoint Operations ────────────────────────────────────────────
1275
1276    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277        let body = req.json_body();
1278        // Smithy PutEventsRequest constraints:
1279        //   EndpointId: length 1..=50
1280        //   Entries: @required, length 1..=10
1281        validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1282        validate_required("Entries", &body["Entries"])?;
1283        let entries_array = body["Entries"].as_array().ok_or_else(|| {
1284            AwsServiceError::aws_error(
1285                StatusCode::BAD_REQUEST,
1286                "ValidationException",
1287                "Entries must be a list",
1288            )
1289        })?;
1290        if entries_array.is_empty() || entries_array.len() > 10 {
1291            return Err(AwsServiceError::aws_error(
1292                StatusCode::BAD_REQUEST,
1293                "ValidationException",
1294                "Value at 'Entries' failed to satisfy constraint: \
1295                 Member must have length between 1 and 10",
1296            ));
1297        }
1298        let entries: Vec<Value> = entries_array.clone();
1299        let entries = &entries;
1300
1301        let mut accounts = self.state.write();
1302        let state = accounts.get_or_create(&req.account_id);
1303        let mut result_entries = Vec::new();
1304        let mut events_to_deliver = Vec::new();
1305        let mut failed_count = 0;
1306
1307        for entry in entries {
1308            let source = entry["Source"].as_str().unwrap_or("").to_string();
1309            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1310            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1311
1312            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1313                failed_count += 1;
1314                result_entries.push(error);
1315                continue;
1316            }
1317
1318            let event_id = uuid::Uuid::new_v4().to_string();
1319            let raw_bus = entry["EventBusName"]
1320                .as_str()
1321                .unwrap_or("default")
1322                .to_string();
1323            let event_bus_name = state.resolve_bus_name(&raw_bus);
1324
1325            // Bus resource-policy gate. AWS evaluates the bus's
1326            // resource policy against cross-account callers; same-account
1327            // callers always have access. The policy itself is JSON
1328            // stored as serde_json::Value so the IAM evaluator parses
1329            // it the same way it parses an S3 bucket policy.
1330            let caller_account = req
1331                .principal
1332                .as_ref()
1333                .map(|p| p.account_id.as_str())
1334                .unwrap_or(req.account_id.as_str());
1335            if caller_account != req.account_id {
1336                let bus_policy_value = state
1337                    .buses
1338                    .get(&event_bus_name)
1339                    .and_then(|b| b.policy.clone());
1340                if let Some(policy_value) = bus_policy_value {
1341                    let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1342                    let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1343                    let bus_arn = state
1344                        .buses
1345                        .get(&event_bus_name)
1346                        .map(|b| b.arn.clone())
1347                        .unwrap_or_default();
1348                    let principal =
1349                        req.principal
1350                            .clone()
1351                            .unwrap_or_else(|| fakecloud_core::auth::Principal {
1352                                arn: Arn::global("iam", caller_account, "root").to_string(),
1353                                user_id: caller_account.to_string(),
1354                                account_id: caller_account.to_string(),
1355                                principal_type: fakecloud_core::auth::PrincipalType::Root,
1356                                source_identity: None,
1357                                tags: None,
1358                            });
1359                    let context = fakecloud_iam::evaluator::RequestContext {
1360                        aws_principal_arn: Some(principal.arn.clone()),
1361                        aws_principal_account: Some(principal.account_id.clone()),
1362                        ..Default::default()
1363                    };
1364                    let eval_req = fakecloud_iam::evaluator::EvalRequest {
1365                        principal: &principal,
1366                        action: "events:PutEvents".to_string(),
1367                        resource: bus_arn,
1368                        context,
1369                    };
1370                    let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1371                        &policy_doc,
1372                        &eval_req,
1373                    );
1374                    if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1375                        failed_count += 1;
1376                        result_entries.push(json!({
1377                            "ErrorCode": "AccessDeniedException",
1378                            "ErrorMessage": format!(
1379                                "User '{}' is not authorized to put events on event bus '{}'",
1380                                principal.arn, event_bus_name
1381                            ),
1382                        }));
1383                        continue;
1384                    }
1385                }
1386            }
1387
1388            let time = parse_put_events_time(&entry["Time"]);
1389            let resources: Vec<String> = entry["Resources"]
1390                .as_array()
1391                .map(|arr| {
1392                    arr.iter()
1393                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1394                        .collect()
1395                })
1396                .unwrap_or_default();
1397
1398            let event = PutEvent {
1399                event_id: event_id.clone(),
1400                source: source.clone(),
1401                detail_type: detail_type.clone(),
1402                detail: detail.clone(),
1403                event_bus_name: event_bus_name.clone(),
1404                time,
1405                resources: resources.clone(),
1406            };
1407
1408            archive_matching_event(
1409                state,
1410                &event,
1411                &event_bus_name,
1412                &source,
1413                &detail_type,
1414                &detail,
1415                &req.account_id,
1416                &req.region,
1417                &resources,
1418            );
1419
1420            state.events.push(event);
1421
1422            // Find matching rules and their targets
1423            let matching_targets: Vec<EventTarget> = state
1424                .rules
1425                .values()
1426                .filter(|r| {
1427                    r.event_bus_name == event_bus_name
1428                        && r.state == "ENABLED"
1429                        && matches_pattern(
1430                            r.event_pattern.as_deref(),
1431                            &source,
1432                            &detail_type,
1433                            &detail,
1434                            &req.account_id,
1435                            &req.region,
1436                            &resources,
1437                        )
1438                })
1439                .flat_map(|r| r.targets.clone())
1440                .collect();
1441
1442            if !matching_targets.is_empty() {
1443                events_to_deliver.push((
1444                    event_id.clone(),
1445                    source,
1446                    detail_type,
1447                    detail,
1448                    time,
1449                    resources,
1450                    matching_targets,
1451                ));
1452            }
1453
1454            result_entries.push(json!({ "EventId": event_id }));
1455        }
1456
1457        // Drop the lock before delivering
1458        drop(accounts);
1459
1460        // Deliver to targets — single-target dispatch lives in the
1461        // shared helper so cross-service callers (delivery.rs) honor the
1462        // same target shape (SQS/SNS/Lambda/Logs/Kinesis/StepFunctions/
1463        // ApiDestination/HTTP) and the same InputTransformer rules.
1464        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1465            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1466            let event_json = json!({
1467                "version": "0",
1468                "id": event_id,
1469                "source": source,
1470                "account": req.account_id,
1471                "detail-type": detail_type,
1472                "detail": detail_value,
1473                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1474                "region": req.region,
1475                "resources": resources,
1476            });
1477
1478            let ctx = EventDispatchContext {
1479                state: &self.state,
1480                delivery: &self.delivery,
1481                lambda_state: self.lambda_state.as_ref(),
1482                logs_state: self.logs_state.as_ref(),
1483                container_runtime: &self.container_runtime,
1484                account_id: &req.account_id,
1485                region: &req.region,
1486            };
1487            for target in targets {
1488                dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1489            }
1490        }
1491
1492        let resp = json!({
1493            "FailedEntryCount": failed_count,
1494            "Entries": result_entries,
1495        });
1496
1497        Ok(AwsResponse::ok_json(resp))
1498    }
1499
1500    // ─── Tagging ────────────────────────────────────────────────────────
1501
1502    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503        let body = req.json_body();
1504        validate_required("ResourceARN", &body["ResourceARN"])?;
1505        let arn = body["ResourceARN"]
1506            .as_str()
1507            .ok_or_else(|| missing("ResourceARN"))?;
1508        validate_string_length("resourceARN", arn, 1, 1600)?;
1509        validate_required("Tags", &body["Tags"])?;
1510
1511        let mut accounts = self.state.write();
1512        let state = accounts.get_or_create(&req.account_id);
1513        let tag_map = find_tags_mut(state, arn)?;
1514
1515        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1516            AwsServiceError::aws_error(
1517                StatusCode::BAD_REQUEST,
1518                "ValidationException",
1519                format!("{f} must be a list"),
1520            )
1521        })?;
1522
1523        Ok(AwsResponse::ok_json(json!({})))
1524    }
1525
1526    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1527        let body = req.json_body();
1528        validate_required("ResourceARN", &body["ResourceARN"])?;
1529        let arn = body["ResourceARN"]
1530            .as_str()
1531            .ok_or_else(|| missing("ResourceARN"))?;
1532        validate_string_length("resourceARN", arn, 1, 1600)?;
1533        validate_required("TagKeys", &body["TagKeys"])?;
1534
1535        let mut accounts = self.state.write();
1536        let state = accounts.get_or_create(&req.account_id);
1537        let tag_map = find_tags_mut(state, arn)?;
1538
1539        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1540            AwsServiceError::aws_error(
1541                StatusCode::BAD_REQUEST,
1542                "ValidationException",
1543                format!("{f} must be a list"),
1544            )
1545        })?;
1546
1547        Ok(AwsResponse::ok_json(json!({})))
1548    }
1549
1550    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1551        let body = req.json_body();
1552        validate_required("ResourceARN", &body["ResourceARN"])?;
1553        let arn = body["ResourceARN"]
1554            .as_str()
1555            .ok_or_else(|| missing("ResourceARN"))?;
1556        validate_string_length("resourceARN", arn, 1, 1600)?;
1557
1558        let accounts = self.state.read();
1559        let empty = EventBridgeState::new(&req.account_id, &req.region);
1560        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1561        let tag_map = find_tags(state, arn)?;
1562
1563        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1564
1565        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1566    }
1567
1568    // ─── Archive Operations ─────────────────────────────────────────────
1569}
1570
1571// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
1572
1573// ─── Event Pattern Validation ────────────────────────────────────────
1574
1575// ─── Connection Auth Params Response Builder ────────────────────────
1576
1577// ─── Event Pattern Matching ─────────────────────────────────────────
1578
1579/// Parsed + validated inputs for `StartReplay`.
1580struct StartReplayInput {
1581    name: String,
1582    description: Option<String>,
1583    event_source_arn: String,
1584    destination: Value,
1585    destination_arn: String,
1586    event_start_time: DateTime<Utc>,
1587    event_end_time: DateTime<Utc>,
1588}
1589
1590impl StartReplayInput {
1591    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1592        // StartReplay's Smithy model declares ResourceNotFound,
1593        // ResourceAlreadyExists, InvalidEventPattern, LimitExceeded, and
1594        // Internal — but not ValidationException. Per-field constraints are
1595        // enforced client-side by the SDK; we surface only declared errors
1596        // here. Missing required inputs default to empty strings and the
1597        // downstream bus/archive lookups produce ResourceNotFound for the
1598        // ones that matter.
1599        let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1600        let description = body["Description"].as_str().map(|s| s.to_string());
1601        let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1602        let destination = body["Destination"].clone();
1603
1604        let event_start_time = body["EventStartTime"]
1605            .as_f64()
1606            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1607            .unwrap_or_else(Utc::now);
1608        let event_end_time = body["EventEndTime"]
1609            .as_f64()
1610            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1611            .unwrap_or_else(Utc::now);
1612
1613        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1614        if !destination_arn.contains(":event-bus/") {
1615            // Missing/invalid destination cannot be resolved to a bus —
1616            // surface as ResourceNotFound rather than the undeclared
1617            // ValidationException.
1618            return Err(AwsServiceError::aws_error(
1619                StatusCode::BAD_REQUEST,
1620                "ResourceNotFoundException",
1621                format!("Destination.Arn {destination_arn} does not point to an event bus."),
1622            ));
1623        }
1624
1625        Ok(Self {
1626            name,
1627            description,
1628            event_source_arn,
1629            destination,
1630            destination_arn,
1631            event_start_time,
1632            event_end_time,
1633        })
1634    }
1635}
1636
1637#[path = "service_archives_replays.rs"]
1638mod service_archives_replays;
1639#[path = "service_connections_apidests.rs"]
1640mod service_connections_apidests;
1641#[path = "service_endpoints.rs"]
1642mod service_endpoints;
1643#[path = "service_partner_sources.rs"]
1644mod service_partner_sources;
1645
1646#[path = "helpers.rs"]
1647pub(crate) mod helpers;
1648pub(crate) use helpers::*;
1649
1650#[cfg(test)]
1651#[path = "service_tests.rs"]
1652mod tests;
1653
1654#[cfg(test)]
1655mod pagination_reject_test {
1656    #[test]
1657    fn paginate_checked_rejects_invalid_token() {
1658        use fakecloud_core::pagination::paginate_checked;
1659        let items: Vec<i32> = (0..5).collect();
1660        assert!(paginate_checked(&items, Some("bad"), 3).is_err());
1661        assert!(paginate_checked(&items, Some("2"), 3).is_ok());
1662    }
1663}