Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
/// Request handler for the EventBridge (`events`) service.
///
/// Built with `new` and then optionally extended through the `with_*`
/// builder methods, each of which fills in one of the `Option` fields below.
pub struct EventBridgeService {
    /// Shared EventBridge state that the action handlers read and write.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction time.
    delivery: Arc<DeliveryBus>,
    /// Lambda state, present only after `with_lambda` has been called.
    lambda_state: Option<SharedLambdaState>,
    /// Logs state, present only after `with_logs` has been called.
    logs_state: Option<SharedLogsState>,
    /// Container runtime, present only after `with_runtime` has been called.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Snapshot store; `None` means snapshots are skipped (memory mode).
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex taken while a snapshot is cloned, serialized and written.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
37
38impl EventBridgeService {
39    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40        Self {
41            state,
42            delivery,
43            lambda_state: None,
44            logs_state: None,
45            container_runtime: None,
46            snapshot_store: None,
47            snapshot_lock: Arc::new(AsyncMutex::new(())),
48        }
49    }
50
51    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52        self.lambda_state = Some(lambda_state);
53        self
54    }
55
56    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57        self.logs_state = Some(logs_state);
58        self
59    }
60
61    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62        self.container_runtime = Some(runtime);
63        self
64    }
65
66    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67        self.snapshot_store = Some(store);
68        self
69    }
70
71    /// Persist current state as a snapshot. Held across the
72    /// clone-serialize-write sequence to prevent stale-last writes,
73    /// with serde + file I/O offloaded to the blocking pool.
74    async fn save_snapshot(&self) {
75        save_eventbridge_snapshot(
76            &self.state,
77            self.snapshot_store.clone(),
78            &self.snapshot_lock,
79        )
80        .await;
81    }
82
83    /// Build a hook that persists the current EventBridge state when invoked, or
84    /// `None` in memory mode (no snapshot store). The CloudFormation provisioner
85    /// mutates `state` directly and uses this to write a CFN-provisioned
86    /// resource through to disk, the same way a direct mutating API call would.
87    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
88        let store = self.snapshot_store.clone()?;
89        let state = self.state.clone();
90        let lock = self.snapshot_lock.clone();
91        Some(Arc::new(move || {
92            let state = state.clone();
93            let store = store.clone();
94            let lock = lock.clone();
95            Box::pin(async move {
96                save_eventbridge_snapshot(&state, Some(store), &lock).await;
97            })
98        }))
99    }
100}
101
102/// Persist the current EventBridge state as a snapshot. Offloads the serde +
103/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
104/// (memory mode). Shared by `EventBridgeService::save_snapshot` and the
105/// CloudFormation provisioner's post-provision persist hook so both route
106/// through the same serialize-and-write path.
107pub async fn save_eventbridge_snapshot(
108    state: &SharedEventBridgeState,
109    store: Option<Arc<dyn SnapshotStore>>,
110    lock: &AsyncMutex<()>,
111) {
112    let Some(store) = store else {
113        return;
114    };
115    let _guard = lock.lock().await;
116    let snapshot = EventBridgeSnapshot {
117        schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
118        accounts: Some(state.read().clone()),
119        state: None,
120    };
121    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
122        let bytes = serde_json::to_vec(&snapshot)
123            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
124        store.save(&bytes)
125    })
126    .await;
127    match join {
128        Ok(Ok(())) => {}
129        Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
130        Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
131    }
132}
133
134#[async_trait]
135impl AwsService for EventBridgeService {
136    fn service_name(&self) -> &str {
137        "events"
138    }
139
140    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
141        let mutates = is_mutating_action(req.action.as_str());
142        let result = match req.action.as_str() {
143            "CreateEventBus" => self.create_event_bus(&req),
144            "DeleteEventBus" => self.delete_event_bus(&req),
145            "ListEventBuses" => self.list_event_buses(&req),
146            "DescribeEventBus" => self.describe_event_bus(&req),
147            "PutRule" => self.put_rule(&req),
148            "DeleteRule" => self.delete_rule(&req),
149            "ListRules" => self.list_rules(&req),
150            "DescribeRule" => self.describe_rule(&req),
151            "EnableRule" => self.enable_rule(&req),
152            "DisableRule" => self.disable_rule(&req),
153            "PutTargets" => self.put_targets(&req),
154            "RemoveTargets" => self.remove_targets(&req),
155            "ListTargetsByRule" => self.list_targets_by_rule(&req),
156            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
157            "PutEvents" => self.put_events(&req),
158            "PutPermission" => self.put_permission(&req),
159            "RemovePermission" => self.remove_permission(&req),
160            "TagResource" => self.tag_resource(&req),
161            "UntagResource" => self.untag_resource(&req),
162            "ListTagsForResource" => self.list_tags_for_resource(&req),
163            "CreateArchive" => self.create_archive(&req),
164            "DescribeArchive" => self.describe_archive(&req),
165            "ListArchives" => self.list_archives(&req),
166            "UpdateArchive" => self.update_archive(&req),
167            "DeleteArchive" => self.delete_archive(&req),
168            "CreateConnection" => self.create_connection(&req),
169            "DescribeConnection" => self.describe_connection(&req),
170            "ListConnections" => self.list_connections(&req),
171            "UpdateConnection" => self.update_connection(&req),
172            "DeleteConnection" => self.delete_connection(&req),
173            "CreateApiDestination" => self.create_api_destination(&req),
174            "DescribeApiDestination" => self.describe_api_destination(&req),
175            "ListApiDestinations" => self.list_api_destinations(&req),
176            "UpdateApiDestination" => self.update_api_destination(&req),
177            "DeleteApiDestination" => self.delete_api_destination(&req),
178            "StartReplay" => self.start_replay(&req),
179            "DescribeReplay" => self.describe_replay(&req),
180            "ListReplays" => self.list_replays(&req),
181            "CancelReplay" => self.cancel_replay(&req),
182            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
183            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
184            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
185            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
186            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
187            "ActivateEventSource" => self.activate_event_source(&req),
188            "DeactivateEventSource" => self.deactivate_event_source(&req),
189            "DescribeEventSource" => self.describe_event_source(&req),
190            "ListEventSources" => self.list_event_sources(&req),
191            "PutPartnerEvents" => self.put_partner_events(&req),
192            "TestEventPattern" => self.test_event_pattern(&req),
193            "UpdateEventBus" => self.update_event_bus(&req),
194            "CreateEndpoint" => self.create_endpoint(&req),
195            "DeleteEndpoint" => self.delete_endpoint(&req),
196            "DescribeEndpoint" => self.describe_endpoint(&req),
197            "ListEndpoints" => self.list_endpoints(&req),
198            "UpdateEndpoint" => self.update_endpoint(&req),
199            "DeauthorizeConnection" => self.deauthorize_connection(&req),
200            _ => Err(AwsServiceError::action_not_implemented(
201                "events",
202                &req.action,
203            )),
204        };
205        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
206            self.save_snapshot().await;
207        }
208        result
209    }
210
211    fn supported_actions(&self) -> &[&str] {
212        &[
213            "CreateEventBus",
214            "DeleteEventBus",
215            "ListEventBuses",
216            "DescribeEventBus",
217            "PutRule",
218            "DeleteRule",
219            "ListRules",
220            "DescribeRule",
221            "EnableRule",
222            "DisableRule",
223            "PutTargets",
224            "RemoveTargets",
225            "ListTargetsByRule",
226            "ListRuleNamesByTarget",
227            "PutEvents",
228            "PutPermission",
229            "RemovePermission",
230            "TagResource",
231            "UntagResource",
232            "ListTagsForResource",
233            "CreateArchive",
234            "DescribeArchive",
235            "ListArchives",
236            "UpdateArchive",
237            "DeleteArchive",
238            "CreateConnection",
239            "DescribeConnection",
240            "ListConnections",
241            "UpdateConnection",
242            "DeleteConnection",
243            "CreateApiDestination",
244            "DescribeApiDestination",
245            "ListApiDestinations",
246            "UpdateApiDestination",
247            "DeleteApiDestination",
248            "StartReplay",
249            "DescribeReplay",
250            "ListReplays",
251            "CancelReplay",
252            "CreatePartnerEventSource",
253            "DeletePartnerEventSource",
254            "DescribePartnerEventSource",
255            "ListPartnerEventSources",
256            "ListPartnerEventSourceAccounts",
257            "ActivateEventSource",
258            "DeactivateEventSource",
259            "DescribeEventSource",
260            "ListEventSources",
261            "PutPartnerEvents",
262            "TestEventPattern",
263            "UpdateEventBus",
264            "CreateEndpoint",
265            "DeleteEndpoint",
266            "DescribeEndpoint",
267            "ListEndpoints",
268            "UpdateEndpoint",
269            "DeauthorizeConnection",
270        ]
271    }
272}
273
274// ─── Event Bus Operations ───────────────────────────────────────────
275impl EventBridgeService {
276    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
277        let body = req.json_body();
278        validate_required("Name", &body["Name"])?;
279        let name = body["Name"]
280            .as_str()
281            .ok_or_else(|| missing("Name"))?
282            .to_string();
283        validate_string_length("name", &name, 1, 256)?;
284        validate_optional_string_length(
285            "eventSourceName",
286            body["EventSourceName"].as_str(),
287            1,
288            256,
289        )?;
290        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
291        validate_optional_string_length(
292            "kmsKeyIdentifier",
293            body["KmsKeyIdentifier"].as_str(),
294            0,
295            2048,
296        )?;
297
298        // Validate name doesn't contain '/' (unless partner bus)
299        if name.contains('/') && !name.starts_with("aws.partner/") {
300            return Err(AwsServiceError::aws_error(
301                StatusCode::BAD_REQUEST,
302                "ValidationException",
303                "Event bus name must not contain '/'.",
304            ));
305        }
306
307        // Partner event bus validation
308        if name.starts_with("aws.partner/") {
309            let event_source = body["EventSourceName"].as_str().unwrap_or("");
310            let accounts_r = self.state.read();
311            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
312            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
313            let has_source = state_r.partner_event_sources.contains_key(event_source);
314            drop(accounts_r);
315            if !has_source {
316                return Err(AwsServiceError::aws_error(
317                    StatusCode::BAD_REQUEST,
318                    "ResourceNotFoundException",
319                    format!("Event source {event_source} does not exist."),
320                ));
321            }
322        }
323
324        let mut accounts = self.state.write();
325        let state = accounts.get_or_create(&req.account_id);
326
327        if state.buses.contains_key(&name) {
328            return Err(AwsServiceError::aws_error(
329                StatusCode::BAD_REQUEST,
330                "ResourceAlreadyExistsException",
331                format!("Event bus {name} already exists."),
332            ));
333        }
334
335        let arn = format!(
336            "arn:aws:events:{}:{}:event-bus/{}",
337            req.region, state.account_id, name
338        );
339        let now = Utc::now();
340        let description = body["Description"].as_str().map(|s| s.to_string());
341        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
342        let dead_letter_config = body.get("DeadLetterConfig").cloned();
343
344        let tags = parse_tags(&body);
345
346        let bus = EventBus {
347            name: name.clone(),
348            arn: arn.clone(),
349            tags,
350            policy: None,
351            description,
352            kms_key_identifier,
353            dead_letter_config,
354            creation_time: now,
355            last_modified_time: now,
356        };
357        state.buses.insert(name, bus);
358
359        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
360    }
361
362    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
363        let body = req.json_body();
364        validate_required("Name", &body["Name"])?;
365        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
366        validate_string_length("name", name, 1, 256)?;
367
368        if name == "default" {
369            return Err(AwsServiceError::aws_error(
370                StatusCode::BAD_REQUEST,
371                "ValidationException",
372                format!("Cannot delete event bus {name}."),
373            ));
374        }
375
376        let mut accounts = self.state.write();
377        let state = accounts.get_or_create(&req.account_id);
378        state.buses.remove(name);
379        state.rules.retain(|k, _| k.0 != name);
380
381        Ok(AwsResponse::ok_json(json!({})))
382    }
383
384    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let body = req.json_body();
386        // Smithy ListEventBusesRequest constraints:
387        //   NamePrefix: EventBusName length 1..=256
388        //   NextToken: length 1..=2048
389        //   Limit: LimitMax100 range 1..=100
390        // Unrecognised pagination tokens still fall back to the start of the
391        // list — `InvalidNextTokenException` only fires when the token shape
392        // itself is wrong, not when it points at a vanished cursor.
393        validate_optional_string_length_value("NamePrefix", &body["NamePrefix"], 1, 256)?;
394        validate_optional_string_length_value("NextToken", &body["NextToken"], 1, 2048)?;
395        validate_optional_json_range("Limit", &body["Limit"], 1, 100)?;
396        let name_prefix = body["NamePrefix"].as_str();
397        let limit = body["Limit"].as_i64().unwrap_or(100).clamp(1, 100) as usize;
398
399        let accounts = self.state.read();
400        let empty = EventBridgeState::new(&req.account_id, &req.region);
401        let state = accounts.get(&req.account_id).unwrap_or(&empty);
402        let filtered: Vec<&_> = state
403            .buses
404            .values()
405            .filter(|b| match name_prefix {
406                Some(prefix) => b.name.starts_with(prefix),
407                None => true,
408            })
409            .collect();
410
411        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
412        let buses: Vec<Value> = page
413            .iter()
414            .map(|b| {
415                // Mirror DescribeEventBus: ListEventBuses also reports the
416                // bus creation/last-modified times, which the Terraform
417                // aws_cloudwatch_event_buses data source expects to be set.
418                json!({
419                    "Name": b.name,
420                    "Arn": b.arn,
421                    "CreationTime": b.creation_time.timestamp() as f64,
422                    "LastModifiedTime": b.last_modified_time.timestamp() as f64,
423                })
424            })
425            .collect();
426        let mut resp = json!({ "EventBuses": buses });
427        if let Some(token) = next_token {
428            resp["NextToken"] = json!(token);
429        }
430
431        Ok(AwsResponse::ok_json(resp))
432    }
433
434    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
435        let body = req.json_body();
436        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
437        let name = body["Name"].as_str().unwrap_or("default");
438
439        let accounts = self.state.read();
440        let empty = EventBridgeState::new(&req.account_id, &req.region);
441        let state = accounts.get(&req.account_id).unwrap_or(&empty);
442        let bus = state.buses.get(name).ok_or_else(|| {
443            AwsServiceError::aws_error(
444                StatusCode::BAD_REQUEST,
445                "ResourceNotFoundException",
446                format!("Event bus {name} does not exist."),
447            )
448        })?;
449
450        let mut resp = json!({
451            "Name": bus.name,
452            "Arn": bus.arn,
453            "CreationTime": bus.creation_time.timestamp() as f64,
454            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
455        });
456
457        if let Some(ref policy) = bus.policy {
458            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
459        }
460        if let Some(ref desc) = bus.description {
461            resp["Description"] = json!(desc);
462        }
463        if let Some(ref kms) = bus.kms_key_identifier {
464            resp["KmsKeyIdentifier"] = json!(kms);
465        }
466        if let Some(ref dlc) = bus.dead_letter_config {
467            resp["DeadLetterConfig"] = dlc.clone();
468        }
469
470        Ok(AwsResponse::ok_json(resp))
471    }
472
473    // ─── Permission Operations ──────────────────────────────────────────
474
475    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
476        let body = req.json_body();
477        // Smithy PutPermissionRequest constraints (optional members but each
478        // carries a `@length` trait that must be honoured when present):
479        //   EventBusName: NonPartnerEventBusName length 1..=256
480        //   Action: length 1..=64
481        //   Principal: length 1..=12 (12-digit AWS account or `*`)
482        //   StatementId: length 1..=64
483        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 256)?;
484        validate_optional_string_length_value("Action", &body["Action"], 1, 64)?;
485        validate_optional_string_length_value("Principal", &body["Principal"], 1, 12)?;
486        validate_optional_string_length_value("StatementId", &body["StatementId"], 1, 64)?;
487        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
488
489        let mut accounts = self.state.write();
490        let state = accounts.get_or_create(&req.account_id);
491
492        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
493            AwsServiceError::aws_error(
494                StatusCode::BAD_REQUEST,
495                "ResourceNotFoundException",
496                format!("Event bus {event_bus_name} does not exist."),
497            )
498        })?;
499
500        // Check if Policy is provided (new-style)
501        if let Some(policy_str) = body["Policy"].as_str() {
502            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
503                bus.policy = Some(policy);
504                return Ok(AwsResponse::ok_json(json!({})));
505            }
506        }
507
508        // Old-style: Action, Principal, StatementId. All are @optional in the
509        // Smithy model; non-string values were already rejected above with
510        // SerializationException, so reaching here means each is either a
511        // valid string or absent. Fall back to "" to preserve current behavior
512        // — recording an empty-statement policy entry is harmless since it can
513        // never match a real action/principal pair.
514        let action = body["Action"].as_str().unwrap_or("");
515        let principal = body["Principal"].as_str().unwrap_or("");
516        let statement_id = body["StatementId"].as_str().unwrap_or("");
517
518        // Note: real AWS does enforce a small allow-list on `Action`, but
519        // PutPermission's Smithy model only declares ResourceNotFoundException,
520        // PolicyLengthExceededException, ConcurrentModificationException,
521        // OperationDisabledException, and InternalException. We accept any
522        // action string and just record the statement.
523        // A `*` principal means "any account" and is stored verbatim, not as
524        // an account-root ARN. The Terraform aws_cloudwatch_event_permission
525        // resource reads the principal back and asserts it is exactly "*".
526        let principal_value = if principal == "*" {
527            json!("*")
528        } else {
529            json!({ "AWS": Arn::global("iam", principal, "root").to_string() })
530        };
531        let statement = json!({
532            "Sid": statement_id,
533            "Effect": "Allow",
534            "Principal": principal_value,
535            "Action": action,
536            "Resource": bus.arn,
537        });
538
539        let policy = bus.policy.get_or_insert_with(|| {
540            json!({
541                "Version": "2012-10-17",
542                "Statement": [],
543            })
544        });
545
546        if let Some(stmts) = policy["Statement"].as_array_mut() {
547            // PutPermission with an existing StatementId replaces that
548            // statement (e.g. changing its principal), it does not stack a
549            // second one. Drop any prior statement with the same Sid first.
550            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
551            stmts.push(statement);
552        }
553
554        Ok(AwsResponse::ok_json(json!({})))
555    }
556
557    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558        let body = req.json_body();
559        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
560        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
561        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
562        let statement_id = body["StatementId"].as_str().unwrap_or("");
563        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
564
565        let mut accounts = self.state.write();
566        let state = accounts.get_or_create(&req.account_id);
567
568        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
569            AwsServiceError::aws_error(
570                StatusCode::BAD_REQUEST,
571                "ResourceNotFoundException",
572                format!("Event bus {event_bus_name} does not exist."),
573            )
574        })?;
575
576        if remove_all {
577            bus.policy = None;
578            return Ok(AwsResponse::ok_json(json!({})));
579        }
580
581        let policy = bus.policy.as_mut().ok_or_else(|| {
582            AwsServiceError::aws_error(
583                StatusCode::BAD_REQUEST,
584                "ResourceNotFoundException",
585                "EventBus does not have a policy.",
586            )
587        })?;
588
589        if let Some(stmts) = policy["Statement"].as_array_mut() {
590            let before = stmts.len();
591            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
592            if stmts.len() == before {
593                return Err(AwsServiceError::aws_error(
594                    StatusCode::BAD_REQUEST,
595                    "ResourceNotFoundException",
596                    "Statement with the provided id does not exist.",
597                ));
598            }
599            if stmts.is_empty() {
600                bus.policy = None;
601            }
602        }
603
604        Ok(AwsResponse::ok_json(json!({})))
605    }
606
607    // ─── Rule Operations ────────────────────────────────────────────────
608
609    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
610        let body = req.json_body();
611        // Smithy PutRuleRequest constraints:
612        //   Name: RuleName length 1..=64, @required
613        //   ScheduleExpression: length 0..=256
614        //   EventPattern: length 0..=4096 (raw JSON, separate
615        //     InvalidEventPatternException still applies to syntax)
616        //   State: RuleState enum {ENABLED, DISABLED,
617        //          ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS}
618        //   Description: RuleDescription length 0..=512
619        //   RoleArn: length 1..=1600
620        //   EventBusName: EventBusNameOrArn length 1..=1600
621        validate_required("Name", &body["Name"])?;
622        let name = body["Name"]
623            .as_str()
624            .ok_or_else(|| missing("Name"))?
625            .to_string();
626        validate_string_length("Name", &name, 1, 64)?;
627        validate_optional_string_length_value(
628            "ScheduleExpression",
629            &body["ScheduleExpression"],
630            0,
631            256,
632        )?;
633        validate_optional_string_length_value("EventPattern", &body["EventPattern"], 0, 4096)?;
634        // Reject a syntactically invalid EventPattern at PutRule time, like AWS.
635        // Previously only the length was checked, so an invalid pattern was
636        // stored and PutEvents silently never matched it.
637        if let Some(pattern) = body["EventPattern"].as_str().filter(|s| !s.is_empty()) {
638            validate_event_pattern(pattern)?;
639        }
640        validate_optional_enum_value(
641            "State",
642            &body["State"],
643            &[
644                "ENABLED",
645                "DISABLED",
646                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
647            ],
648        )?;
649        validate_optional_string_length_value("Description", &body["Description"], 0, 512)?;
650        validate_optional_string_length_value("RoleArn", &body["RoleArn"], 1, 1600)?;
651        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
652
653        let raw_bus = body["EventBusName"]
654            .as_str()
655            .unwrap_or("default")
656            .to_string();
657
658        let mut accounts = self.state.write();
659        let state = accounts.get_or_create(&req.account_id);
660        let event_bus_name = state.resolve_bus_name(&raw_bus);
661
662        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
663            if s.is_empty() {
664                None
665            } else {
666                Some(s.to_string())
667            }
668        });
669        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
670            if s.is_empty() {
671                None
672            } else {
673                Some(s.to_string())
674            }
675        });
676        let description = body["Description"].as_str().map(|s| s.to_string());
677        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
678        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
679
680        // Note: real AWS rejects ScheduleExpression on a non-default bus, but
681        // PutRule's Smithy model only declares InvalidEventPatternException
682        // for input-shape problems, not ValidationException. We accept the
683        // value and let the scheduler ignore it on non-default buses.
684
685        if !state.buses.contains_key(&event_bus_name) {
686            return Err(AwsServiceError::aws_error(
687                StatusCode::BAD_REQUEST,
688                "ResourceNotFoundException",
689                format!("Event bus {event_bus_name} does not exist."),
690            ));
691        }
692
693        let arn = if event_bus_name == "default" {
694            format!(
695                "arn:aws:events:{}:{}:rule/{}",
696                req.region, state.account_id, name
697            )
698        } else {
699            format!(
700                "arn:aws:events:{}:{}:rule/{}/{}",
701                req.region, state.account_id, event_bus_name, name
702            )
703        };
704
705        let key = (event_bus_name.clone(), name.clone());
706        let targets = state
707            .rules
708            .get(&key)
709            .map(|r| r.targets.clone())
710            .unwrap_or_default();
711
712        let tags = parse_tags(&body);
713
714        let rule = EventRule {
715            name: name.clone(),
716            arn: arn.clone(),
717            event_bus_name,
718            event_pattern,
719            schedule_expression,
720            state: rule_state,
721            description,
722            role_arn,
723            managed_by: None,
724            created_by: None,
725            targets,
726            tags,
727            last_fired: None,
728        };
729
730        state.rules.insert(key, rule);
731        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
732    }
733
734    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
735        let body = req.json_body();
736        validate_required("Name", &body["Name"])?;
737        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
738        validate_string_length("name", name, 1, 64)?;
739        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
740        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
741
742        let mut accounts = self.state.write();
743        let state = accounts.get_or_create(&req.account_id);
744        let bus_name = state.resolve_bus_name(event_bus_name);
745        let key = (bus_name, name.to_string());
746
747        // Check if rule has targets
748        if let Some(rule) = state.rules.get(&key) {
749            if !rule.targets.is_empty() {
750                return Err(AwsServiceError::aws_error(
751                    StatusCode::BAD_REQUEST,
752                    "ValidationException",
753                    "Rule can't be deleted since it has targets.",
754                ));
755            }
756        }
757
758        state.rules.remove(&key);
759        Ok(AwsResponse::ok_json(json!({})))
760    }
761
762    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
763        let body = req.json_body();
764        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
765        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
766        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
767        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
768        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
769        let name_prefix = body["NamePrefix"].as_str();
770        let limit = body["Limit"].as_u64().map(|n| n as usize);
771        let next_token = body["NextToken"].as_str();
772
773        let accounts = self.state.read();
774        let empty = EventBridgeState::new(&req.account_id, &req.region);
775        let state = accounts.get(&req.account_id).unwrap_or(&empty);
776        let bus_name = state.resolve_bus_name(event_bus_name);
777
778        let mut rules: Vec<&EventRule> = state
779            .rules
780            .values()
781            .filter(|r| r.event_bus_name == bus_name)
782            .filter(|r| match name_prefix {
783                Some(prefix) => r.name.starts_with(prefix),
784                None => true,
785            })
786            .collect();
787        rules.sort_by(|a, b| a.name.cmp(&b.name));
788
789        // Pagination
790        let start = next_token
791            .and_then(|t| t.parse::<usize>().ok())
792            .unwrap_or(0)
793            .min(rules.len());
794        let rules_slice = &rules[start..];
795
796        let (page, new_next_token) = if let Some(lim) = limit {
797            if rules_slice.len() > lim {
798                (&rules_slice[..lim], Some((start + lim).to_string()))
799            } else {
800                (rules_slice, None)
801            }
802        } else {
803            (rules_slice, None)
804        };
805
806        let rules_json: Vec<Value> = page
807            .iter()
808            .map(|r| {
809                let mut obj = json!({
810                    "Name": r.name,
811                    "Arn": r.arn,
812                    "EventBusName": r.event_bus_name,
813                    "State": r.state,
814                });
815                if let Some(ref desc) = r.description {
816                    obj["Description"] = json!(desc);
817                }
818                if let Some(ref ep) = r.event_pattern {
819                    obj["EventPattern"] = json!(ep);
820                }
821                if let Some(ref se) = r.schedule_expression {
822                    obj["ScheduleExpression"] = json!(se);
823                }
824                if let Some(ref role) = r.role_arn {
825                    obj["RoleArn"] = json!(role);
826                }
827                if let Some(ref mb) = r.managed_by {
828                    obj["ManagedBy"] = json!(mb);
829                }
830                obj
831            })
832            .collect();
833
834        let mut resp = json!({ "Rules": rules_json });
835        if let Some(token) = new_next_token {
836            resp["NextToken"] = json!(token);
837        }
838
839        Ok(AwsResponse::ok_json(resp))
840    }
841
842    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843        let body = req.json_body();
844        validate_required("Name", &body["Name"])?;
845        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
846        validate_string_length("name", name, 1, 64)?;
847        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
848        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
849
850        let accounts = self.state.read();
851        let empty = EventBridgeState::new(&req.account_id, &req.region);
852        let state = accounts.get(&req.account_id).unwrap_or(&empty);
853        let bus_name = state.resolve_bus_name(event_bus_name);
854        let key = (bus_name.clone(), name.to_string());
855
856        let rule = state.rules.get(&key).ok_or_else(|| {
857            AwsServiceError::aws_error(
858                StatusCode::BAD_REQUEST,
859                "ResourceNotFoundException",
860                format!("Rule {name} does not exist."),
861            )
862        })?;
863
864        let mut resp = json!({
865            "Name": rule.name,
866            "Arn": rule.arn,
867            "EventBusName": rule.event_bus_name,
868            "State": rule.state,
869        });
870
871        if let Some(ref desc) = rule.description {
872            resp["Description"] = json!(desc);
873        }
874        if let Some(ref ep) = rule.event_pattern {
875            resp["EventPattern"] = json!(ep);
876        }
877        if let Some(ref se) = rule.schedule_expression {
878            resp["ScheduleExpression"] = json!(se);
879        }
880        if let Some(ref role) = rule.role_arn {
881            resp["RoleArn"] = json!(role);
882        }
883        if let Some(ref mb) = rule.managed_by {
884            resp["ManagedBy"] = json!(mb);
885        }
886        if let Some(ref cb) = rule.created_by {
887            resp["CreatedBy"] = json!(cb);
888        }
889        // If non-default bus, set CreatedBy to account_id
890        if rule.event_bus_name != "default" && rule.created_by.is_none() {
891            resp["CreatedBy"] = json!(state.account_id);
892        }
893
894        Ok(AwsResponse::ok_json(resp))
895    }
896
897    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898        let body = req.json_body();
899        validate_required("Name", &body["Name"])?;
900        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901        validate_string_length("name", name, 1, 64)?;
902        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905        let mut accounts = self.state.write();
906        let state = accounts.get_or_create(&req.account_id);
907        let bus_name = state.resolve_bus_name(event_bus_name);
908        let key = (bus_name, name.to_string());
909
910        let rule = state.rules.get_mut(&key).ok_or_else(|| {
911            AwsServiceError::aws_error(
912                StatusCode::BAD_REQUEST,
913                "ResourceNotFoundException",
914                format!("Rule {name} does not exist."),
915            )
916        })?;
917
918        rule.state = "ENABLED".to_string();
919        Ok(AwsResponse::ok_json(json!({})))
920    }
921
922    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
923        let body = req.json_body();
924        validate_required("Name", &body["Name"])?;
925        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
926        validate_string_length("name", name, 1, 64)?;
927        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
928        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
929
930        let mut accounts = self.state.write();
931        let state = accounts.get_or_create(&req.account_id);
932        let bus_name = state.resolve_bus_name(event_bus_name);
933        let key = (bus_name, name.to_string());
934
935        let rule = state.rules.get_mut(&key).ok_or_else(|| {
936            AwsServiceError::aws_error(
937                StatusCode::BAD_REQUEST,
938                "ResourceNotFoundException",
939                format!("Rule {name} does not exist."),
940            )
941        })?;
942
943        rule.state = "DISABLED".to_string();
944        Ok(AwsResponse::ok_json(json!({})))
945    }
946
947    // ─── Target Operations ──────────────────────────────────────────────
948
949    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
950        let body = req.json_body();
951        // Smithy PutTargetsRequest constraints (top-level shape):
952        //   Rule: RuleName @required length 1..=64
953        //   EventBusName: EventBusNameOrArn length 1..=1600
954        //   Targets: TargetList @required length 1..=100
955        // Per-target validation still flows through `FailedEntries` (matching
956        // AWS); only the top-level shape produces ValidationException.
957        validate_required("Rule", &body["Rule"])?;
958        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
959        validate_string_length("Rule", rule_name, 1, 64)?;
960        validate_optional_string_length_value("EventBusName", &body["EventBusName"], 1, 1600)?;
961        // Targets is @required with TargetList length 1..=100 in the Smithy
962        // model. Real AWS rejects empty / oversized lists with a
963        // ValidationException; per-target validation continues to flow
964        // through FailedEntries (matching AWS).
965        validate_required("Targets", &body["Targets"])?;
966        let targets_array = body["Targets"].as_array().ok_or_else(|| {
967            AwsServiceError::aws_error(
968                StatusCode::BAD_REQUEST,
969                "ValidationException",
970                "Targets must be a list",
971            )
972        })?;
973        if targets_array.is_empty() || targets_array.len() > 100 {
974            return Err(AwsServiceError::aws_error(
975                StatusCode::BAD_REQUEST,
976                "ValidationException",
977                "Value at 'Targets' failed to satisfy constraint: \
978                 Member must have length between 1 and 100",
979            ));
980        }
981        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
982        let targets: Vec<Value> = targets_array.clone();
983
984        let mut accounts = self.state.write();
985        let state = accounts.get_or_create(&req.account_id);
986        let bus_name = state.resolve_bus_name(event_bus_name);
987        let key = (bus_name.clone(), rule_name.to_string());
988
989        let rule = state.rules.get_mut(&key).ok_or_else(|| {
990            AwsServiceError::aws_error(
991                StatusCode::BAD_REQUEST,
992                "ResourceNotFoundException",
993                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
994            )
995        })?;
996
997        let mut failed_entries: Vec<Value> = Vec::new();
998        for target in &targets {
999            let target_id = target["Id"].as_str().unwrap_or("").to_string();
1000            let target_arn = target["Arn"].as_str().unwrap_or("");
1001
1002            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1003                failed_entries.push(json!({
1004                    "TargetId": target_id,
1005                    "ErrorCode": "ValidationException",
1006                    "ErrorMessage": format!(
1007                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
1008                    ),
1009                }));
1010                continue;
1011            }
1012            if !target_arn.starts_with("arn:") {
1013                failed_entries.push(json!({
1014                    "TargetId": target_id,
1015                    "ErrorCode": "ValidationException",
1016                    "ErrorMessage": format!(
1017                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1018                    ),
1019                }));
1020                continue;
1021            }
1022
1023            let et = parse_target(target);
1024            rule.targets.retain(|t| t.id != et.id);
1025            rule.targets.push(et);
1026        }
1027
1028        Ok(AwsResponse::ok_json(json!({
1029            "FailedEntryCount": failed_entries.len(),
1030            "FailedEntries": failed_entries,
1031        })))
1032    }
1033
1034    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1035        let body = req.json_body();
1036        validate_required("Rule", &body["Rule"])?;
1037        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1038        validate_string_length("rule", rule_name, 1, 64)?;
1039        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1040        validate_required("Ids", &body["Ids"])?;
1041        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1042        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1043
1044        let target_ids: Vec<String> = ids
1045            .iter()
1046            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1047            .collect();
1048
1049        let mut accounts = self.state.write();
1050        let state = accounts.get_or_create(&req.account_id);
1051        let bus_name = state.resolve_bus_name(event_bus_name);
1052        let key = (bus_name.clone(), rule_name.to_string());
1053
1054        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1055            AwsServiceError::aws_error(
1056                StatusCode::BAD_REQUEST,
1057                "ResourceNotFoundException",
1058                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1059            )
1060        })?;
1061
1062        rule.targets.retain(|t| !target_ids.contains(&t.id));
1063
1064        Ok(AwsResponse::ok_json(json!({
1065            "FailedEntryCount": 0,
1066            "FailedEntries": [],
1067        })))
1068    }
1069
1070    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1071        let body = req.json_body();
1072        validate_required("Rule", &body["Rule"])?;
1073        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1074        validate_string_length("rule", rule_name, 1, 64)?;
1075        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1076        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1077        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1078        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1079        let limit = body["Limit"].as_u64().map(|n| n as usize);
1080        let next_token = body["NextToken"].as_str();
1081
1082        let accounts = self.state.read();
1083        let empty = EventBridgeState::new(&req.account_id, &req.region);
1084        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1085        let bus_name = state.resolve_bus_name(event_bus_name);
1086        let key = (bus_name, rule_name.to_string());
1087
1088        let rule = state.rules.get(&key).ok_or_else(|| {
1089            AwsServiceError::aws_error(
1090                StatusCode::BAD_REQUEST,
1091                "ResourceNotFoundException",
1092                format!("Rule {rule_name} does not exist."),
1093            )
1094        })?;
1095
1096        let all_targets = &rule.targets;
1097        let start = next_token
1098            .and_then(|t| t.parse::<usize>().ok())
1099            .unwrap_or(0)
1100            .min(all_targets.len());
1101        let slice = &all_targets[start..];
1102
1103        let (page, new_next_token) = if let Some(lim) = limit {
1104            if slice.len() > lim {
1105                (&slice[..lim], Some((start + lim).to_string()))
1106            } else {
1107                (slice, None)
1108            }
1109        } else {
1110            (slice, None)
1111        };
1112
1113        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1114
1115        let mut resp = json!({ "Targets": targets });
1116        if let Some(token) = new_next_token {
1117            resp["NextToken"] = json!(token);
1118        }
1119
1120        Ok(AwsResponse::ok_json(resp))
1121    }
1122
1123    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1124        let body = req.json_body();
1125        validate_required("TargetArn", &body["TargetArn"])?;
1126        let target_arn = body["TargetArn"]
1127            .as_str()
1128            .ok_or_else(|| missing("TargetArn"))?;
1129        validate_string_length("targetArn", target_arn, 1, 1600)?;
1130        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1131        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1132        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1133        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1134        let limit = body["Limit"].as_u64().map(|n| n as usize);
1135        let next_token = body["NextToken"].as_str();
1136
1137        let accounts = self.state.read();
1138        let empty = EventBridgeState::new(&req.account_id, &req.region);
1139        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1140        let bus_name = state.resolve_bus_name(event_bus_name);
1141
1142        // Deduplicate rule names
1143        let mut rule_names: Vec<String> = Vec::new();
1144        for rule in state.rules.values() {
1145            if rule.event_bus_name == bus_name
1146                && rule.targets.iter().any(|t| t.arn == target_arn)
1147                && !rule_names.contains(&rule.name)
1148            {
1149                rule_names.push(rule.name.clone());
1150            }
1151        }
1152        rule_names.sort();
1153
1154        let start = next_token
1155            .and_then(|t| t.parse::<usize>().ok())
1156            .unwrap_or(0)
1157            .min(rule_names.len());
1158        let slice = &rule_names[start..];
1159
1160        let (page, new_next_token) = if let Some(lim) = limit {
1161            if slice.len() > lim {
1162                (&slice[..lim], Some((start + lim).to_string()))
1163            } else {
1164                (slice, None)
1165            }
1166        } else {
1167            (slice, None)
1168        };
1169
1170        let mut resp = json!({ "RuleNames": page });
1171        if let Some(token) = new_next_token {
1172            resp["NextToken"] = json!(token);
1173        }
1174
1175        Ok(AwsResponse::ok_json(resp))
1176    }
1177
    // ─── Partner Event Sources ──────────────────────────────────────────
1179
1180    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1181        let body = req.json_body();
1182        validate_required("EventPattern", &body["EventPattern"])?;
1183        validate_required("Event", &body["Event"])?;
1184        let event_pattern = body["EventPattern"]
1185            .as_str()
1186            .ok_or_else(|| missing("EventPattern"))?;
1187        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1188
1189        // Parse the event JSON
1190        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1191            AwsServiceError::aws_error(
1192                StatusCode::BAD_REQUEST,
1193                "InvalidEventPatternException",
1194                "Event is not valid JSON.",
1195            )
1196        })?;
1197
1198        // Parse the pattern JSON
1199        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1200            AwsServiceError::aws_error(
1201                StatusCode::BAD_REQUEST,
1202                "InvalidEventPatternException",
1203                "Event pattern is not valid JSON.",
1204            )
1205        })?;
1206
1207        let source = event["source"].as_str().unwrap_or("");
1208        let detail_type = event["detail-type"].as_str().unwrap_or("");
1209        let detail = event
1210            .get("detail")
1211            .map(|v| serde_json::to_string(v).unwrap_or_default())
1212            .unwrap_or_else(|| "{}".to_string());
1213        let account = event["account"].as_str().unwrap_or("");
1214        let region = event["region"].as_str().unwrap_or("");
1215        let resources: Vec<String> = event["resources"]
1216            .as_array()
1217            .map(|arr| {
1218                arr.iter()
1219                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1220                    .collect()
1221            })
1222            .unwrap_or_default();
1223
1224        let result = matches_pattern(
1225            Some(event_pattern),
1226            source,
1227            detail_type,
1228            &detail,
1229            account,
1230            region,
1231            &resources,
1232        );
1233
1234        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1235    }
1236
1237    // ─── UpdateEventBus ─────────────────────────────────────────────────
1238
1239    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1240        let body = req.json_body();
1241        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1242        validate_optional_string_length(
1243            "kmsKeyIdentifier",
1244            body["KmsKeyIdentifier"].as_str(),
1245            0,
1246            2048,
1247        )?;
1248        let name = body["Name"].as_str().unwrap_or("default");
1249
1250        let mut accounts = self.state.write();
1251        let state = accounts.get_or_create(&req.account_id);
1252        let bus = state.buses.get_mut(name).ok_or_else(|| {
1253            AwsServiceError::aws_error(
1254                StatusCode::BAD_REQUEST,
1255                "ResourceNotFoundException",
1256                format!("Event bus {name} does not exist."),
1257            )
1258        })?;
1259
1260        if let Some(desc) = body["Description"].as_str() {
1261            bus.description = Some(desc.to_string());
1262        }
1263        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1264            bus.kms_key_identifier = Some(kms.to_string());
1265        }
1266        if let Some(dlc) = body.get("DeadLetterConfig") {
1267            bus.dead_letter_config = Some(dlc.clone());
1268        }
1269        bus.last_modified_time = Utc::now();
1270
1271        let arn = bus.arn.clone();
1272        let bus_name = bus.name.clone();
1273
1274        Ok(AwsResponse::ok_json(json!({
1275            "Arn": arn,
1276            "Name": bus_name,
1277        })))
1278    }
1279
1280    // ─── Endpoint Operations ────────────────────────────────────────────
1281
1282    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1283        let body = req.json_body();
1284        // Smithy PutEventsRequest constraints:
1285        //   EndpointId: length 1..=50
1286        //   Entries: @required, length 1..=10
1287        validate_optional_string_length_value("EndpointId", &body["EndpointId"], 1, 50)?;
1288        validate_required("Entries", &body["Entries"])?;
1289        let entries_array = body["Entries"].as_array().ok_or_else(|| {
1290            AwsServiceError::aws_error(
1291                StatusCode::BAD_REQUEST,
1292                "ValidationException",
1293                "Entries must be a list",
1294            )
1295        })?;
1296        if entries_array.is_empty() || entries_array.len() > 10 {
1297            return Err(AwsServiceError::aws_error(
1298                StatusCode::BAD_REQUEST,
1299                "ValidationException",
1300                "Value at 'Entries' failed to satisfy constraint: \
1301                 Member must have length between 1 and 10",
1302            ));
1303        }
1304        let entries: Vec<Value> = entries_array.clone();
1305        let entries = &entries;
1306
1307        let mut accounts = self.state.write();
1308        let state = accounts.get_or_create(&req.account_id);
1309        let mut result_entries = Vec::new();
1310        let mut events_to_deliver = Vec::new();
1311        let mut failed_count = 0;
1312
1313        for entry in entries {
1314            let source = entry["Source"].as_str().unwrap_or("").to_string();
1315            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1316            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1317
1318            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1319                failed_count += 1;
1320                result_entries.push(error);
1321                continue;
1322            }
1323
1324            let event_id = uuid::Uuid::new_v4().to_string();
1325            let raw_bus = entry["EventBusName"]
1326                .as_str()
1327                .unwrap_or("default")
1328                .to_string();
1329            let event_bus_name = state.resolve_bus_name(&raw_bus);
1330
1331            // Bus resource-policy gate. AWS evaluates the bus's
1332            // resource policy against cross-account callers; same-account
1333            // callers always have access. The policy itself is JSON
1334            // stored as serde_json::Value so the IAM evaluator parses
1335            // it the same way it parses an S3 bucket policy.
1336            let caller_account = req
1337                .principal
1338                .as_ref()
1339                .map(|p| p.account_id.as_str())
1340                .unwrap_or(req.account_id.as_str());
1341            if caller_account != req.account_id {
1342                let bus_policy_value = state
1343                    .buses
1344                    .get(&event_bus_name)
1345                    .and_then(|b| b.policy.clone());
1346                if let Some(policy_value) = bus_policy_value {
1347                    let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1348                    let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1349                    let bus_arn = state
1350                        .buses
1351                        .get(&event_bus_name)
1352                        .map(|b| b.arn.clone())
1353                        .unwrap_or_default();
1354                    let principal =
1355                        req.principal
1356                            .clone()
1357                            .unwrap_or_else(|| fakecloud_core::auth::Principal {
1358                                arn: Arn::global("iam", caller_account, "root").to_string(),
1359                                user_id: caller_account.to_string(),
1360                                account_id: caller_account.to_string(),
1361                                principal_type: fakecloud_core::auth::PrincipalType::Root,
1362                                source_identity: None,
1363                                tags: None,
1364                            });
1365                    let context = fakecloud_iam::evaluator::RequestContext {
1366                        aws_principal_arn: Some(principal.arn.clone()),
1367                        aws_principal_account: Some(principal.account_id.clone()),
1368                        ..Default::default()
1369                    };
1370                    let eval_req = fakecloud_iam::evaluator::EvalRequest {
1371                        principal: &principal,
1372                        action: "events:PutEvents".to_string(),
1373                        resource: bus_arn,
1374                        context,
1375                    };
1376                    let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1377                        &policy_doc,
1378                        &eval_req,
1379                    );
1380                    if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1381                        failed_count += 1;
1382                        result_entries.push(json!({
1383                            "ErrorCode": "AccessDeniedException",
1384                            "ErrorMessage": format!(
1385                                "User '{}' is not authorized to put events on event bus '{}'",
1386                                principal.arn, event_bus_name
1387                            ),
1388                        }));
1389                        continue;
1390                    }
1391                }
1392            }
1393
1394            let time = parse_put_events_time(&entry["Time"]);
1395            let resources: Vec<String> = entry["Resources"]
1396                .as_array()
1397                .map(|arr| {
1398                    arr.iter()
1399                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1400                        .collect()
1401                })
1402                .unwrap_or_default();
1403
1404            let event = PutEvent {
1405                event_id: event_id.clone(),
1406                source: source.clone(),
1407                detail_type: detail_type.clone(),
1408                detail: detail.clone(),
1409                event_bus_name: event_bus_name.clone(),
1410                time,
1411                resources: resources.clone(),
1412            };
1413
1414            archive_matching_event(
1415                state,
1416                &event,
1417                &event_bus_name,
1418                &source,
1419                &detail_type,
1420                &detail,
1421                &req.account_id,
1422                &req.region,
1423                &resources,
1424            );
1425
1426            state.events.push(event);
1427
1428            // Find matching rules and their targets
1429            let matching_targets: Vec<EventTarget> = state
1430                .rules
1431                .values()
1432                .filter(|r| {
1433                    r.event_bus_name == event_bus_name
1434                        && r.state == "ENABLED"
1435                        && matches_pattern(
1436                            r.event_pattern.as_deref(),
1437                            &source,
1438                            &detail_type,
1439                            &detail,
1440                            &req.account_id,
1441                            &req.region,
1442                            &resources,
1443                        )
1444                })
1445                .flat_map(|r| r.targets.clone())
1446                .collect();
1447
1448            if !matching_targets.is_empty() {
1449                events_to_deliver.push((
1450                    event_id.clone(),
1451                    source,
1452                    detail_type,
1453                    detail,
1454                    time,
1455                    resources,
1456                    matching_targets,
1457                ));
1458            }
1459
1460            result_entries.push(json!({ "EventId": event_id }));
1461        }
1462
1463        // Drop the lock before delivering
1464        drop(accounts);
1465
1466        // Deliver to targets — single-target dispatch lives in the
1467        // shared helper so cross-service callers (delivery.rs) honor the
1468        // same target shape (SQS/SNS/Lambda/Logs/Kinesis/StepFunctions/
1469        // ApiDestination/HTTP) and the same InputTransformer rules.
1470        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1471            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1472            let event_json = json!({
1473                "version": "0",
1474                "id": event_id,
1475                "source": source,
1476                "account": req.account_id,
1477                "detail-type": detail_type,
1478                "detail": detail_value,
1479                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1480                "region": req.region,
1481                "resources": resources,
1482            });
1483
1484            let ctx = EventDispatchContext {
1485                state: &self.state,
1486                delivery: &self.delivery,
1487                lambda_state: self.lambda_state.as_ref(),
1488                logs_state: self.logs_state.as_ref(),
1489                container_runtime: &self.container_runtime,
1490                account_id: &req.account_id,
1491                region: &req.region,
1492            };
1493            for target in targets {
1494                dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1495            }
1496        }
1497
1498        let resp = json!({
1499            "FailedEntryCount": failed_count,
1500            "Entries": result_entries,
1501        });
1502
1503        Ok(AwsResponse::ok_json(resp))
1504    }
1505
1506    // ─── Tagging ────────────────────────────────────────────────────────
1507
1508    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1509        let body = req.json_body();
1510        validate_required("ResourceARN", &body["ResourceARN"])?;
1511        let arn = body["ResourceARN"]
1512            .as_str()
1513            .ok_or_else(|| missing("ResourceARN"))?;
1514        validate_string_length("resourceARN", arn, 1, 1600)?;
1515        validate_required("Tags", &body["Tags"])?;
1516
1517        let mut accounts = self.state.write();
1518        let state = accounts.get_or_create(&req.account_id);
1519        let tag_map = find_tags_mut(state, arn)?;
1520
1521        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1522            AwsServiceError::aws_error(
1523                StatusCode::BAD_REQUEST,
1524                "ValidationException",
1525                format!("{f} must be a list"),
1526            )
1527        })?;
1528
1529        Ok(AwsResponse::ok_json(json!({})))
1530    }
1531
1532    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1533        let body = req.json_body();
1534        validate_required("ResourceARN", &body["ResourceARN"])?;
1535        let arn = body["ResourceARN"]
1536            .as_str()
1537            .ok_or_else(|| missing("ResourceARN"))?;
1538        validate_string_length("resourceARN", arn, 1, 1600)?;
1539        validate_required("TagKeys", &body["TagKeys"])?;
1540
1541        let mut accounts = self.state.write();
1542        let state = accounts.get_or_create(&req.account_id);
1543        let tag_map = find_tags_mut(state, arn)?;
1544
1545        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1546            AwsServiceError::aws_error(
1547                StatusCode::BAD_REQUEST,
1548                "ValidationException",
1549                format!("{f} must be a list"),
1550            )
1551        })?;
1552
1553        Ok(AwsResponse::ok_json(json!({})))
1554    }
1555
1556    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1557        let body = req.json_body();
1558        validate_required("ResourceARN", &body["ResourceARN"])?;
1559        let arn = body["ResourceARN"]
1560            .as_str()
1561            .ok_or_else(|| missing("ResourceARN"))?;
1562        validate_string_length("resourceARN", arn, 1, 1600)?;
1563
1564        let accounts = self.state.read();
1565        let empty = EventBridgeState::new(&req.account_id, &req.region);
1566        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1567        let tag_map = find_tags(state, arn)?;
1568
1569        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1570
1571        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1572    }
1573
1574    // ─── Archive Operations ─────────────────────────────────────────────
1575}
1576
1577// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
1578
1579// ─── Event Pattern Validation ────────────────────────────────────────
1580
1581// ─── Connection Auth Params Response Builder ────────────────────────
1582
1583// ─── Event Pattern Matching ─────────────────────────────────────────
1584
1585/// Parsed + validated inputs for `StartReplay`.
1586struct StartReplayInput {
1587    name: String,
1588    description: Option<String>,
1589    event_source_arn: String,
1590    destination: Value,
1591    destination_arn: String,
1592    event_start_time: DateTime<Utc>,
1593    event_end_time: DateTime<Utc>,
1594}
1595
1596impl StartReplayInput {
1597    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1598        // StartReplay's Smithy model declares ResourceNotFound,
1599        // ResourceAlreadyExists, InvalidEventPattern, LimitExceeded, and
1600        // Internal — but not ValidationException. Per-field constraints are
1601        // enforced client-side by the SDK; we surface only declared errors
1602        // here. Missing required inputs default to empty strings and the
1603        // downstream bus/archive lookups produce ResourceNotFound for the
1604        // ones that matter.
1605        let name = body["ReplayName"].as_str().unwrap_or("").to_string();
1606        let description = body["Description"].as_str().map(|s| s.to_string());
1607        let event_source_arn = body["EventSourceArn"].as_str().unwrap_or("").to_string();
1608        let destination = body["Destination"].clone();
1609
1610        let event_start_time = body["EventStartTime"]
1611            .as_f64()
1612            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1613            .unwrap_or_else(Utc::now);
1614        let event_end_time = body["EventEndTime"]
1615            .as_f64()
1616            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1617            .unwrap_or_else(Utc::now);
1618
1619        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1620        if !destination_arn.contains(":event-bus/") {
1621            // Missing/invalid destination cannot be resolved to a bus —
1622            // surface as ResourceNotFound rather than the undeclared
1623            // ValidationException.
1624            return Err(AwsServiceError::aws_error(
1625                StatusCode::BAD_REQUEST,
1626                "ResourceNotFoundException",
1627                format!("Destination.Arn {destination_arn} does not point to an event bus."),
1628            ));
1629        }
1630
1631        Ok(Self {
1632            name,
1633            description,
1634            event_source_arn,
1635            destination,
1636            destination_arn,
1637            event_start_time,
1638            event_end_time,
1639        })
1640    }
1641}
1642
1643#[path = "service_archives_replays.rs"]
1644mod service_archives_replays;
1645#[path = "service_connections_apidests.rs"]
1646mod service_connections_apidests;
1647#[path = "service_endpoints.rs"]
1648mod service_endpoints;
1649#[path = "service_partner_sources.rs"]
1650mod service_partner_sources;
1651
1652#[path = "helpers.rs"]
1653pub(crate) mod helpers;
1654pub(crate) use helpers::*;
1655
1656#[cfg(test)]
1657#[path = "service_tests.rs"]
1658mod tests;
1659
#[cfg(test)]
mod pagination_reject_test {
    use fakecloud_core::pagination::paginate_checked;

    /// `paginate_checked` must return an error for the token `"bad"` and
    /// succeed for the token `"2"` over a five-element list.
    #[test]
    fn paginate_checked_rejects_invalid_token() {
        let items = Vec::from_iter(0..5_i32);

        let malformed = paginate_checked(&items, Some("bad"), 3);
        assert!(malformed.is_err());

        let numeric = paginate_checked(&items, Some("2"), 3);
        assert!(numeric.is_ok());
    }
}