Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29    state: SharedEventBridgeState,
30    delivery: Arc<DeliveryBus>,
31    lambda_state: Option<SharedLambdaState>,
32    logs_state: Option<SharedLogsState>,
33    container_runtime: Option<Arc<ContainerRuntime>>,
34    snapshot_store: Option<Arc<dyn SnapshotStore>>,
35    snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40        Self {
41            state,
42            delivery,
43            lambda_state: None,
44            logs_state: None,
45            container_runtime: None,
46            snapshot_store: None,
47            snapshot_lock: Arc::new(AsyncMutex::new(())),
48        }
49    }
50
51    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52        self.lambda_state = Some(lambda_state);
53        self
54    }
55
56    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57        self.logs_state = Some(logs_state);
58        self
59    }
60
61    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62        self.container_runtime = Some(runtime);
63        self
64    }
65
66    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67        self.snapshot_store = Some(store);
68        self
69    }
70
71    /// Persist current state as a snapshot. Held across the
72    /// clone-serialize-write sequence to prevent stale-last writes,
73    /// with serde + file I/O offloaded to the blocking pool.
74    async fn save_snapshot(&self) {
75        save_eventbridge_snapshot(
76            &self.state,
77            self.snapshot_store.clone(),
78            &self.snapshot_lock,
79        )
80        .await;
81    }
82
83    /// Build a hook that persists the current EventBridge state when invoked, or
84    /// `None` in memory mode (no snapshot store). The CloudFormation provisioner
85    /// mutates `state` directly and uses this to write a CFN-provisioned
86    /// resource through to disk, the same way a direct mutating API call would.
87    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88        let store = self.snapshot_store.clone()?;
89        let state = self.state.clone();
90        let lock = self.snapshot_lock.clone();
91        Some(Arc::new(move || {
92            let state = state.clone();
93            let store = store.clone();
94            let lock = lock.clone();
95            Box::pin(async move {
96                save_eventbridge_snapshot(&state, Some(store), &lock).await;
97            })
98        }))
99    }
100}
101
102/// Persist the current EventBridge state as a snapshot. Offloads the serde +
103/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
104/// (memory mode). Shared by `EventBridgeService::save_snapshot` and the
105/// CloudFormation provisioner's post-provision persist hook so both route
106/// through the same serialize-and-write path.
107pub async fn save_eventbridge_snapshot(
108    state: &SharedEventBridgeState,
109    store: Option<Arc<dyn SnapshotStore>>,
110    lock: &AsyncMutex<()>,
111) {
112    let Some(store) = store else {
113        return;
114    };
115    let _guard = lock.lock().await;
116    let snapshot = EventBridgeSnapshot {
117        schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118        accounts: Some(state.read().clone()),
119        state: None,
120    };
121    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122        let bytes = serde_json::to_vec(&snapshot)
123            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124        store.save(&bytes)
125    })
126    .await;
127    match join {
128        Ok(Ok(())) => {}
129        Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130        Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131    }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136    fn service_name(&self) -> &str {
137        "events"
138    }
139
140    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141        let mutates = is_mutating_action(req.action.as_str());
142        let result = match req.action.as_str() {
143            "CreateEventBus" => self.create_event_bus(&req),
144            "DeleteEventBus" => self.delete_event_bus(&req),
145            "ListEventBuses" => self.list_event_buses(&req),
146            "DescribeEventBus" => self.describe_event_bus(&req),
147            "PutRule" => self.put_rule(&req),
148            "DeleteRule" => self.delete_rule(&req),
149            "ListRules" => self.list_rules(&req),
150            "DescribeRule" => self.describe_rule(&req),
151            "EnableRule" => self.enable_rule(&req),
152            "DisableRule" => self.disable_rule(&req),
153            "PutTargets" => self.put_targets(&req),
154            "RemoveTargets" => self.remove_targets(&req),
155            "ListTargetsByRule" => self.list_targets_by_rule(&req),
156            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157            "PutEvents" => self.put_events(&req),
158            "PutPermission" => self.put_permission(&req),
159            "RemovePermission" => self.remove_permission(&req),
160            "TagResource" => self.tag_resource(&req),
161            "UntagResource" => self.untag_resource(&req),
162            "ListTagsForResource" => self.list_tags_for_resource(&req),
163            "CreateArchive" => self.create_archive(&req),
164            "DescribeArchive" => self.describe_archive(&req),
165            "ListArchives" => self.list_archives(&req),
166            "UpdateArchive" => self.update_archive(&req),
167            "DeleteArchive" => self.delete_archive(&req),
168            "CreateConnection" => self.create_connection(&req),
169            "DescribeConnection" => self.describe_connection(&req),
170            "ListConnections" => self.list_connections(&req),
171            "UpdateConnection" => self.update_connection(&req),
172            "DeleteConnection" => self.delete_connection(&req),
173            "CreateApiDestination" => self.create_api_destination(&req),
174            "DescribeApiDestination" => self.describe_api_destination(&req),
175            "ListApiDestinations" => self.list_api_destinations(&req),
176            "UpdateApiDestination" => self.update_api_destination(&req),
177            "DeleteApiDestination" => self.delete_api_destination(&req),
178            "StartReplay" => self.start_replay(&req),
179            "DescribeReplay" => self.describe_replay(&req),
180            "ListReplays" => self.list_replays(&req),
181            "CancelReplay" => self.cancel_replay(&req),
182            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187            "ActivateEventSource" => self.activate_event_source(&req),
188            "DeactivateEventSource" => self.deactivate_event_source(&req),
189            "DescribeEventSource" => self.describe_event_source(&req),
190            "ListEventSources" => self.list_event_sources(&req),
191            "PutPartnerEvents" => self.put_partner_events(&req),
192            "TestEventPattern" => self.test_event_pattern(&req),
193            "UpdateEventBus" => self.update_event_bus(&req),
194            "CreateEndpoint" => self.create_endpoint(&req),
195            "DeleteEndpoint" => self.delete_endpoint(&req),
196            "DescribeEndpoint" => self.describe_endpoint(&req),
197            "ListEndpoints" => self.list_endpoints(&req),
198            "UpdateEndpoint" => self.update_endpoint(&req),
199            "DeauthorizeConnection" => self.deauthorize_connection(&req),
200            _ => Err(AwsServiceError::action_not_implemented(
201                "events",
202                &req.action,
203            )),
204        };
205        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206            self.save_snapshot().await;
207        }
208        result
209    }
210
211    fn supported_actions(&self) -> &[&str] {
212        &[
213            "CreateEventBus",
214            "DeleteEventBus",
215            "ListEventBuses",
216            "DescribeEventBus",
217            "PutRule",
218            "DeleteRule",
219            "ListRules",
220            "DescribeRule",
221            "EnableRule",
222            "DisableRule",
223            "PutTargets",
224            "RemoveTargets",
225            "ListTargetsByRule",
226            "ListRuleNamesByTarget",
227            "PutEvents",
228            "PutPermission",
229            "RemovePermission",
230            "TagResource",
231            "UntagResource",
232            "ListTagsForResource",
233            "CreateArchive",
234            "DescribeArchive",
235            "ListArchives",
236            "UpdateArchive",
237            "DeleteArchive",
238            "CreateConnection",
239            "DescribeConnection",
240            "ListConnections",
241            "UpdateConnection",
242            "DeleteConnection",
243            "CreateApiDestination",
244            "DescribeApiDestination",
245            "ListApiDestinations",
246            "UpdateApiDestination",
247            "DeleteApiDestination",
248            "StartReplay",
249            "DescribeReplay",
250            "ListReplays",
251            "CancelReplay",
252            "CreatePartnerEventSource",
253            "DeletePartnerEventSource",
254            "DescribePartnerEventSource",
255            "ListPartnerEventSources",
256            "ListPartnerEventSourceAccounts",
257            "ActivateEventSource",
258            "DeactivateEventSource",
259            "DescribeEventSource",
260            "ListEventSources",
261            "PutPartnerEvents",
262            "TestEventPattern",
263            "UpdateEventBus",
264            "CreateEndpoint",
265            "DeleteEndpoint",
266            "DescribeEndpoint",
267            "ListEndpoints",
268            "UpdateEndpoint",
269            "DeauthorizeConnection",
270        ]
271    }
272}
273
274// ─── Event Bus Operations ───────────────────────────────────────────
275impl EventBridgeService {
276    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277        let body = req.json_body();
278        validate_required("Name", &body["Name"])?;
279        let name = body["Name"]
280            .as_str()
281            .ok_or_else(|| missing("Name"))?
282            .to_string();
283        validate_string_length("name", &name, 1, 256)?;
284        validate_optional_string_length(
285            "eventSourceName",
286            body["EventSourceName"].as_str(),
287            1,
288            256,
289        )?;
290        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291        validate_optional_string_length(
292            "kmsKeyIdentifier",
293            body["KmsKeyIdentifier"].as_str(),
294            0,
295            2048,
296        )?;
297
298        // Validate name doesn't contain '/' (unless partner bus)
299        if name.contains('/') && !name.starts_with("aws.partner/") {
300            return Err(AwsServiceError::aws_error(
301                StatusCode::BAD_REQUEST,
302                "ValidationException",
303                "Event bus name must not contain '/'.",
304            ));
305        }
306
307        // Partner event bus validation
308        if name.starts_with("aws.partner/") {
309            let event_source = body["EventSourceName"].as_str().unwrap_or("");
310            let accounts_r = self.state.read();
311            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313            let has_source = state_r.partner_event_sources.contains_key(event_source);
314            drop(accounts_r);
315            if !has_source {
316                return Err(AwsServiceError::aws_error(
317                    StatusCode::BAD_REQUEST,
318                    "ResourceNotFoundException",
319                    format!("Event source {event_source} does not exist."),
320                ));
321            }
322        }
323
324        let mut accounts = self.state.write();
325        let state = accounts.get_or_create(&req.account_id);
326
327        if state.buses.contains_key(&name) {
328            return Err(AwsServiceError::aws_error(
329                StatusCode::BAD_REQUEST,
330                "ResourceAlreadyExistsException",
331                format!("Event bus {name} already exists."),
332            ));
333        }
334
335        let arn = format!(
336            "arn:aws:events:{}:{}:event-bus/{}",
337            req.region, state.account_id, name
338        );
339        let now = Utc::now();
340        let description = body["Description"].as_str().map(|s| s.to_string());
341        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342        let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344        let tags = parse_tags(&body);
345
346        let bus = EventBus {
347            name: name.clone(),
348            arn: arn.clone(),
349            tags,
350            policy: None,
351            description,
352            kms_key_identifier,
353            dead_letter_config,
354            creation_time: now,
355            last_modified_time: now,
356        };
357        state.buses.insert(name, bus);
358
359        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360    }
361
362    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363        let body = req.json_body();
364        validate_required("Name", &body["Name"])?;
365        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366        validate_string_length("name", name, 1, 256)?;
367
368        if name == "default" {
369            return Err(AwsServiceError::aws_error(
370                StatusCode::BAD_REQUEST,
371                "ValidationException",
372                format!("Cannot delete event bus {name}."),
373            ));
374        }
375
376        let mut accounts = self.state.write();
377        let state = accounts.get_or_create(&req.account_id);
378        state.buses.remove(name);
379        state.rules.retain(|k, _| k.0 != name);
380
381        Ok(AwsResponse::ok_json(json!({})))
382    }
383
384    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let body = req.json_body();
386        // Smithy ListEventBusesRequest constraints:
387        //   NamePrefix: EventBusName length 1..=256
388        //   NextToken: length 1..=2048
389        //   Limit: LimitMax100 range 1..=100
390        // Unrecognised pagination tokens still fall back to the start of the
391        // list — `InvalidNextTokenException` only fires when the token shape
392        // itself is wrong, not when it points at a vanished cursor.
393        validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394        validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395        validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396        let name_prefix = body["NamePrefix"].as_str();
397        let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399        let accounts = self.state.read();
400        let empty = EventBridgeState::new(&req.account_id, &req.region);
401        let state = accounts.get(&req.account_id).unwrap_or(&empty);
402        let filtered: Vec<&_> = state
403            .buses
404            .values()
405            .filter(|b| match name_prefix {
406                Some(prefix) => b.name.starts_with(prefix),
407                None => true,
408            })
409            .collect();
410
411        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412        let buses: Vec<Value> = page
413            .iter()
414            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
415            .collect();
416        let mut resp = json!({ "EventBuses": buses });
417        if let Some(token) = next_token {
418            resp["NextToken"] = json!(token);
419        }
420
421        Ok(AwsResponse::ok_json(resp))
422    }
423
424    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
425        let body = req.json_body();
426        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
427        let name = body["Name"].as_str().unwrap_or("default");
428
429        let accounts = self.state.read();
430        let empty = EventBridgeState::new(&req.account_id, &req.region);
431        let state = accounts.get(&req.account_id).unwrap_or(&empty);
432        let bus = state.buses.get(name).ok_or_else(|| {
433            AwsServiceError::aws_error(
434                StatusCode::BAD_REQUEST,
435                "ResourceNotFoundException",
436                format!("Event bus {name} does not exist."),
437            )
438        })?;
439
440        let mut resp = json!({
441            "Name": bus.name,
442            "Arn": bus.arn,
443            "CreationTime": bus.creation_time.timestamp() as f64,
444            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
445        });
446
447        if let Some(ref policy) = bus.policy {
448            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
449        }
450        if let Some(ref desc) = bus.description {
451            resp["Description"] = json!(desc);
452        }
453        if let Some(ref kms) = bus.kms_key_identifier {
454            resp["KmsKeyIdentifier"] = json!(kms);
455        }
456        if let Some(ref dlc) = bus.dead_letter_config {
457            resp["DeadLetterConfig"] = dlc.clone();
458        }
459
460        Ok(AwsResponse::ok_json(resp))
461    }
462
463    // ─── Permission Operations ──────────────────────────────────────────
464
465    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
466        let body = req.json_body();
467        // Smithy PutPermissionRequest constraints (optional members but each
468        // carries a `@length` trait that must be honoured when present):
469        //   EventBusName: NonPartnerEventBusName length 1..=256
470        //   Action: length 1..=64
471        //   Principal: length 1..=12 (12-digit AWS account or `*`)
472        //   StatementId: length 1..=64
473        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
474        validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
475        validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
476        validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
477        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479        let mut accounts = self.state.write();
480        let state = accounts.get_or_create(&req.account_id);
481
482        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
483            AwsServiceError::aws_error(
484                StatusCode::BAD_REQUEST,
485                "ResourceNotFoundException",
486                format!("Event bus {event_bus_name} does not exist."),
487            )
488        })?;
489
490        // Check if Policy is provided (new-style)
491        if let Some(policy_str) = body["Policy"].as_str() {
492            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
493                bus.policy = Some(policy);
494                return Ok(AwsResponse::ok_json(json!({})));
495            }
496        }
497
498        // Old-style: Action, Principal, StatementId. All are @optional in the
499        // Smithy model; non-string values were already rejected above with
500        // SerializationException, so reaching here means each is either a
501        // valid string or absent. Fall back to "" to preserve current behavior
502        // — recording an empty-statement policy entry is harmless since it can
503        // never match a real action/principal pair.
504        let action = body["Action"].as_str().unwrap_or("");
505        let principal = body["Principal"].as_str().unwrap_or("");
506        let statement_id = body["StatementId"].as_str().unwrap_or("");
507
508        // Note: real AWS does enforce a small allow-list on `Action`, but
509        // PutPermission's Smithy model only declares ResourceNotFoundException,
510        // PolicyLengthExceededException, ConcurrentModificationException,
511        // OperationDisabledException, and InternalException. We accept any
512        // action string and just record the statement.
513        let statement = json!({
514            "Sid": statement_id,
515            "Effect": "Allow",
516            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
517            "Action": action,
518            "Resource": bus.arn,
519        });
520
521        let policy = bus.policy.get_or_insert_with(|| {
522            json!({
523                "Version": "2012-10-17",
524                "Statement": [],
525            })
526        });
527
528        if let Some(stmts) = policy["Statement"].as_array_mut() {
529            stmts.push(statement);
530        }
531
532        Ok(AwsResponse::ok_json(json!({})))
533    }
534
535    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
536        let body = req.json_body();
537        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
538        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
539        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
540        let statement_id = body["StatementId"].as_str().unwrap_or("");
541        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
542
543        let mut accounts = self.state.write();
544        let state = accounts.get_or_create(&req.account_id);
545
546        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
547            AwsServiceError::aws_error(
548                StatusCode::BAD_REQUEST,
549                "ResourceNotFoundException",
550                format!("Event bus {event_bus_name} does not exist."),
551            )
552        })?;
553
554        if remove_all {
555            bus.policy = None;
556            return Ok(AwsResponse::ok_json(json!({})));
557        }
558
559        let policy = bus.policy.as_mut().ok_or_else(|| {
560            AwsServiceError::aws_error(
561                StatusCode::BAD_REQUEST,
562                "ResourceNotFoundException",
563                "EventBus does not have a policy.",
564            )
565        })?;
566
567        if let Some(stmts) = policy["Statement"].as_array_mut() {
568            let before = stmts.len();
569            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
570            if stmts.len() == before {
571                return Err(AwsServiceError::aws_error(
572                    StatusCode::BAD_REQUEST,
573                    "ResourceNotFoundException",
574                    "Statement with the provided id does not exist.",
575                ));
576            }
577            if stmts.is_empty() {
578                bus.policy = None;
579            }
580        }
581
582        Ok(AwsResponse::ok_json(json!({})))
583    }
584
585    // ─── Rule Operations ────────────────────────────────────────────────
586
587    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
588        let body = req.json_body();
589        // Smithy PutRuleRequest constraints:
590        //   Name: RuleName length 1..=64, @required
591        //   ScheduleExpression: length 0..=256
592        //   EventPattern: length 0..=4096 (raw JSON, separate
593        //     InvalidEventPatternException still applies to syntax)
594        //   State: RuleState enum {ENABLED, DISABLED,
595        //          ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS}
596        //   Description: RuleDescription length 0..=512
597        //   RoleArn: length 1..=1600
598        //   EventBusName: EventBusNameOrArn length 1..=1600
599        validate_required("Name", &body["Name"])?;
600        let name = body["Name"]
601            .as_str()
602            .ok_or_else(|| missing("Name"))?
603            .to_string();
604        validate_string_length("Name", &name, 1, 64)?;
605        validate_optional_string_length_value(
606            "ScheduleExpression",
607            &body["ScheduleExpression"],
608            0,
609            256,
610        )?;
611        validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
612        validate_optional_enum_value(
613            "State",
614            &body["State"],
615            &[
616                "ENABLED",
617                "DISABLED",
618                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
619            ],
620        )?;
621        validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
622        validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
623        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
624
625        let raw_bus = body["EventBusName"]
626            .as_str()
627            .unwrap_or("default")
628            .to_string();
629
630        let mut accounts = self.state.write();
631        let state = accounts.get_or_create(&req.account_id);
632        let event_bus_name = state.resolve_bus_name(&raw_bus);
633
634        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
635            if s.is_empty() {
636                None
637            } else {
638                Some(s.to_string())
639            }
640        });
641        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
642            if s.is_empty() {
643                None
644            } else {
645                Some(s.to_string())
646            }
647        });
648        let description = body["Description"].as_str().map(|s| s.to_string());
649        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
650        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
651
652        // Note: real AWS rejects ScheduleExpression on a non-default bus, but
653        // PutRule's Smithy model only declares InvalidEventPatternException
654        // for input-shape problems, not ValidationException. We accept the
655        // value and let the scheduler ignore it on non-default buses.
656
657        if !state.buses.contains_key(&event_bus_name) {
658            return Err(AwsServiceError::aws_error(
659                StatusCode::BAD_REQUEST,
660                "ResourceNotFoundException",
661                format!("Event bus {event_bus_name} does not exist."),
662            ));
663        }
664
665        let arn = if event_bus_name == "default" {
666            format!(
667                "arn:aws:events:{}:{}:rule/{}",
668                req.region, state.account_id, name
669            )
670        } else {
671            format!(
672                "arn:aws:events:{}:{}:rule/{}/{}",
673                req.region, state.account_id, event_bus_name, name
674            )
675        };
676
677        let key = (event_bus_name.clone(), name.clone());
678        let targets = state
679            .rules
680            .get(&key)
681            .map(|r| r.targets.clone())
682            .unwrap_or_default();
683
684        let tags = parse_tags(&body);
685
686        let rule = EventRule {
687            name: name.clone(),
688            arn: arn.clone(),
689            event_bus_name,
690            event_pattern,
691            schedule_expression,
692            state: rule_state,
693            description,
694            role_arn,
695            managed_by: None,
696            created_by: None,
697            targets,
698            tags,
699            last_fired: None,
700        };
701
702        state.rules.insert(key, rule);
703        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
704    }
705
706    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
707        let body = req.json_body();
708        validate_required("Name", &body["Name"])?;
709        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
710        validate_string_length("name", name, 1, 64)?;
711        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
712        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
713
714        let mut accounts = self.state.write();
715        let state = accounts.get_or_create(&req.account_id);
716        let bus_name = state.resolve_bus_name(event_bus_name);
717        let key = (bus_name, name.to_string());
718
719        // Check if rule has targets
720        if let Some(rule) = state.rules.get(&key) {
721            if !rule.targets.is_empty() {
722                return Err(AwsServiceError::aws_error(
723                    StatusCode::BAD_REQUEST,
724                    "ValidationException",
725                    "Rule can't be deleted since it has targets.",
726                ));
727            }
728        }
729
730        state.rules.remove(&key);
731        Ok(AwsResponse::ok_json(json!({})))
732    }
733
734    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735        let body = req.json_body();
736        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
737        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
738        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
739        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
740        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
741        let name_prefix = body["NamePrefix"].as_str();
742        let limit = body["Limit"].as_u64().map(|n| n as usize);
743        let next_token = body["NextToken"].as_str();
744
745        let accounts = self.state.read();
746        let empty = EventBridgeState::new(&req.account_id, &req.region);
747        let state = accounts.get(&req.account_id).unwrap_or(&empty);
748        let bus_name = state.resolve_bus_name(event_bus_name);
749
750        let mut rules: Vec<&EventRule> = state
751            .rules
752            .values()
753            .filter(|r| r.event_bus_name == bus_name)
754            .filter(|r| match name_prefix {
755                Some(prefix) => r.name.starts_with(prefix),
756                None => true,
757            })
758            .collect();
759        rules.sort_by(|a, b| a.name.cmp(&b.name));
760
761        // Pagination
762        let start = next_token
763            .and_then(|t| t.parse::<usize>().ok())
764            .unwrap_or(0)
765            .min(rules.len());
766        let rules_slice = &rules[start..];
767
768        let (page, new_next_token) = if let Some(lim) = limit {
769            if rules_slice.len() > lim {
770                (&rules_slice[..lim], Some((start + lim).to_string()))
771            } else {
772                (rules_slice, None)
773            }
774        } else {
775            (rules_slice, None)
776        };
777
778        let rules_json: Vec<Value> = page
779            .iter()
780            .map(|r| {
781                let mut obj = json!({
782                    "Name": r.name,
783                    "Arn": r.arn,
784                    "EventBusName": r.event_bus_name,
785                    "State": r.state,
786                });
787                if let Some(ref desc) = r.description {
788                    obj["Description"] = json!(desc);
789                }
790                if let Some(ref ep) = r.event_pattern {
791                    obj["EventPattern"] = json!(ep);
792                }
793                if let Some(ref se) = r.schedule_expression {
794                    obj["ScheduleExpression"] = json!(se);
795                }
796                if let Some(ref role) = r.role_arn {
797                    obj["RoleArn"] = json!(role);
798                }
799                if let Some(ref mb) = r.managed_by {
800                    obj["ManagedBy"] = json!(mb);
801                }
802                obj
803            })
804            .collect();
805
806        let mut resp = json!({ "Rules": rules_json });
807        if let Some(token) = new_next_token {
808            resp["NextToken"] = json!(token);
809        }
810
811        Ok(AwsResponse::ok_json(resp))
812    }
813
814    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
815        let body = req.json_body();
816        validate_required("Name", &body["Name"])?;
817        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
818        validate_string_length("name", name, 1, 64)?;
819        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
820        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
821
822        let accounts = self.state.read();
823        let empty = EventBridgeState::new(&req.account_id, &req.region);
824        let state = accounts.get(&req.account_id).unwrap_or(&empty);
825        let bus_name = state.resolve_bus_name(event_bus_name);
826        let key = (bus_name.clone(), name.to_string());
827
828        let rule = state.rules.get(&key).ok_or_else(|| {
829            AwsServiceError::aws_error(
830                StatusCode::BAD_REQUEST,
831                "ResourceNotFoundException",
832                format!("Rule {name} does not exist."),
833            )
834        })?;
835
836        let mut resp = json!({
837            "Name": rule.name,
838            "Arn": rule.arn,
839            "EventBusName": rule.event_bus_name,
840            "State": rule.state,
841        });
842
843        if let Some(ref desc) = rule.description {
844            resp["Description"] = json!(desc);
845        }
846        if let Some(ref ep) = rule.event_pattern {
847            resp["EventPattern"] = json!(ep);
848        }
849        if let Some(ref se) = rule.schedule_expression {
850            resp["ScheduleExpression"] = json!(se);
851        }
852        if let Some(ref role) = rule.role_arn {
853            resp["RoleArn"] = json!(role);
854        }
855        if let Some(ref mb) = rule.managed_by {
856            resp["ManagedBy"] = json!(mb);
857        }
858        if let Some(ref cb) = rule.created_by {
859            resp["CreatedBy"] = json!(cb);
860        }
861        // If non-default bus, set CreatedBy to account_id
862        if rule.event_bus_name != "default" && rule.created_by.is_none() {
863            resp["CreatedBy"] = json!(state.account_id);
864        }
865
866        Ok(AwsResponse::ok_json(resp))
867    }
868
869    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
870        let body = req.json_body();
871        validate_required("Name", &body["Name"])?;
872        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
873        validate_string_length("name", name, 1, 64)?;
874        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
875        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
876
877        let mut accounts = self.state.write();
878        let state = accounts.get_or_create(&req.account_id);
879        let bus_name = state.resolve_bus_name(event_bus_name);
880        let key = (bus_name, name.to_string());
881
882        let rule = state.rules.get_mut(&key).ok_or_else(|| {
883            AwsServiceError::aws_error(
884                StatusCode::BAD_REQUEST,
885                "ResourceNotFoundException",
886                format!("Rule {name} does not exist."),
887            )
888        })?;
889
890        rule.state = "ENABLED".to_string();
891        Ok(AwsResponse::ok_json(json!({})))
892    }
893
894    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
895        let body = req.json_body();
896        validate_required("Name", &body["Name"])?;
897        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
898        validate_string_length("name", name, 1, 64)?;
899        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
900        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
901
902        let mut accounts = self.state.write();
903        let state = accounts.get_or_create(&req.account_id);
904        let bus_name = state.resolve_bus_name(event_bus_name);
905        let key = (bus_name, name.to_string());
906
907        let rule = state.rules.get_mut(&key).ok_or_else(|| {
908            AwsServiceError::aws_error(
909                StatusCode::BAD_REQUEST,
910                "ResourceNotFoundException",
911                format!("Rule {name} does not exist."),
912            )
913        })?;
914
915        rule.state = "DISABLED".to_string();
916        Ok(AwsResponse::ok_json(json!({})))
917    }
918
919    // ─── Target Operations ──────────────────────────────────────────────
920
921    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
922        let body = req.json_body();
923        // Smithy PutTargetsRequest constraints (top-level shape):
924        //   Rule: RuleName @required length 1..=64
925        //   EventBusName: EventBusNameOrArn length 1..=1600
926        //   Targets: TargetList @required length 1..=100
927        // Per-target validation still flows through `FailedEntries` (matching
928        // AWS); only the top-level shape produces ValidationException.
929        validate_required("Rule", &body["Rule"])?;
930        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
931        validate_string_length("Rule", rule_name, 1, 64)?;
932        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
933        // Targets is @required with TargetList length 1..=100 in the Smithy
934        // model. Real AWS rejects empty / oversized lists with a
935        // ValidationException; per-target validation continues to flow
936        // through FailedEntries (matching AWS).
937        validate_required("Targets", &body["Targets"])?;
938        let targets_array = body["Targets"].as_array().ok_or_else(|| {
939            AwsServiceError::aws_error(
940                StatusCode::BAD_REQUEST,
941                "ValidationException",
942                "Targets must be a list",
943            )
944        })?;
945        if targets_array.is_empty() || targets_array.len() > 100 {
946            return Err(AwsServiceError::aws_error(
947                StatusCode::BAD_REQUEST,
948                "ValidationException",
949                "Value at 'Targets' failed to satisfy constraint: \
950                 Member must have length between 1 and 100",
951            ));
952        }
953        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
954        let targets: Vec<Value> = targets_array.clone();
955
956        let mut accounts = self.state.write();
957        let state = accounts.get_or_create(&req.account_id);
958        let bus_name = state.resolve_bus_name(event_bus_name);
959        let key = (bus_name.clone(), rule_name.to_string());
960
961        let rule = state.rules.get_mut(&key).ok_or_else(|| {
962            AwsServiceError::aws_error(
963                StatusCode::BAD_REQUEST,
964                "ResourceNotFoundException",
965                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
966            )
967        })?;
968
969        let mut failed_entries: Vec<Value> = Vec::new();
970        for target in &targets {
971            let target_id = target["Id"].as_str().unwrap_or("").to_string();
972            let target_arn = target["Arn"].as_str().unwrap_or("");
973
974            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
975                failed_entries.push(json!({
976                    "TargetId": target_id,
977                    "ErrorCode": "ValidationException",
978                    "ErrorMessage": format!(
979                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
980                    ),
981                }));
982                continue;
983            }
984            if !target_arn.starts_with("arn:") {
985                failed_entries.push(json!({
986                    "TargetId": target_id,
987                    "ErrorCode": "ValidationException",
988                    "ErrorMessage": format!(
989                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
990                    ),
991                }));
992                continue;
993            }
994
995            let et = parse_target(target);
996            rule.targets.retain(|t| t.id != et.id);
997            rule.targets.push(et);
998        }
999
1000        Ok(AwsResponse::ok_json(json!({
1001            "FailedEntryCount": failed_entries.len(),
1002            "FailedEntries": failed_entries,
1003        })))
1004    }
1005
1006    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1007        let body = req.json_body();
1008        validate_required("Rule", &body["Rule"])?;
1009        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1010        validate_string_length("rule", rule_name, 1, 64)?;
1011        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1012        validate_required("Ids", &body["Ids"])?;
1013        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1014        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1015
1016        let target_ids: Vec<String> = ids
1017            .iter()
1018            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1019            .collect();
1020
1021        let mut accounts = self.state.write();
1022        let state = accounts.get_or_create(&req.account_id);
1023        let bus_name = state.resolve_bus_name(event_bus_name);
1024        let key = (bus_name.clone(), rule_name.to_string());
1025
1026        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1027            AwsServiceError::aws_error(
1028                StatusCode::BAD_REQUEST,
1029                "ResourceNotFoundException",
1030                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1031            )
1032        })?;
1033
1034        rule.targets.retain(|t| !target_ids.contains(&t.id));
1035
1036        Ok(AwsResponse::ok_json(json!({
1037            "FailedEntryCount": 0,
1038            "FailedEntries": [],
1039        })))
1040    }
1041
1042    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1043        let body = req.json_body();
1044        validate_required("Rule", &body["Rule"])?;
1045        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1046        validate_string_length("rule", rule_name, 1, 64)?;
1047        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1048        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1049        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1050        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1051        let limit = body["Limit"].as_u64().map(|n| n as usize);
1052        let next_token = body["NextToken"].as_str();
1053
1054        let accounts = self.state.read();
1055        let empty = EventBridgeState::new(&req.account_id, &req.region);
1056        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1057        let bus_name = state.resolve_bus_name(event_bus_name);
1058        let key = (bus_name, rule_name.to_string());
1059
1060        let rule = state.rules.get(&key).ok_or_else(|| {
1061            AwsServiceError::aws_error(
1062                StatusCode::BAD_REQUEST,
1063                "ResourceNotFoundException",
1064                format!("Rule {rule_name} does not exist."),
1065            )
1066        })?;
1067
1068        let all_targets = &rule.targets;
1069        let start = next_token
1070            .and_then(|t| t.parse::<usize>().ok())
1071            .unwrap_or(0)
1072            .min(all_targets.len());
1073        let slice = &all_targets[start..];
1074
1075        let (page, new_next_token) = if let Some(lim) = limit {
1076            if slice.len() > lim {
1077                (&slice[..lim], Some((start + lim).to_string()))
1078            } else {
1079                (slice, None)
1080            }
1081        } else {
1082            (slice, None)
1083        };
1084
1085        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1086
1087        let mut resp = json!({ "Targets": targets });
1088        if let Some(token) = new_next_token {
1089            resp["NextToken"] = json!(token);
1090        }
1091
1092        Ok(AwsResponse::ok_json(resp))
1093    }
1094
1095    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1096        let body = req.json_body();
1097        validate_required("TargetArn", &body["TargetArn"])?;
1098        let target_arn = body["TargetArn"]
1099            .as_str()
1100            .ok_or_else(|| missing("TargetArn"))?;
1101        validate_string_length("targetArn", target_arn, 1, 1600)?;
1102        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1103        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1104        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1105        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1106        let limit = body["Limit"].as_u64().map(|n| n as usize);
1107        let next_token = body["NextToken"].as_str();
1108
1109        let accounts = self.state.read();
1110        let empty = EventBridgeState::new(&req.account_id, &req.region);
1111        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1112        let bus_name = state.resolve_bus_name(event_bus_name);
1113
1114        // Deduplicate rule names
1115        let mut rule_names: Vec<String> = Vec::new();
1116        for rule in state.rules.values() {
1117            if rule.event_bus_name == bus_name
1118                && rule.targets.iter().any(|t| t.arn == target_arn)
1119                && !rule_names.contains(&rule.name)
1120            {
1121                rule_names.push(rule.name.clone());
1122            }
1123        }
1124        rule_names.sort();
1125
1126        let start = next_token
1127            .and_then(|t| t.parse::<usize>().ok())
1128            .unwrap_or(0)
1129            .min(rule_names.len());
1130        let slice = &rule_names[start..];
1131
1132        let (page, new_next_token) = if let Some(lim) = limit {
1133            if slice.len() > lim {
1134                (&slice[..lim], Some((start + lim).to_string()))
1135            } else {
1136                (slice, None)
1137            }
1138        } else {
1139            (slice, None)
1140        };
1141
1142        let mut resp = json!({ "RuleNames": page });
1143        if let Some(token) = new_next_token {
1144            resp["NextToken"] = json!(token);
1145        }
1146
1147        Ok(AwsResponse::ok_json(resp))
1148    }
1149
1150    // ─── Partner Event Sources ────────────────────────────────────────
1151
1152    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1153        let body = req.json_body();
1154        validate_required("EventPattern", &body["EventPattern"])?;
1155        validate_required("Event", &body["Event"])?;
1156        let event_pattern = body["EventPattern"]
1157            .as_str()
1158            .ok_or_else(|| missing("EventPattern"))?;
1159        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1160
1161        // Parse the event JSON
1162        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1163            AwsServiceError::aws_error(
1164                StatusCode::BAD_REQUEST,
1165                "InvalidEventPatternException",
1166                "Event is not valid JSON.",
1167            )
1168        })?;
1169
1170        // Parse the pattern JSON
1171        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1172            AwsServiceError::aws_error(
1173                StatusCode::BAD_REQUEST,
1174                "InvalidEventPatternException",
1175                "Event pattern is not valid JSON.",
1176            )
1177        })?;
1178
1179        let source = event["source"].as_str().unwrap_or("");
1180        let detail_type = event["detail-type"].as_str().unwrap_or("");
1181        let detail = event
1182            .get("detail")
1183            .map(|v| serde_json::to_string(v).unwrap_or_default())
1184            .unwrap_or_else(|| "{}".to_string());
1185        let account = event["account"].as_str().unwrap_or("");
1186        let region = event["region"].as_str().unwrap_or("");
1187        let resources: Vec<String> = event["resources"]
1188            .as_array()
1189            .map(|arr| {
1190                arr.iter()
1191                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1192                    .collect()
1193            })
1194            .unwrap_or_default();
1195
1196        let result = matches_pattern(
1197            Some(event_pattern),
1198            source,
1199            detail_type,
1200            &detail,
1201            account,
1202            region,
1203            &resources,
1204        );
1205
1206        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1207    }
1208
1209    // ─── UpdateEventBus ─────────────────────────────────────────────────
1210
1211    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1212        let body = req.json_body();
1213        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1214        validate_optional_string_length(
1215            "kmsKeyIdentifier",
1216            body["KmsKeyIdentifier"].as_str(),
1217            0,
1218            2048,
1219        )?;
1220        let name = body["Name"].as_str().unwrap_or("default");
1221
1222        let mut accounts = self.state.write();
1223        let state = accounts.get_or_create(&req.account_id);
1224        let bus = state.buses.get_mut(name).ok_or_else(|| {
1225            AwsServiceError::aws_error(
1226                StatusCode::BAD_REQUEST,
1227                "ResourceNotFoundException",
1228                format!("Event bus {name} does not exist."),
1229            )
1230        })?;
1231
1232        if let Some(desc) = body["Description"].as_str() {
1233            bus.description = Some(desc.to_string());
1234        }
1235        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1236            bus.kms_key_identifier = Some(kms.to_string());
1237        }
1238        if let Some(dlc) = body.get("DeadLetterConfig") {
1239            bus.dead_letter_config = Some(dlc.clone());
1240        }
1241        bus.last_modified_time = Utc::now();
1242
1243        let arn = bus.arn.clone();
1244        let bus_name = bus.name.clone();
1245
1246        Ok(AwsResponse::ok_json(json!({
1247            "Arn": arn,
1248            "Name": bus_name,
1249        })))
1250    }
1251
1252    // ─── Endpoint Operations ────────────────────────────────────────────
1253
1254    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1255        let body = req.json_body();
1256        // Smithy PutEventsRequest constraints:
1257        //   EndpointId: length 1..=50
1258        //   Entries: @required, length 1..=10
1259        validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1260        validate_required("Entries", &body["Entries"])?;
1261        let entries_array = body["Entries"].as_array().ok_or_else(|| {
1262            AwsServiceError::aws_error(
1263                StatusCode::BAD_REQUEST,
1264                "ValidationException",
1265                "Entries must be a list",
1266            )
1267        })?;
1268        if entries_array.is_empty() || entries_array.len() > 10 {
1269            return Err(AwsServiceError::aws_error(
1270                StatusCode::BAD_REQUEST,
1271                "ValidationException",
1272                "Value at 'Entries' failed to satisfy constraint: \
1273                 Member must have length between 1 and 10",
1274            ));
1275        }
1276        let entries: Vec<Value> = entries_array.clone();
1277        let entries = &entries;
1278
1279        let mut accounts = self.state.write();
1280        let state = accounts.get_or_create(&req.account_id);
1281        let mut result_entries = Vec::new();
1282        let mut events_to_deliver = Vec::new();
1283        let mut failed_count = 0;
1284
1285        for entry in entries {
1286            let source = entry["Source"].as_str().unwrap_or("").to_string();
1287            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1288            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1289
1290            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1291                failed_count += 1;
1292                result_entries.push(error);
1293                continue;
1294            }
1295
1296            let event_id = uuid::Uuid::new_v4().to_string();
1297            let raw_bus = entry["EventBusName"]
1298                .as_str()
1299                .unwrap_or("default")
1300                .to_string();
1301            let event_bus_name = state.resolve_bus_name(&raw_bus);
1302
1303            // Bus resource-policy gate. AWS evaluates the bus's
1304            // resource policy against cross-account callers; same-account
1305            // callers always have access. The policy itself is JSON
1306            // stored as serde_json::Value so the IAM evaluator parses
1307            // it the same way it parses an S3 bucket policy.
1308            let caller_account = req
1309                .principal
1310                .as_ref()
1311                .map(|p| p.account_id.as_str())
1312                .unwrap_or(req.account_id.as_str());
1313            if caller_account != req.account_id {
1314                let bus_policy_value = state
1315                    .buses
1316                    .get(&event_bus_name)
1317                    .and_then(|b| b.policy.clone());
1318                if let Some(policy_value) = bus_policy_value {
1319                    let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1320                    let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1321                    let bus_arn = state
1322                        .buses
1323                        .get(&event_bus_name)
1324                        .map(|b| b.arn.clone())
1325                        .unwrap_or_default();
1326                    let principal =
1327                        req.principal
1328                            .clone()
1329                            .unwrap_or_else(|| fakecloud_core::auth::Principal {
1330                                arn: Arn::global("iam", caller_account, "root").to_string(),
1331                                user_id: caller_account.to_string(),
1332                                account_id: caller_account.to_string(),
1333                                principal_type: fakecloud_core::auth::PrincipalType::Root,
1334                                source_identity: None,
1335                                tags: None,
1336                            });
1337                    let context = fakecloud_iam::evaluator::RequestContext {
1338                        aws_principal_arn: Some(principal.arn.clone()),
1339                        aws_principal_account: Some(principal.account_id.clone()),
1340                        ..Default::default()
1341                    };
1342                    let eval_req = fakecloud_iam::evaluator::EvalRequest {
1343                        principal: &principal,
1344                        action: "events:PutEvents".to_string(),
1345                        resource: bus_arn,
1346                        context,
1347                    };
1348                    let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1349                        &policy_doc,
1350                        &eval_req,
1351                    );
1352                    if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1353                        failed_count += 1;
1354                        result_entries.push(json!({
1355                            "ErrorCode": "AccessDeniedException",
1356                            "ErrorMessage": format!(
1357                                "User '{}' is not authorized to put events on event bus '{}'",
1358                                principal.arn, event_bus_name
1359                            ),
1360                        }));
1361                        continue;
1362                    }
1363                }
1364            }
1365
1366            let time = parse_put_events_time(&entry["Time"]);
1367            let resources: Vec<String> = entry["Resources"]
1368                .as_array()
1369                .map(|arr| {
1370                    arr.iter()
1371                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1372                        .collect()
1373                })
1374                .unwrap_or_default();
1375
1376            let event = PutEvent {
1377                event_id: event_id.clone(),
1378                source: source.clone(),
1379                detail_type: detail_type.clone(),
1380                detail: detail.clone(),
1381                event_bus_name: event_bus_name.clone(),
1382                time,
1383                resources: resources.clone(),
1384            };
1385
1386            archive_matching_event(
1387                state,
1388                &event,
1389                &event_bus_name,
1390                &source,
1391                &detail_type,
1392                &detail,
1393                &req.account_id,
1394                &req.region,
1395                &resources,
1396            );
1397
1398            state.events.push(event);
1399
1400            // Find matching rules and their targets
1401            let matching_targets: Vec<EventTarget> = state
1402                .rules
1403                .values()
1404                .filter(|r| {
1405                    r.event_bus_name == event_bus_name
1406                        && r.state == "ENABLED"
1407                        && matches_pattern(
1408                            r.event_pattern.as_deref(),
1409                            &source,
1410                            &detail_type,
1411                            &detail,
1412                            &req.account_id,
1413                            &req.region,
1414                            &resources,
1415                        )
1416                })
1417                .flat_map(|r| r.targets.clone())
1418                .collect();
1419
1420            if !matching_targets.is_empty() {
1421                events_to_deliver.push((
1422                    event_id.clone(),
1423                    source,
1424                    detail_type,
1425                    detail,
1426                    time,
1427                    resources,
1428                    matching_targets,
1429                ));
1430            }
1431
1432            result_entries.push(json!({ "EventId": event_id }));
1433        }
1434
1435        // Drop the lock before delivering
1436        drop(accounts);
1437
1438        // Deliver to targets — single-target dispatch lives in the
1439        // shared helper so cross-service callers (delivery.rs) honor the
1440        // same target shape (SQS/SNS/Lambda/Logs/Kinesis/StepFunctions/
1441        // ApiDestination/HTTP) and the same InputTransformer rules.
1442        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1443            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1444            let event_json = json!({
1445                "version": "0",
1446                "id": event_id,
1447                "source": source,
1448                "account": req.account_id,
1449                "detail-type": detail_type,
1450                "detail": detail_value,
1451                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1452                "region": req.region,
1453                "resources": resources,
1454            });
1455
1456            let ctx = EventDispatchContext {
1457                state: &self.state,
1458                delivery: &self.delivery,
1459                lambda_state: self.lambda_state.as_ref(),
1460                logs_state: self.logs_state.as_ref(),
1461                container_runtime: &self.container_runtime,
1462                account_id: &req.account_id,
1463                region: &req.region,
1464            };
1465            for target in targets {
1466                dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1467            }
1468        }
1469
1470        let resp = json!({
1471            "FailedEntryCount": failed_count,
1472            "Entries": result_entries,
1473        });
1474
1475        Ok(AwsResponse::ok_json(resp))
1476    }
1477
1478    // ─── Tagging ────────────────────────────────────────────────────────
1479
1480    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1481        let body = req.json_body();
1482        validate_required("ResourceARN", &body["ResourceARN"])?;
1483        let arn = body["ResourceARN"]
1484            .as_str()
1485            .ok_or_else(|| missing("ResourceARN"))?;
1486        validate_string_length("resourceARN", arn, 1, 1600)?;
1487        validate_required("Tags", &body["Tags"])?;
1488
1489        let mut accounts = self.state.write();
1490        let state = accounts.get_or_create(&req.account_id);
1491        let tag_map = find_tags_mut(state, arn)?;
1492
1493        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1494            AwsServiceError::aws_error(
1495                StatusCode::BAD_REQUEST,
1496                "ValidationException",
1497                format!("{f} must be a list"),
1498            )
1499        })?;
1500
1501        Ok(AwsResponse::ok_json(json!({})))
1502    }
1503
1504    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1505        let body = req.json_body();
1506        validate_required("ResourceARN", &body["ResourceARN"])?;
1507        let arn = body["ResourceARN"]
1508            .as_str()
1509            .ok_or_else(|| missing("ResourceARN"))?;
1510        validate_string_length("resourceARN", arn, 1, 1600)?;
1511        validate_required("TagKeys", &body["TagKeys"])?;
1512
1513        let mut accounts = self.state.write();
1514        let state = accounts.get_or_create(&req.account_id);
1515        let tag_map = find_tags_mut(state, arn)?;
1516
1517        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1518            AwsServiceError::aws_error(
1519                StatusCode::BAD_REQUEST,
1520                "ValidationException",
1521                format!("{f} must be a list"),
1522            )
1523        })?;
1524
1525        Ok(AwsResponse::ok_json(json!({})))
1526    }
1527
1528    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1529        let body = req.json_body();
1530        validate_required("ResourceARN", &body["ResourceARN"])?;
1531        let arn = body["ResourceARN"]
1532            .as_str()
1533            .ok_or_else(|| missing("ResourceARN"))?;
1534        validate_string_length("resourceARN", arn, 1, 1600)?;
1535
1536        let accounts = self.state.read();
1537        let empty = EventBridgeState::new(&req.account_id, &req.region);
1538        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1539        let tag_map = find_tags(state, arn)?;
1540
1541        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1542
1543        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1544    }
1545
1546    // ─── Archive Operations ─────────────────────────────────────────────
1547}
1548
1549// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
1550
1551// ─── Event Pattern Validation ────────────────────────────────────────
1552
1553// ─── Connection Auth Params Response Builder ────────────────────────
1554
1555// ─── Event Pattern Matching ─────────────────────────────────────────
1556
1557/// Parsed + validated inputs for `StartReplay`.
1558struct StartReplayInput {
1559    name: String,
1560    description: Option<String>,
1561    event_source_arn: String,
1562    destination: Value,
1563    destination_arn: String,
1564    event_start_time: DateTime<Utc>,
1565    event_end_time: DateTime<Utc>,
1566}
1567
1568impl StartReplayInput {
1569    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1570        // StartReplay's Smithy model declares ResourceNotFound,
1571        // ResourceAlreadyExists, InvalidEventPattern, LimitExceeded, and
1572        // Internal — but not ValidationException. Per-field constraints are
1573        // enforced client-side by the SDK; we surface only declared errors
1574        // here. Missing required inputs default to empty strings and the
1575        // downstream bus/archive lookups produce ResourceNotFound for the
1576        // ones that matter.
1577        let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1578        let description = body["Description"].as_str().map(|s| s.to_string());
1579        let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1580        let destination = body["Destination"].clone();
1581
1582        let event_start_time = body["EventStartTime"]
1583            .as_f64()
1584            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1585            .unwrap_or_else(Utc::now);
1586        let event_end_time = body["EventEndTime"]
1587            .as_f64()
1588            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1589            .unwrap_or_else(Utc::now);
1590
1591        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1592        if !destination_arn.contains(":event-bus/") {
1593            // Missing/invalid destination cannot be resolved to a bus —
1594            // surface as ResourceNotFound rather than the undeclared
1595            // ValidationException.
1596            return Err(AwsServiceError::aws_error(
1597                StatusCode::BAD_REQUEST,
1598                "ResourceNotFoundException",
1599                format!("Destination.Arn {destination_arn} does not point to an event bus."),
1600            ));
1601        }
1602
1603        Ok(Self {
1604            name,
1605            description,
1606            event_source_arn,
1607            destination,
1608            destination_arn,
1609            event_start_time,
1610            event_end_time,
1611        })
1612    }
1613}
1614
1615#[path = "service_archives_replays.rs"]
1616mod service_archives_replays;
1617#[path = "service_connections_apidests.rs"]
1618mod service_connections_apidests;
1619#[path = "service_endpoints.rs"]
1620mod service_endpoints;
1621#[path = "service_partner_sources.rs"]
1622mod service_partner_sources;
1623
1624#[path = "helpers.rs"]
1625mod helpers;
1626pub(crate) use helpers::*;
1627
1628#[cfg(test)]
1629#[path = "service_tests.rs"]
1630mod tests;
1631
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` must fail on a non-numeric token and succeed on a
    /// numeric one for the same five-item list and page size of three.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items: Vec<i32> = vec![0, 1, 2, 3, 4];

        let rejected = paginate_checked(&items, Some("bad"), 3);
        assert!(rejected.is_err());

        let accepted = paginate_checked(&items, Some("2"), 3);
        assert!(accepted.is_ok());
    }
}