Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::state::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28/// Validate a single `PutEvents` entry's required fields (`Source`,
29/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
30/// object. Returns the JSON error body AWS surfaces in the matching
31/// `Entries[]` slot on failure.
32fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
33    if source.is_empty() {
34        return Err(json!({
35            "ErrorCode": "InvalidArgument",
36            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
37        }));
38    }
39    if detail_type.is_empty() {
40        return Err(json!({
41            "ErrorCode": "InvalidArgument",
42            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
43        }));
44    }
45    if detail.is_empty() {
46        return Err(json!({
47            "ErrorCode": "InvalidArgument",
48            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
49        }));
50    }
51    if serde_json::from_str::<Value>(detail).is_err() {
52        return Err(json!({
53            "ErrorCode": "MalformedDetail",
54            "ErrorMessage": "Detail is malformed.",
55        }));
56    }
57    Ok(())
58}
59
60/// Parse an entry's `Time` field, tolerating the three formats AWS
61/// accepts (RFC 3339 string, fractional seconds as a float, integer
62/// seconds). Falls back to "now" if the field is absent or
63/// unparseable, which matches the real service.
64fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
65    if let Some(s) = raw.as_str() {
66        return DateTime::parse_from_rfc3339(s)
67            .map(|dt| dt.with_timezone(&Utc))
68            .unwrap_or_else(|_| Utc::now());
69    }
70    if let Some(ts) = raw.as_f64() {
71        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
72            .unwrap_or_else(Utc::now);
73    }
74    if let Some(ts) = raw.as_i64() {
75        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
76    }
77    Utc::now()
78}
79
80/// Actions that mutate EventBridge state.
81fn is_mutating_action(action: &str) -> bool {
82    matches!(
83        action,
84        "CreateEventBus"
85            | "DeleteEventBus"
86            | "UpdateEventBus"
87            | "PutRule"
88            | "DeleteRule"
89            | "EnableRule"
90            | "DisableRule"
91            | "PutTargets"
92            | "RemoveTargets"
93            | "PutEvents"
94            | "PutPermission"
95            | "RemovePermission"
96            | "TagResource"
97            | "UntagResource"
98            | "CreateArchive"
99            | "UpdateArchive"
100            | "DeleteArchive"
101            | "CreateConnection"
102            | "UpdateConnection"
103            | "DeleteConnection"
104            | "DeauthorizeConnection"
105            | "CreateApiDestination"
106            | "UpdateApiDestination"
107            | "DeleteApiDestination"
108            | "StartReplay"
109            | "CancelReplay"
110            | "CreatePartnerEventSource"
111            | "DeletePartnerEventSource"
112            | "ActivateEventSource"
113            | "DeactivateEventSource"
114            | "PutPartnerEvents"
115            | "CreateEndpoint"
116            | "DeleteEndpoint"
117            | "UpdateEndpoint"
118    )
119}
120
121pub struct EventBridgeService {
122    state: SharedEventBridgeState,
123    delivery: Arc<DeliveryBus>,
124    lambda_state: Option<SharedLambdaState>,
125    logs_state: Option<SharedLogsState>,
126    container_runtime: Option<Arc<ContainerRuntime>>,
127    snapshot_store: Option<Arc<dyn SnapshotStore>>,
128    snapshot_lock: Arc<AsyncMutex<()>>,
129}
130
131impl EventBridgeService {
132    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
133        Self {
134            state,
135            delivery,
136            lambda_state: None,
137            logs_state: None,
138            container_runtime: None,
139            snapshot_store: None,
140            snapshot_lock: Arc::new(AsyncMutex::new(())),
141        }
142    }
143
144    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
145        self.lambda_state = Some(lambda_state);
146        self
147    }
148
149    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
150        self.logs_state = Some(logs_state);
151        self
152    }
153
154    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
155        self.container_runtime = Some(runtime);
156        self
157    }
158
159    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
160        self.snapshot_store = Some(store);
161        self
162    }
163
164    /// Persist current state as a snapshot. Held across the
165    /// clone-serialize-write sequence to prevent stale-last writes,
166    /// with serde + file I/O offloaded to the blocking pool.
167    async fn save_snapshot(&self) {
168        let Some(store) = self.snapshot_store.clone() else {
169            return;
170        };
171        let _guard = self.snapshot_lock.lock().await;
172        let snapshot = EventBridgeSnapshot {
173            schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
174            accounts: Some(self.state.read().clone()),
175            state: None,
176        };
177        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
178            let bytes = serde_json::to_vec(&snapshot)
179                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
180            store.save(&bytes)
181        })
182        .await;
183        match join {
184            Ok(Ok(())) => {}
185            Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
186            Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
187        }
188    }
189}
190
191#[async_trait]
192impl AwsService for EventBridgeService {
193    fn service_name(&self) -> &str {
194        "events"
195    }
196
197    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
198        let mutates = is_mutating_action(req.action.as_str());
199        let result = match req.action.as_str() {
200            "CreateEventBus" => self.create_event_bus(&req),
201            "DeleteEventBus" => self.delete_event_bus(&req),
202            "ListEventBuses" => self.list_event_buses(&req),
203            "DescribeEventBus" => self.describe_event_bus(&req),
204            "PutRule" => self.put_rule(&req),
205            "DeleteRule" => self.delete_rule(&req),
206            "ListRules" => self.list_rules(&req),
207            "DescribeRule" => self.describe_rule(&req),
208            "EnableRule" => self.enable_rule(&req),
209            "DisableRule" => self.disable_rule(&req),
210            "PutTargets" => self.put_targets(&req),
211            "RemoveTargets" => self.remove_targets(&req),
212            "ListTargetsByRule" => self.list_targets_by_rule(&req),
213            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
214            "PutEvents" => self.put_events(&req),
215            "PutPermission" => self.put_permission(&req),
216            "RemovePermission" => self.remove_permission(&req),
217            "TagResource" => self.tag_resource(&req),
218            "UntagResource" => self.untag_resource(&req),
219            "ListTagsForResource" => self.list_tags_for_resource(&req),
220            "CreateArchive" => self.create_archive(&req),
221            "DescribeArchive" => self.describe_archive(&req),
222            "ListArchives" => self.list_archives(&req),
223            "UpdateArchive" => self.update_archive(&req),
224            "DeleteArchive" => self.delete_archive(&req),
225            "CreateConnection" => self.create_connection(&req),
226            "DescribeConnection" => self.describe_connection(&req),
227            "ListConnections" => self.list_connections(&req),
228            "UpdateConnection" => self.update_connection(&req),
229            "DeleteConnection" => self.delete_connection(&req),
230            "CreateApiDestination" => self.create_api_destination(&req),
231            "DescribeApiDestination" => self.describe_api_destination(&req),
232            "ListApiDestinations" => self.list_api_destinations(&req),
233            "UpdateApiDestination" => self.update_api_destination(&req),
234            "DeleteApiDestination" => self.delete_api_destination(&req),
235            "StartReplay" => self.start_replay(&req),
236            "DescribeReplay" => self.describe_replay(&req),
237            "ListReplays" => self.list_replays(&req),
238            "CancelReplay" => self.cancel_replay(&req),
239            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
240            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
241            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
242            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
243            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
244            "ActivateEventSource" => self.activate_event_source(&req),
245            "DeactivateEventSource" => self.deactivate_event_source(&req),
246            "DescribeEventSource" => self.describe_event_source(&req),
247            "ListEventSources" => self.list_event_sources(&req),
248            "PutPartnerEvents" => self.put_partner_events(&req),
249            "TestEventPattern" => self.test_event_pattern(&req),
250            "UpdateEventBus" => self.update_event_bus(&req),
251            "CreateEndpoint" => self.create_endpoint(&req),
252            "DeleteEndpoint" => self.delete_endpoint(&req),
253            "DescribeEndpoint" => self.describe_endpoint(&req),
254            "ListEndpoints" => self.list_endpoints(&req),
255            "UpdateEndpoint" => self.update_endpoint(&req),
256            "DeauthorizeConnection" => self.deauthorize_connection(&req),
257            _ => Err(AwsServiceError::action_not_implemented(
258                "events",
259                &req.action,
260            )),
261        };
262        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
263            self.save_snapshot().await;
264        }
265        result
266    }
267
268    fn supported_actions(&self) -> &[&str] {
269        &[
270            "CreateEventBus",
271            "DeleteEventBus",
272            "ListEventBuses",
273            "DescribeEventBus",
274            "PutRule",
275            "DeleteRule",
276            "ListRules",
277            "DescribeRule",
278            "EnableRule",
279            "DisableRule",
280            "PutTargets",
281            "RemoveTargets",
282            "ListTargetsByRule",
283            "ListRuleNamesByTarget",
284            "PutEvents",
285            "PutPermission",
286            "RemovePermission",
287            "TagResource",
288            "UntagResource",
289            "ListTagsForResource",
290            "CreateArchive",
291            "DescribeArchive",
292            "ListArchives",
293            "UpdateArchive",
294            "DeleteArchive",
295            "CreateConnection",
296            "DescribeConnection",
297            "ListConnections",
298            "UpdateConnection",
299            "DeleteConnection",
300            "CreateApiDestination",
301            "DescribeApiDestination",
302            "ListApiDestinations",
303            "UpdateApiDestination",
304            "DeleteApiDestination",
305            "StartReplay",
306            "DescribeReplay",
307            "ListReplays",
308            "CancelReplay",
309            "CreatePartnerEventSource",
310            "DeletePartnerEventSource",
311            "DescribePartnerEventSource",
312            "ListPartnerEventSources",
313            "ListPartnerEventSourceAccounts",
314            "ActivateEventSource",
315            "DeactivateEventSource",
316            "DescribeEventSource",
317            "ListEventSources",
318            "PutPartnerEvents",
319            "TestEventPattern",
320            "UpdateEventBus",
321            "CreateEndpoint",
322            "DeleteEndpoint",
323            "DescribeEndpoint",
324            "ListEndpoints",
325            "UpdateEndpoint",
326            "DeauthorizeConnection",
327        ]
328    }
329}
330
331fn parse_tags(body: &Value) -> HashMap<String, String> {
332    let mut tags = HashMap::new();
333    if let Some(arr) = body["Tags"].as_array() {
334        for tag in arr {
335            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
336                tags.insert(key.to_string(), val.to_string());
337            }
338        }
339    }
340    tags
341}
342
343fn parse_target(target: &Value) -> EventTarget {
344    EventTarget {
345        id: target["Id"].as_str().unwrap_or("").to_string(),
346        arn: target["Arn"].as_str().unwrap_or("").to_string(),
347        input: target["Input"].as_str().map(|s| s.to_string()),
348        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
349        input_transformer: target.get("InputTransformer").cloned(),
350        sqs_parameters: target.get("SqsParameters").cloned(),
351    }
352}
353
354fn target_to_json(t: &EventTarget) -> Value {
355    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
356    if let Some(ref input) = t.input {
357        obj["Input"] = json!(input);
358    }
359    if let Some(ref input_path) = t.input_path {
360        obj["InputPath"] = json!(input_path);
361    }
362    if let Some(ref it) = t.input_transformer {
363        obj["InputTransformer"] = it.clone();
364    }
365    if let Some(ref sp) = t.sqs_parameters {
366        obj["SqsParameters"] = sp.clone();
367    }
368    obj
369}
370
371// ─── Event Bus Operations ───────────────────────────────────────────
372impl EventBridgeService {
373    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
374        let body = req.json_body();
375        validate_required("Name", &body["Name"])?;
376        let name = body["Name"]
377            .as_str()
378            .ok_or_else(|| missing("Name"))?
379            .to_string();
380        validate_string_length("name", &name, 1, 256)?;
381        validate_optional_string_length(
382            "eventSourceName",
383            body["EventSourceName"].as_str(),
384            1,
385            256,
386        )?;
387        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
388        validate_optional_string_length(
389            "kmsKeyIdentifier",
390            body["KmsKeyIdentifier"].as_str(),
391            0,
392            2048,
393        )?;
394
395        // Validate name doesn't contain '/' (unless partner bus)
396        if name.contains('/') && !name.starts_with("aws.partner/") {
397            return Err(AwsServiceError::aws_error(
398                StatusCode::BAD_REQUEST,
399                "ValidationException",
400                "Event bus name must not contain '/'.",
401            ));
402        }
403
404        // Partner event bus validation
405        if name.starts_with("aws.partner/") {
406            let event_source = body["EventSourceName"].as_str().unwrap_or("");
407            let accounts_r = self.state.read();
408            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
409            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
410            let has_source = state_r.partner_event_sources.contains_key(event_source);
411            drop(accounts_r);
412            if !has_source {
413                return Err(AwsServiceError::aws_error(
414                    StatusCode::BAD_REQUEST,
415                    "ResourceNotFoundException",
416                    format!("Event source {event_source} does not exist."),
417                ));
418            }
419        }
420
421        let mut accounts = self.state.write();
422        let state = accounts.get_or_create(&req.account_id);
423
424        if state.buses.contains_key(&name) {
425            return Err(AwsServiceError::aws_error(
426                StatusCode::BAD_REQUEST,
427                "ResourceAlreadyExistsException",
428                format!("Event bus {name} already exists."),
429            ));
430        }
431
432        let arn = format!(
433            "arn:aws:events:{}:{}:event-bus/{}",
434            req.region, state.account_id, name
435        );
436        let now = Utc::now();
437        let description = body["Description"].as_str().map(|s| s.to_string());
438        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
439        let dead_letter_config = body.get("DeadLetterConfig").cloned();
440
441        let tags = parse_tags(&body);
442
443        let bus = EventBus {
444            name: name.clone(),
445            arn: arn.clone(),
446            tags,
447            policy: None,
448            description,
449            kms_key_identifier,
450            dead_letter_config,
451            creation_time: now,
452            last_modified_time: now,
453        };
454        state.buses.insert(name, bus);
455
456        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
457    }
458
459    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
460        let body = req.json_body();
461        validate_required("Name", &body["Name"])?;
462        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
463        validate_string_length("name", name, 1, 256)?;
464
465        if name == "default" {
466            return Err(AwsServiceError::aws_error(
467                StatusCode::BAD_REQUEST,
468                "ValidationException",
469                format!("Cannot delete event bus {name}."),
470            ));
471        }
472
473        let mut accounts = self.state.write();
474        let state = accounts.get_or_create(&req.account_id);
475        state.buses.remove(name);
476        state.rules.retain(|k, _| k.0 != name);
477
478        Ok(AwsResponse::ok_json(json!({})))
479    }
480
481    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482        let body = req.json_body();
483        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
484        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
485        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
486        let name_prefix = body["NamePrefix"].as_str();
487        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
488        if let Some(t) = body["NextToken"].as_str() {
489            t.parse::<usize>().map_err(|_| {
490                AwsServiceError::aws_error(
491                    StatusCode::BAD_REQUEST,
492                    "InvalidNextTokenException",
493                    format!("Invalid NextToken value: '{t}'"),
494                )
495            })?;
496        }
497
498        let accounts = self.state.read();
499        let empty = EventBridgeState::new(&req.account_id, &req.region);
500        let state = accounts.get(&req.account_id).unwrap_or(&empty);
501        let filtered: Vec<&_> = state
502            .buses
503            .values()
504            .filter(|b| match name_prefix {
505                Some(prefix) => b.name.starts_with(prefix),
506                None => true,
507            })
508            .collect();
509
510        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
511        let buses: Vec<Value> = page
512            .iter()
513            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
514            .collect();
515        let mut resp = json!({ "EventBuses": buses });
516        if let Some(token) = next_token {
517            resp["NextToken"] = json!(token);
518        }
519
520        Ok(AwsResponse::ok_json(resp))
521    }
522
523    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
524        let body = req.json_body();
525        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
526        let name = body["Name"].as_str().unwrap_or("default");
527
528        let accounts = self.state.read();
529        let empty = EventBridgeState::new(&req.account_id, &req.region);
530        let state = accounts.get(&req.account_id).unwrap_or(&empty);
531        let bus = state.buses.get(name).ok_or_else(|| {
532            AwsServiceError::aws_error(
533                StatusCode::BAD_REQUEST,
534                "ResourceNotFoundException",
535                format!("Event bus {name} does not exist."),
536            )
537        })?;
538
539        let mut resp = json!({
540            "Name": bus.name,
541            "Arn": bus.arn,
542            "CreationTime": bus.creation_time.timestamp() as f64,
543            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
544        });
545
546        if let Some(ref policy) = bus.policy {
547            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
548        }
549        if let Some(ref desc) = bus.description {
550            resp["Description"] = json!(desc);
551        }
552        if let Some(ref kms) = bus.kms_key_identifier {
553            resp["KmsKeyIdentifier"] = json!(kms);
554        }
555        if let Some(ref dlc) = bus.dead_letter_config {
556            resp["DeadLetterConfig"] = dlc.clone();
557        }
558
559        Ok(AwsResponse::ok_json(resp))
560    }
561
562    // ─── Permission Operations ──────────────────────────────────────────
563
564    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
565        let body = req.json_body();
566        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
567        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
568        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
569        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
570        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
571
572        let mut accounts = self.state.write();
573        let state = accounts.get_or_create(&req.account_id);
574
575        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
576            AwsServiceError::aws_error(
577                StatusCode::BAD_REQUEST,
578                "ResourceNotFoundException",
579                format!("Event bus {event_bus_name} does not exist."),
580            )
581        })?;
582
583        // Check if Policy is provided (new-style)
584        if let Some(policy_str) = body["Policy"].as_str() {
585            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
586                bus.policy = Some(policy);
587                return Ok(AwsResponse::ok_json(json!({})));
588            }
589        }
590
591        // Old-style: Action, Principal, StatementId
592        let action = body["Action"].as_str().unwrap_or("");
593        let principal = body["Principal"].as_str().unwrap_or("");
594        let statement_id = body["StatementId"].as_str().unwrap_or("");
595
596        // Validate action
597        if action != "events:PutEvents" {
598            return Err(AwsServiceError::aws_error(
599                StatusCode::BAD_REQUEST,
600                "ValidationException",
601                "Provided value in parameter 'action' is not supported.",
602            ));
603        }
604
605        let statement = json!({
606            "Sid": statement_id,
607            "Effect": "Allow",
608            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
609            "Action": action,
610            "Resource": bus.arn,
611        });
612
613        let policy = bus.policy.get_or_insert_with(|| {
614            json!({
615                "Version": "2012-10-17",
616                "Statement": [],
617            })
618        });
619
620        if let Some(stmts) = policy["Statement"].as_array_mut() {
621            stmts.push(statement);
622        }
623
624        Ok(AwsResponse::ok_json(json!({})))
625    }
626
627    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
628        let body = req.json_body();
629        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
630        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
631        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
632        let statement_id = body["StatementId"].as_str().unwrap_or("");
633        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
634
635        let mut accounts = self.state.write();
636        let state = accounts.get_or_create(&req.account_id);
637
638        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
639            AwsServiceError::aws_error(
640                StatusCode::BAD_REQUEST,
641                "ResourceNotFoundException",
642                format!("Event bus {event_bus_name} does not exist."),
643            )
644        })?;
645
646        if remove_all {
647            bus.policy = None;
648            return Ok(AwsResponse::ok_json(json!({})));
649        }
650
651        let policy = bus.policy.as_mut().ok_or_else(|| {
652            AwsServiceError::aws_error(
653                StatusCode::BAD_REQUEST,
654                "ResourceNotFoundException",
655                "EventBus does not have a policy.",
656            )
657        })?;
658
659        if let Some(stmts) = policy["Statement"].as_array_mut() {
660            let before = stmts.len();
661            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
662            if stmts.len() == before {
663                return Err(AwsServiceError::aws_error(
664                    StatusCode::BAD_REQUEST,
665                    "ResourceNotFoundException",
666                    "Statement with the provided id does not exist.",
667                ));
668            }
669            if stmts.is_empty() {
670                bus.policy = None;
671            }
672        }
673
674        Ok(AwsResponse::ok_json(json!({})))
675    }
676
677    // ─── Rule Operations ────────────────────────────────────────────────
678
679    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
680        let body = req.json_body();
681        validate_required("Name", &body["Name"])?;
682        let name = body["Name"]
683            .as_str()
684            .ok_or_else(|| missing("Name"))?
685            .to_string();
686        validate_string_length("name", &name, 1, 64)?;
687        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
688        validate_optional_string_length(
689            "scheduleExpression",
690            body["ScheduleExpression"].as_str(),
691            0,
692            256,
693        )?;
694        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
695        validate_optional_enum(
696            "state",
697            body["State"].as_str(),
698            &[
699                "ENABLED",
700                "DISABLED",
701                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
702            ],
703        )?;
704        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
705        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
706
707        let raw_bus = body["EventBusName"]
708            .as_str()
709            .unwrap_or("default")
710            .to_string();
711
712        let mut accounts = self.state.write();
713        let state = accounts.get_or_create(&req.account_id);
714        let event_bus_name = state.resolve_bus_name(&raw_bus);
715
716        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
717            if s.is_empty() {
718                None
719            } else {
720                Some(s.to_string())
721            }
722        });
723        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
724            if s.is_empty() {
725                None
726            } else {
727                Some(s.to_string())
728            }
729        });
730        let description = body["Description"].as_str().map(|s| s.to_string());
731        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
732        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
733
734        // Validate: schedule expressions only on default bus
735        if schedule_expression.is_some() && event_bus_name != "default" {
736            return Err(AwsServiceError::aws_error(
737                StatusCode::BAD_REQUEST,
738                "ValidationException",
739                "ScheduleExpression is supported only on the default event bus.",
740            ));
741        }
742
743        if !state.buses.contains_key(&event_bus_name) {
744            return Err(AwsServiceError::aws_error(
745                StatusCode::BAD_REQUEST,
746                "ResourceNotFoundException",
747                format!("Event bus {event_bus_name} does not exist."),
748            ));
749        }
750
751        let arn = if event_bus_name == "default" {
752            format!(
753                "arn:aws:events:{}:{}:rule/{}",
754                req.region, state.account_id, name
755            )
756        } else {
757            format!(
758                "arn:aws:events:{}:{}:rule/{}/{}",
759                req.region, state.account_id, event_bus_name, name
760            )
761        };
762
763        let key = (event_bus_name.clone(), name.clone());
764        let targets = state
765            .rules
766            .get(&key)
767            .map(|r| r.targets.clone())
768            .unwrap_or_default();
769
770        let tags = parse_tags(&body);
771
772        let rule = EventRule {
773            name: name.clone(),
774            arn: arn.clone(),
775            event_bus_name,
776            event_pattern,
777            schedule_expression,
778            state: rule_state,
779            description,
780            role_arn,
781            managed_by: None,
782            created_by: None,
783            targets,
784            tags,
785            last_fired: None,
786        };
787
788        state.rules.insert(key, rule);
789        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
790    }
791
792    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
793        let body = req.json_body();
794        validate_required("Name", &body["Name"])?;
795        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
796        validate_string_length("name", name, 1, 64)?;
797        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
798        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
799
800        let mut accounts = self.state.write();
801        let state = accounts.get_or_create(&req.account_id);
802        let bus_name = state.resolve_bus_name(event_bus_name);
803        let key = (bus_name, name.to_string());
804
805        // Check if rule has targets
806        if let Some(rule) = state.rules.get(&key) {
807            if !rule.targets.is_empty() {
808                return Err(AwsServiceError::aws_error(
809                    StatusCode::BAD_REQUEST,
810                    "ValidationException",
811                    "Rule can't be deleted since it has targets.",
812                ));
813            }
814        }
815
816        state.rules.remove(&key);
817        Ok(AwsResponse::ok_json(json!({})))
818    }
819
820    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
821        let body = req.json_body();
822        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
823        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
824        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
825        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
826        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
827        let name_prefix = body["NamePrefix"].as_str();
828        let limit = body["Limit"].as_u64().map(|n| n as usize);
829        let next_token = body["NextToken"].as_str();
830
831        let accounts = self.state.read();
832        let empty = EventBridgeState::new(&req.account_id, &req.region);
833        let state = accounts.get(&req.account_id).unwrap_or(&empty);
834        let bus_name = state.resolve_bus_name(event_bus_name);
835
836        let mut rules: Vec<&EventRule> = state
837            .rules
838            .values()
839            .filter(|r| r.event_bus_name == bus_name)
840            .filter(|r| match name_prefix {
841                Some(prefix) => r.name.starts_with(prefix),
842                None => true,
843            })
844            .collect();
845        rules.sort_by(|a, b| a.name.cmp(&b.name));
846
847        // Pagination
848        let start = next_token
849            .and_then(|t| t.parse::<usize>().ok())
850            .unwrap_or(0)
851            .min(rules.len());
852        let rules_slice = &rules[start..];
853
854        let (page, new_next_token) = if let Some(lim) = limit {
855            if rules_slice.len() > lim {
856                (&rules_slice[..lim], Some((start + lim).to_string()))
857            } else {
858                (rules_slice, None)
859            }
860        } else {
861            (rules_slice, None)
862        };
863
864        let rules_json: Vec<Value> = page
865            .iter()
866            .map(|r| {
867                let mut obj = json!({
868                    "Name": r.name,
869                    "Arn": r.arn,
870                    "EventBusName": r.event_bus_name,
871                    "State": r.state,
872                });
873                if let Some(ref desc) = r.description {
874                    obj["Description"] = json!(desc);
875                }
876                if let Some(ref ep) = r.event_pattern {
877                    obj["EventPattern"] = json!(ep);
878                }
879                if let Some(ref se) = r.schedule_expression {
880                    obj["ScheduleExpression"] = json!(se);
881                }
882                if let Some(ref mb) = r.managed_by {
883                    obj["ManagedBy"] = json!(mb);
884                }
885                obj
886            })
887            .collect();
888
889        let mut resp = json!({ "Rules": rules_json });
890        if let Some(token) = new_next_token {
891            resp["NextToken"] = json!(token);
892        }
893
894        Ok(AwsResponse::ok_json(resp))
895    }
896
897    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898        let body = req.json_body();
899        validate_required("Name", &body["Name"])?;
900        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901        validate_string_length("name", name, 1, 64)?;
902        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905        let accounts = self.state.read();
906        let empty = EventBridgeState::new(&req.account_id, &req.region);
907        let state = accounts.get(&req.account_id).unwrap_or(&empty);
908        let bus_name = state.resolve_bus_name(event_bus_name);
909        let key = (bus_name.clone(), name.to_string());
910
911        let rule = state.rules.get(&key).ok_or_else(|| {
912            AwsServiceError::aws_error(
913                StatusCode::BAD_REQUEST,
914                "ResourceNotFoundException",
915                format!("Rule {name} does not exist."),
916            )
917        })?;
918
919        let mut resp = json!({
920            "Name": rule.name,
921            "Arn": rule.arn,
922            "EventBusName": rule.event_bus_name,
923            "State": rule.state,
924        });
925
926        if let Some(ref desc) = rule.description {
927            resp["Description"] = json!(desc);
928        }
929        if let Some(ref ep) = rule.event_pattern {
930            resp["EventPattern"] = json!(ep);
931        }
932        if let Some(ref se) = rule.schedule_expression {
933            resp["ScheduleExpression"] = json!(se);
934        }
935        if let Some(ref role) = rule.role_arn {
936            resp["RoleArn"] = json!(role);
937        }
938        if let Some(ref mb) = rule.managed_by {
939            resp["ManagedBy"] = json!(mb);
940        }
941        if let Some(ref cb) = rule.created_by {
942            resp["CreatedBy"] = json!(cb);
943        }
944        // If non-default bus, set CreatedBy to account_id
945        if rule.event_bus_name != "default" && rule.created_by.is_none() {
946            resp["CreatedBy"] = json!(state.account_id);
947        }
948
949        Ok(AwsResponse::ok_json(resp))
950    }
951
952    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
953        let body = req.json_body();
954        validate_required("Name", &body["Name"])?;
955        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
956        validate_string_length("name", name, 1, 64)?;
957        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
958        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
959
960        let mut accounts = self.state.write();
961        let state = accounts.get_or_create(&req.account_id);
962        let bus_name = state.resolve_bus_name(event_bus_name);
963        let key = (bus_name, name.to_string());
964
965        let rule = state.rules.get_mut(&key).ok_or_else(|| {
966            AwsServiceError::aws_error(
967                StatusCode::BAD_REQUEST,
968                "ResourceNotFoundException",
969                format!("Rule {name} does not exist."),
970            )
971        })?;
972
973        rule.state = "ENABLED".to_string();
974        Ok(AwsResponse::ok_json(json!({})))
975    }
976
977    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
978        let body = req.json_body();
979        validate_required("Name", &body["Name"])?;
980        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
981        validate_string_length("name", name, 1, 64)?;
982        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
983        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984
985        let mut accounts = self.state.write();
986        let state = accounts.get_or_create(&req.account_id);
987        let bus_name = state.resolve_bus_name(event_bus_name);
988        let key = (bus_name, name.to_string());
989
990        let rule = state.rules.get_mut(&key).ok_or_else(|| {
991            AwsServiceError::aws_error(
992                StatusCode::BAD_REQUEST,
993                "ResourceNotFoundException",
994                format!("Rule {name} does not exist."),
995            )
996        })?;
997
998        rule.state = "DISABLED".to_string();
999        Ok(AwsResponse::ok_json(json!({})))
1000    }
1001
1002    // ─── Target Operations ──────────────────────────────────────────────
1003
1004    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1005        let body = req.json_body();
1006        validate_required("Rule", &body["Rule"])?;
1007        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1008        validate_string_length("rule", rule_name, 1, 64)?;
1009        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1010        validate_required("Targets", &body["Targets"])?;
1011        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1012        let targets = body["Targets"]
1013            .as_array()
1014            .ok_or_else(|| missing("Targets"))?;
1015
1016        // Validate targets - check for FIFO SQS without SqsParameters
1017        for target in targets {
1018            let target_id = target["Id"].as_str().unwrap_or("");
1019            let target_arn = target["Arn"].as_str().unwrap_or("");
1020
1021            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1022                return Err(AwsServiceError::aws_error(
1023                    StatusCode::BAD_REQUEST,
1024                    "ValidationException",
1025                    format!(
1026                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
1027                    ),
1028                ));
1029            }
1030
1031            // Validate ARN format
1032            if !target_arn.starts_with("arn:") {
1033                return Err(AwsServiceError::aws_error(
1034                    StatusCode::BAD_REQUEST,
1035                    "ValidationException",
1036                    format!(
1037                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1038                    ),
1039                ));
1040            }
1041        }
1042
1043        let mut accounts = self.state.write();
1044        let state = accounts.get_or_create(&req.account_id);
1045        let bus_name = state.resolve_bus_name(event_bus_name);
1046        let key = (bus_name.clone(), rule_name.to_string());
1047
1048        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049            AwsServiceError::aws_error(
1050                StatusCode::BAD_REQUEST,
1051                "ResourceNotFoundException",
1052                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053            )
1054        })?;
1055
1056        for target in targets {
1057            let et = parse_target(target);
1058            // Remove existing target with same ID
1059            rule.targets.retain(|t| t.id != et.id);
1060            rule.targets.push(et);
1061        }
1062
1063        Ok(AwsResponse::ok_json(json!({
1064            "FailedEntryCount": 0,
1065            "FailedEntries": [],
1066        })))
1067    }
1068
1069    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1070        let body = req.json_body();
1071        validate_required("Rule", &body["Rule"])?;
1072        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1073        validate_string_length("rule", rule_name, 1, 64)?;
1074        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1075        validate_required("Ids", &body["Ids"])?;
1076        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1077        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1078
1079        let target_ids: Vec<String> = ids
1080            .iter()
1081            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1082            .collect();
1083
1084        let mut accounts = self.state.write();
1085        let state = accounts.get_or_create(&req.account_id);
1086        let bus_name = state.resolve_bus_name(event_bus_name);
1087        let key = (bus_name.clone(), rule_name.to_string());
1088
1089        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1090            AwsServiceError::aws_error(
1091                StatusCode::BAD_REQUEST,
1092                "ResourceNotFoundException",
1093                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1094            )
1095        })?;
1096
1097        rule.targets.retain(|t| !target_ids.contains(&t.id));
1098
1099        Ok(AwsResponse::ok_json(json!({
1100            "FailedEntryCount": 0,
1101            "FailedEntries": [],
1102        })))
1103    }
1104
1105    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1106        let body = req.json_body();
1107        validate_required("Rule", &body["Rule"])?;
1108        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1109        validate_string_length("rule", rule_name, 1, 64)?;
1110        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1111        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1112        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1113        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1114        let limit = body["Limit"].as_u64().map(|n| n as usize);
1115        let next_token = body["NextToken"].as_str();
1116
1117        let accounts = self.state.read();
1118        let empty = EventBridgeState::new(&req.account_id, &req.region);
1119        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1120        let bus_name = state.resolve_bus_name(event_bus_name);
1121        let key = (bus_name, rule_name.to_string());
1122
1123        let rule = state.rules.get(&key).ok_or_else(|| {
1124            AwsServiceError::aws_error(
1125                StatusCode::BAD_REQUEST,
1126                "ResourceNotFoundException",
1127                format!("Rule {rule_name} does not exist."),
1128            )
1129        })?;
1130
1131        let all_targets = &rule.targets;
1132        let start = next_token
1133            .and_then(|t| t.parse::<usize>().ok())
1134            .unwrap_or(0)
1135            .min(all_targets.len());
1136        let slice = &all_targets[start..];
1137
1138        let (page, new_next_token) = if let Some(lim) = limit {
1139            if slice.len() > lim {
1140                (&slice[..lim], Some((start + lim).to_string()))
1141            } else {
1142                (slice, None)
1143            }
1144        } else {
1145            (slice, None)
1146        };
1147
1148        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1149
1150        let mut resp = json!({ "Targets": targets });
1151        if let Some(token) = new_next_token {
1152            resp["NextToken"] = json!(token);
1153        }
1154
1155        Ok(AwsResponse::ok_json(resp))
1156    }
1157
1158    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159        let body = req.json_body();
1160        validate_required("TargetArn", &body["TargetArn"])?;
1161        let target_arn = body["TargetArn"]
1162            .as_str()
1163            .ok_or_else(|| missing("TargetArn"))?;
1164        validate_string_length("targetArn", target_arn, 1, 1600)?;
1165        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1166        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1167        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1168        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1169        let limit = body["Limit"].as_u64().map(|n| n as usize);
1170        let next_token = body["NextToken"].as_str();
1171
1172        let accounts = self.state.read();
1173        let empty = EventBridgeState::new(&req.account_id, &req.region);
1174        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1175        let bus_name = state.resolve_bus_name(event_bus_name);
1176
1177        // Deduplicate rule names
1178        let mut rule_names: Vec<String> = Vec::new();
1179        for rule in state.rules.values() {
1180            if rule.event_bus_name == bus_name
1181                && rule.targets.iter().any(|t| t.arn == target_arn)
1182                && !rule_names.contains(&rule.name)
1183            {
1184                rule_names.push(rule.name.clone());
1185            }
1186        }
1187        rule_names.sort();
1188
1189        let start = next_token
1190            .and_then(|t| t.parse::<usize>().ok())
1191            .unwrap_or(0)
1192            .min(rule_names.len());
1193        let slice = &rule_names[start..];
1194
1195        let (page, new_next_token) = if let Some(lim) = limit {
1196            if slice.len() > lim {
1197                (&slice[..lim], Some((start + lim).to_string()))
1198            } else {
1199                (slice, None)
1200            }
1201        } else {
1202            (slice, None)
1203        };
1204
1205        let mut resp = json!({ "RuleNames": page });
1206        if let Some(token) = new_next_token {
1207            resp["NextToken"] = json!(token);
1208        }
1209
1210        Ok(AwsResponse::ok_json(resp))
1211    }
1212
1213    // ─── Partner Event Sources ────────────���───────────────────────────
1214
1215    fn create_partner_event_source(
1216        &self,
1217        req: &AwsRequest,
1218    ) -> Result<AwsResponse, AwsServiceError> {
1219        let body = req.json_body();
1220        validate_required("Name", &body["Name"])?;
1221        let name = body["Name"]
1222            .as_str()
1223            .ok_or_else(|| missing("Name"))?
1224            .to_string();
1225        validate_string_length("name", &name, 1, 256)?;
1226        validate_required("Account", &body["Account"])?;
1227        let account = body["Account"]
1228            .as_str()
1229            .ok_or_else(|| missing("Account"))?
1230            .to_string();
1231        validate_string_length("account", &account, 12, 12)?;
1232
1233        let mut accounts = self.state.write();
1234        let state = accounts.get_or_create(&req.account_id);
1235        if state.partner_event_sources.contains_key(&name) {
1236            return Err(AwsServiceError::aws_error(
1237                StatusCode::CONFLICT,
1238                "ResourceAlreadyExistsException",
1239                format!("Partner event source {name} already exists."),
1240            ));
1241        }
1242        let arn = format!(
1243            "arn:aws:events:{}::event-source/aws.partner/{}",
1244            state.region, name
1245        );
1246        let now = Utc::now();
1247        let ps = PartnerEventSource {
1248            name: name.clone(),
1249            arn: arn.clone(),
1250            account,
1251            creation_time: now,
1252            expiration_time: None,
1253            state: "ACTIVE".to_string(),
1254        };
1255        state.partner_event_sources.insert(name.clone(), ps);
1256
1257        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1258    }
1259
1260    fn delete_partner_event_source(
1261        &self,
1262        req: &AwsRequest,
1263    ) -> Result<AwsResponse, AwsServiceError> {
1264        let body = req.json_body();
1265        validate_required("Name", &body["Name"])?;
1266        let name = body["Name"]
1267            .as_str()
1268            .ok_or_else(|| missing("Name"))?
1269            .to_string();
1270        validate_required("Account", &body["Account"])?;
1271        let account = body["Account"]
1272            .as_str()
1273            .ok_or_else(|| missing("Account"))?
1274            .to_string();
1275
1276        let mut accounts = self.state.write();
1277        let state = accounts.get_or_create(&req.account_id);
1278        match state.partner_event_sources.get(&name) {
1279            Some(ps) if ps.account == account => {
1280                state.partner_event_sources.remove(&name);
1281            }
1282            Some(_) => {
1283                return Err(AwsServiceError::aws_error(
1284                    StatusCode::NOT_FOUND,
1285                    "ResourceNotFoundException",
1286                    format!("Partner event source {name} does not exist for account {account}."),
1287                ));
1288            }
1289            None => {
1290                return Err(AwsServiceError::aws_error(
1291                    StatusCode::NOT_FOUND,
1292                    "ResourceNotFoundException",
1293                    format!("Partner event source {name} does not exist."),
1294                ));
1295            }
1296        }
1297
1298        Ok(AwsResponse::ok_json(json!({})))
1299    }
1300
1301    fn describe_partner_event_source(
1302        &self,
1303        req: &AwsRequest,
1304    ) -> Result<AwsResponse, AwsServiceError> {
1305        let body = req.json_body();
1306        validate_required("Name", &body["Name"])?;
1307        let name = body["Name"]
1308            .as_str()
1309            .ok_or_else(|| missing("Name"))?
1310            .to_string();
1311        validate_string_length("name", &name, 1, 256)?;
1312
1313        let accounts = self.state.read();
1314        let empty = EventBridgeState::new(&req.account_id, &req.region);
1315        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1316        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1317            AwsServiceError::aws_error(
1318                StatusCode::NOT_FOUND,
1319                "ResourceNotFoundException",
1320                format!("Partner event source {name} does not exist."),
1321            )
1322        })?;
1323
1324        Ok(AwsResponse::ok_json(json!({
1325            "Arn": ps.arn,
1326            "Name": ps.name,
1327        })))
1328    }
1329
1330    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331        let body = req.json_body();
1332        validate_required("namePrefix", &body["NamePrefix"])?;
1333        let name_prefix = body["NamePrefix"]
1334            .as_str()
1335            .ok_or_else(|| missing("NamePrefix"))?;
1336        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1337        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1338        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1339        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1340
1341        let accounts = self.state.read();
1342        let empty = EventBridgeState::new(&req.account_id, &req.region);
1343        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1344        let all: Vec<Value> = state
1345            .partner_event_sources
1346            .values()
1347            .filter(|ps| ps.name.starts_with(name_prefix))
1348            .map(|ps| {
1349                json!({
1350                    "Arn": ps.arn,
1351                    "Name": ps.name,
1352                })
1353            })
1354            .collect();
1355
1356        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1357        let mut resp = json!({ "PartnerEventSources": sources });
1358        if let Some(token) = next_token {
1359            resp["NextToken"] = json!(token);
1360        }
1361
1362        Ok(AwsResponse::ok_json(resp))
1363    }
1364
1365    fn list_partner_event_source_accounts(
1366        &self,
1367        req: &AwsRequest,
1368    ) -> Result<AwsResponse, AwsServiceError> {
1369        let body = req.json_body();
1370        validate_required("EventSourceName", &body["EventSourceName"])?;
1371        let event_source_name = body["EventSourceName"]
1372            .as_str()
1373            .ok_or_else(|| missing("EventSourceName"))?;
1374        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1375        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1376        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1377
1378        let accounts = self.state.read();
1379        let empty = EventBridgeState::new(&req.account_id, &req.region);
1380        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1381        let accounts: Vec<Value> = state
1382            .partner_event_sources
1383            .values()
1384            .filter(|ps| ps.name == event_source_name)
1385            .map(|ps| json!({ "Account": ps.account }))
1386            .collect();
1387
1388        Ok(AwsResponse::ok_json(json!({
1389            "PartnerEventSourceAccounts": accounts
1390        })))
1391    }
1392
1393    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1394        let body = req.json_body();
1395        validate_required("Name", &body["Name"])?;
1396        let name = body["Name"]
1397            .as_str()
1398            .ok_or_else(|| missing("Name"))?
1399            .to_string();
1400
1401        let mut accounts = self.state.write();
1402        let state = accounts.get_or_create(&req.account_id);
1403        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1404            AwsServiceError::aws_error(
1405                StatusCode::NOT_FOUND,
1406                "ResourceNotFoundException",
1407                format!("Event source {name} does not exist."),
1408            )
1409        })?;
1410        ps.state = "ACTIVE".to_string();
1411
1412        Ok(AwsResponse::ok_json(json!({})))
1413    }
1414
1415    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1416        let body = req.json_body();
1417        validate_required("Name", &body["Name"])?;
1418        let name = body["Name"]
1419            .as_str()
1420            .ok_or_else(|| missing("Name"))?
1421            .to_string();
1422
1423        let mut accounts = self.state.write();
1424        let state = accounts.get_or_create(&req.account_id);
1425        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1426            AwsServiceError::aws_error(
1427                StatusCode::NOT_FOUND,
1428                "ResourceNotFoundException",
1429                format!("Event source {name} does not exist."),
1430            )
1431        })?;
1432        ps.state = "INACTIVE".to_string();
1433
1434        Ok(AwsResponse::ok_json(json!({})))
1435    }
1436
1437    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438        let body = req.json_body();
1439        validate_required("Name", &body["Name"])?;
1440        let name = body["Name"]
1441            .as_str()
1442            .ok_or_else(|| missing("Name"))?
1443            .to_string();
1444
1445        let accounts = self.state.read();
1446        let empty = EventBridgeState::new(&req.account_id, &req.region);
1447        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1448        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1449            AwsServiceError::aws_error(
1450                StatusCode::NOT_FOUND,
1451                "ResourceNotFoundException",
1452                format!("Event source {name} does not exist."),
1453            )
1454        })?;
1455
1456        Ok(AwsResponse::ok_json(json!({
1457            "Arn": ps.arn,
1458            "Name": ps.name,
1459            "CreatedBy": ps.account,
1460            "CreationTime": ps.creation_time.timestamp() as f64,
1461            "State": ps.state,
1462        })))
1463    }
1464
1465    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1466        let body = req.json_body();
1467        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1468        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1469        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1470        let name_prefix = body["NamePrefix"].as_str();
1471        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1472
1473        let accounts = self.state.read();
1474        let empty = EventBridgeState::new(&req.account_id, &req.region);
1475        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1476        let all: Vec<Value> = state
1477            .partner_event_sources
1478            .values()
1479            .filter(|ps| match name_prefix {
1480                Some(prefix) => ps.name.starts_with(prefix),
1481                None => true,
1482            })
1483            .map(|ps| {
1484                json!({
1485                    "Arn": ps.arn,
1486                    "Name": ps.name,
1487                    "CreatedBy": ps.account,
1488                    "CreationTime": ps.creation_time.timestamp() as f64,
1489                    "State": ps.state,
1490                })
1491            })
1492            .collect();
1493
1494        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1495        let mut resp = json!({ "EventSources": sources });
1496        if let Some(token) = next_token {
1497            resp["NextToken"] = json!(token);
1498        }
1499
1500        Ok(AwsResponse::ok_json(resp))
1501    }
1502
1503    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504        let body = req.json_body();
1505        validate_required("Entries", &body["Entries"])?;
1506        let entries = body["Entries"]
1507            .as_array()
1508            .ok_or_else(|| missing("Entries"))?;
1509
1510        let mut result_entries = Vec::new();
1511        for _entry in entries {
1512            let event_id = uuid::Uuid::new_v4().to_string();
1513            result_entries.push(json!({ "EventId": event_id }));
1514        }
1515
1516        Ok(AwsResponse::ok_json(json!({
1517            "FailedEntryCount": 0,
1518            "Entries": result_entries,
1519        })))
1520    }
1521
1522    // ─── TestEventPattern ────────────────────────────────────────────────
1523
1524    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1525        let body = req.json_body();
1526        validate_required("EventPattern", &body["EventPattern"])?;
1527        validate_required("Event", &body["Event"])?;
1528        let event_pattern = body["EventPattern"]
1529            .as_str()
1530            .ok_or_else(|| missing("EventPattern"))?;
1531        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1532
1533        // Parse the event JSON
1534        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1535            AwsServiceError::aws_error(
1536                StatusCode::BAD_REQUEST,
1537                "InvalidEventPatternException",
1538                "Event is not valid JSON.",
1539            )
1540        })?;
1541
1542        // Parse the pattern JSON
1543        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1544            AwsServiceError::aws_error(
1545                StatusCode::BAD_REQUEST,
1546                "InvalidEventPatternException",
1547                "Event pattern is not valid JSON.",
1548            )
1549        })?;
1550
1551        let source = event["source"].as_str().unwrap_or("");
1552        let detail_type = event["detail-type"].as_str().unwrap_or("");
1553        let detail = event
1554            .get("detail")
1555            .map(|v| serde_json::to_string(v).unwrap_or_default())
1556            .unwrap_or_else(|| "{}".to_string());
1557        let account = event["account"].as_str().unwrap_or("");
1558        let region = event["region"].as_str().unwrap_or("");
1559        let resources: Vec<String> = event["resources"]
1560            .as_array()
1561            .map(|arr| {
1562                arr.iter()
1563                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564                    .collect()
1565            })
1566            .unwrap_or_default();
1567
1568        let result = matches_pattern(
1569            Some(event_pattern),
1570            source,
1571            detail_type,
1572            &detail,
1573            account,
1574            region,
1575            &resources,
1576        );
1577
1578        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1579    }
1580
1581    // ─── UpdateEventBus ─────────────────────────────────────────────────
1582
1583    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1584        let body = req.json_body();
1585        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1586        validate_optional_string_length(
1587            "kmsKeyIdentifier",
1588            body["KmsKeyIdentifier"].as_str(),
1589            0,
1590            2048,
1591        )?;
1592        let name = body["Name"].as_str().unwrap_or("default");
1593
1594        let mut accounts = self.state.write();
1595        let state = accounts.get_or_create(&req.account_id);
1596        let bus = state.buses.get_mut(name).ok_or_else(|| {
1597            AwsServiceError::aws_error(
1598                StatusCode::BAD_REQUEST,
1599                "ResourceNotFoundException",
1600                format!("Event bus {name} does not exist."),
1601            )
1602        })?;
1603
1604        if let Some(desc) = body["Description"].as_str() {
1605            bus.description = Some(desc.to_string());
1606        }
1607        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1608            bus.kms_key_identifier = Some(kms.to_string());
1609        }
1610        if let Some(dlc) = body.get("DeadLetterConfig") {
1611            bus.dead_letter_config = Some(dlc.clone());
1612        }
1613        bus.last_modified_time = Utc::now();
1614
1615        let arn = bus.arn.clone();
1616        let bus_name = bus.name.clone();
1617
1618        Ok(AwsResponse::ok_json(json!({
1619            "Arn": arn,
1620            "Name": bus_name,
1621        })))
1622    }
1623
1624    // ─── Endpoint Operations ────────────────────────────────────────────
1625
1626    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1627        let body = req.json_body();
1628        validate_required("Name", &body["Name"])?;
1629        let name = body["Name"]
1630            .as_str()
1631            .ok_or_else(|| missing("Name"))?
1632            .to_string();
1633        validate_string_length("name", &name, 1, 64)?;
1634        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1635        validate_required("EventBuses", &body["EventBuses"])?;
1636
1637        let description = body["Description"].as_str().map(|s| s.to_string());
1638        let routing_config = body["RoutingConfig"].clone();
1639        let replication_config = body.get("ReplicationConfig").cloned();
1640        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1641        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1642
1643        let mut accounts = self.state.write();
1644        let state = accounts.get_or_create(&req.account_id);
1645        if state.endpoints.contains_key(&name) {
1646            return Err(AwsServiceError::aws_error(
1647                StatusCode::CONFLICT,
1648                "ResourceAlreadyExistsException",
1649                format!("Endpoint {name} already exists."),
1650            ));
1651        }
1652
1653        let endpoint_id = format!("{}.abc123", name);
1654        let arn = format!(
1655            "arn:aws:events:{}:{}:endpoint/{}",
1656            req.region, state.account_id, name
1657        );
1658        let endpoint_url = format!(
1659            "https://{}.endpoint.events.{}.amazonaws.com",
1660            endpoint_id, req.region
1661        );
1662        let now = Utc::now();
1663
1664        let endpoint = Endpoint {
1665            name: name.clone(),
1666            arn: arn.clone(),
1667            endpoint_id: endpoint_id.clone(),
1668            endpoint_url: Some(endpoint_url),
1669            description,
1670            routing_config: routing_config.clone(),
1671            replication_config: replication_config.clone(),
1672            event_buses: event_buses.clone(),
1673            role_arn: role_arn.clone(),
1674            state: "ACTIVE".to_string(),
1675            creation_time: now,
1676            last_modified_time: now,
1677        };
1678        state.endpoints.insert(name.clone(), endpoint);
1679
1680        let mut resp = json!({
1681            "Name": name,
1682            "Arn": arn,
1683            "State": "ACTIVE",
1684            "RoutingConfig": routing_config,
1685            "EventBuses": event_buses,
1686        });
1687        if let Some(ref rc) = replication_config {
1688            resp["ReplicationConfig"] = rc.clone();
1689        }
1690        if let Some(ref ra) = role_arn {
1691            resp["RoleArn"] = json!(ra);
1692        }
1693
1694        Ok(AwsResponse::ok_json(resp))
1695    }
1696
1697    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698        let body = req.json_body();
1699        validate_required("Name", &body["Name"])?;
1700        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1701
1702        let mut accounts = self.state.write();
1703        let state = accounts.get_or_create(&req.account_id);
1704        state.endpoints.remove(name).ok_or_else(|| {
1705            AwsServiceError::aws_error(
1706                StatusCode::BAD_REQUEST,
1707                "ResourceNotFoundException",
1708                format!("Endpoint '{name}' does not exist."),
1709            )
1710        })?;
1711
1712        Ok(AwsResponse::ok_json(json!({})))
1713    }
1714
1715    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1716        let body = req.json_body();
1717        validate_required("Name", &body["Name"])?;
1718        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1719
1720        let accounts = self.state.read();
1721        let empty = EventBridgeState::new(&req.account_id, &req.region);
1722        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1723        let ep = state.endpoints.get(name).ok_or_else(|| {
1724            AwsServiceError::aws_error(
1725                StatusCode::BAD_REQUEST,
1726                "ResourceNotFoundException",
1727                format!("Endpoint '{name}' does not exist."),
1728            )
1729        })?;
1730
1731        let mut resp = json!({
1732            "Name": ep.name,
1733            "Arn": ep.arn,
1734            "EndpointId": ep.endpoint_id,
1735            "State": ep.state,
1736            "RoutingConfig": ep.routing_config,
1737            "EventBuses": ep.event_buses,
1738            "CreationTime": ep.creation_time.timestamp() as f64,
1739            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1740        });
1741        if let Some(ref url) = ep.endpoint_url {
1742            resp["EndpointUrl"] = json!(url);
1743        }
1744        if let Some(ref desc) = ep.description {
1745            resp["Description"] = json!(desc);
1746        }
1747        if let Some(ref rc) = ep.replication_config {
1748            resp["ReplicationConfig"] = rc.clone();
1749        }
1750        if let Some(ref ra) = ep.role_arn {
1751            resp["RoleArn"] = json!(ra);
1752        }
1753
1754        Ok(AwsResponse::ok_json(resp))
1755    }
1756
1757    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1758        let body = req.json_body();
1759        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1760        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1761        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1762        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1763        let name_prefix = body["NamePrefix"].as_str();
1764        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1765
1766        let accounts = self.state.read();
1767        let empty = EventBridgeState::new(&req.account_id, &req.region);
1768        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1769        let all: Vec<Value> = state
1770            .endpoints
1771            .values()
1772            .filter(|ep| match name_prefix {
1773                Some(prefix) => ep.name.starts_with(prefix),
1774                None => true,
1775            })
1776            .map(|ep| {
1777                let mut obj = json!({
1778                    "Name": ep.name,
1779                    "Arn": ep.arn,
1780                    "EndpointId": ep.endpoint_id,
1781                    "State": ep.state,
1782                    "RoutingConfig": ep.routing_config,
1783                    "EventBuses": ep.event_buses,
1784                    "CreationTime": ep.creation_time.timestamp() as f64,
1785                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1786                });
1787                if let Some(ref url) = ep.endpoint_url {
1788                    obj["EndpointUrl"] = json!(url);
1789                }
1790                obj
1791            })
1792            .collect();
1793
1794        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1795        let mut resp = json!({ "Endpoints": endpoints });
1796        if let Some(token) = next_token {
1797            resp["NextToken"] = json!(token);
1798        }
1799
1800        Ok(AwsResponse::ok_json(resp))
1801    }
1802
1803    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1804        let body = req.json_body();
1805        validate_required("Name", &body["Name"])?;
1806        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1807
1808        let mut accounts = self.state.write();
1809        let state = accounts.get_or_create(&req.account_id);
1810        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1811            AwsServiceError::aws_error(
1812                StatusCode::BAD_REQUEST,
1813                "ResourceNotFoundException",
1814                format!("Endpoint '{name}' does not exist."),
1815            )
1816        })?;
1817
1818        if let Some(desc) = body["Description"].as_str() {
1819            ep.description = Some(desc.to_string());
1820        }
1821        if !body["RoutingConfig"].is_null() {
1822            ep.routing_config = body["RoutingConfig"].clone();
1823        }
1824        if let Some(rc) = body.get("ReplicationConfig") {
1825            ep.replication_config = Some(rc.clone());
1826        }
1827        if let Some(buses) = body["EventBuses"].as_array() {
1828            ep.event_buses = buses.clone();
1829        }
1830        if let Some(ra) = body["RoleArn"].as_str() {
1831            ep.role_arn = Some(ra.to_string());
1832        }
1833        ep.last_modified_time = Utc::now();
1834
1835        let resp = json!({
1836            "Name": ep.name,
1837            "Arn": ep.arn,
1838            "EndpointId": ep.endpoint_id,
1839            "State": ep.state,
1840            "RoutingConfig": ep.routing_config,
1841            "EventBuses": ep.event_buses,
1842        });
1843
1844        Ok(AwsResponse::ok_json(resp))
1845    }
1846
1847    // ─── DeauthorizeConnection ──────────────────────────────────────────
1848
1849    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850        let body = req.json_body();
1851        validate_required("Name", &body["Name"])?;
1852        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1853        validate_string_length("name", name, 1, 64)?;
1854
1855        let mut accounts = self.state.write();
1856        let state = accounts.get_or_create(&req.account_id);
1857        let conn = state.connections.get_mut(name).ok_or_else(|| {
1858            AwsServiceError::aws_error(
1859                StatusCode::BAD_REQUEST,
1860                "ResourceNotFoundException",
1861                format!("Connection '{name}' does not exist."),
1862            )
1863        })?;
1864
1865        conn.connection_state = "DEAUTHORIZING".to_string();
1866        conn.last_modified_time = Utc::now();
1867
1868        let resp = json!({
1869            "ConnectionArn": conn.arn,
1870            "ConnectionState": conn.connection_state,
1871            "CreationTime": conn.creation_time.timestamp() as f64,
1872            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1873            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1874        });
1875
1876        Ok(AwsResponse::ok_json(resp))
1877    }
1878
1879    // ─── PutEvents ──────────────────────────────────────────────────────
1880
1881    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882        let body = req.json_body();
1883        validate_required("Entries", &body["Entries"])?;
1884        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1885        let entries = body["Entries"]
1886            .as_array()
1887            .ok_or_else(|| missing("Entries"))?;
1888
1889        // Validate entries count
1890        if entries.is_empty() {
1891            return Err(AwsServiceError::aws_error(
1892                StatusCode::BAD_REQUEST,
1893                "ValidationException",
1894                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1895            ));
1896        }
1897        if entries.len() > 10 {
1898            return Err(AwsServiceError::aws_error(
1899                StatusCode::BAD_REQUEST,
1900                "ValidationException",
1901                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1902            ));
1903        }
1904
1905        let mut accounts = self.state.write();
1906        let state = accounts.get_or_create(&req.account_id);
1907        let mut result_entries = Vec::new();
1908        let mut events_to_deliver = Vec::new();
1909        let mut failed_count = 0;
1910
1911        for entry in entries {
1912            let source = entry["Source"].as_str().unwrap_or("").to_string();
1913            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1914            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1915
1916            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1917                failed_count += 1;
1918                result_entries.push(error);
1919                continue;
1920            }
1921
1922            let event_id = uuid::Uuid::new_v4().to_string();
1923            let raw_bus = entry["EventBusName"]
1924                .as_str()
1925                .unwrap_or("default")
1926                .to_string();
1927            let event_bus_name = state.resolve_bus_name(&raw_bus);
1928            let time = parse_put_events_time(&entry["Time"]);
1929            let resources: Vec<String> = entry["Resources"]
1930                .as_array()
1931                .map(|arr| {
1932                    arr.iter()
1933                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1934                        .collect()
1935                })
1936                .unwrap_or_default();
1937
1938            let event = PutEvent {
1939                event_id: event_id.clone(),
1940                source: source.clone(),
1941                detail_type: detail_type.clone(),
1942                detail: detail.clone(),
1943                event_bus_name: event_bus_name.clone(),
1944                time,
1945                resources: resources.clone(),
1946            };
1947
1948            archive_matching_event(
1949                state,
1950                &event,
1951                &event_bus_name,
1952                &source,
1953                &detail_type,
1954                &detail,
1955                &req.account_id,
1956                &req.region,
1957                &resources,
1958            );
1959
1960            state.events.push(event);
1961
1962            // Find matching rules and their targets
1963            let matching_targets: Vec<EventTarget> = state
1964                .rules
1965                .values()
1966                .filter(|r| {
1967                    r.event_bus_name == event_bus_name
1968                        && r.state == "ENABLED"
1969                        && matches_pattern(
1970                            r.event_pattern.as_deref(),
1971                            &source,
1972                            &detail_type,
1973                            &detail,
1974                            &req.account_id,
1975                            &req.region,
1976                            &resources,
1977                        )
1978                })
1979                .flat_map(|r| r.targets.clone())
1980                .collect();
1981
1982            if !matching_targets.is_empty() {
1983                events_to_deliver.push((
1984                    event_id.clone(),
1985                    source,
1986                    detail_type,
1987                    detail,
1988                    time,
1989                    resources,
1990                    matching_targets,
1991                ));
1992            }
1993
1994            result_entries.push(json!({ "EventId": event_id }));
1995        }
1996
1997        // Drop the lock before delivering
1998        drop(accounts);
1999
2000        // Deliver to targets
2001        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
2002            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
2003            let event_json = json!({
2004                "version": "0",
2005                "id": event_id,
2006                "source": source,
2007                "account": req.account_id,
2008                "detail-type": detail_type,
2009                "detail": detail_value,
2010                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
2011                "region": req.region,
2012                "resources": resources,
2013            });
2014            let event_str = event_json.to_string();
2015
2016            for target in targets {
2017                let arn = &target.arn;
2018                // Compute the message body, applying InputTransformer if present
2019                let body_str = if let Some(ref transformer) = target.input_transformer {
2020                    apply_input_transformer(transformer, &event_json)
2021                } else if let Some(ref input) = target.input {
2022                    input.clone()
2023                } else if let Some(ref input_path) = target.input_path {
2024                    resolve_json_path(&event_json, input_path)
2025                        .map(|v| v.to_string())
2026                        .unwrap_or_else(|| event_str.clone())
2027                } else {
2028                    event_str.clone()
2029                };
2030
2031                if arn.contains(":sqs:") {
2032                    // Extract FIFO parameters (MessageGroupId)
2033                    let group_id = target
2034                        .sqs_parameters
2035                        .as_ref()
2036                        .and_then(|p| p["MessageGroupId"].as_str())
2037                        .map(|s| s.to_string());
2038                    if group_id.is_some() {
2039                        // FIFO queue: send with group ID but no dedup ID.
2040                        // Queues with content-based dedup will auto-generate one;
2041                        // queues without it will reject the message.
2042                        self.delivery.send_to_sqs_with_attrs(
2043                            arn,
2044                            &body_str,
2045                            &HashMap::new(),
2046                            group_id.as_deref(),
2047                            None,
2048                        );
2049                    } else {
2050                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
2051                    }
2052                } else if arn.contains(":sns:") {
2053                    self.delivery
2054                        .publish_to_sns(arn, &body_str, Some(&detail_type));
2055                } else if arn.contains(":lambda:") {
2056                    tracing::info!(
2057                        function_arn = %arn,
2058                        payload = %body_str,
2059                        "EventBridge delivering to Lambda function"
2060                    );
2061                    let now = Utc::now();
2062                    let mut accounts = self.state.write();
2063                    let state = accounts.get_or_create(&req.account_id);
2064                    state
2065                        .lambda_invocations
2066                        .push(crate::state::LambdaInvocation {
2067                            function_arn: arn.clone(),
2068                            payload: body_str.clone(),
2069                            timestamp: now,
2070                        });
2071                    drop(accounts);
2072                    // Record in Lambda state for cross-service visibility
2073                    if let Some(ref ls) = self.lambda_state {
2074                        ls.write().default_mut().invocations.push(LambdaInvocation {
2075                            function_arn: arn.clone(),
2076                            payload: body_str.clone(),
2077                            timestamp: now,
2078                            source: "aws:events".to_string(),
2079                        });
2080                    }
2081                    // Actually invoke the Lambda function if a container runtime is available
2082                    invoke_lambda_async(
2083                        &self.container_runtime,
2084                        &self.lambda_state,
2085                        arn,
2086                        &body_str,
2087                    );
2088                } else if arn.contains(":logs:") {
2089                    tracing::info!(
2090                        log_group_arn = %arn,
2091                        payload = %body_str,
2092                        "EventBridge delivering to CloudWatch Logs"
2093                    );
2094                    let now = Utc::now();
2095                    let mut accounts = self.state.write();
2096                    let state = accounts.get_or_create(&req.account_id);
2097                    state.log_deliveries.push(crate::state::LogDelivery {
2098                        log_group_arn: arn.clone(),
2099                        payload: body_str.clone(),
2100                        timestamp: now,
2101                    });
2102                    drop(accounts);
2103                    // Write event to CloudWatch Logs state
2104                    if let Some(ref log_state) = self.logs_state {
2105                        deliver_to_logs(log_state, arn, &body_str, now);
2106                    }
2107                } else if arn.contains(":kinesis:") {
2108                    tracing::info!(
2109                        stream_arn = %arn,
2110                        "EventBridge delivering to Kinesis stream"
2111                    );
2112                    // Use event ID as partition key for even distribution
2113                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
2114                } else if arn.contains(":states:") {
2115                    tracing::info!(
2116                        state_machine_arn = %arn,
2117                        "EventBridge delivering to Step Functions"
2118                    );
2119                    self.delivery.start_stepfunctions_execution(arn, &body_str);
2120                    let mut accounts = self.state.write();
2121                    let state = accounts.get_or_create(&req.account_id);
2122                    state
2123                        .step_function_executions
2124                        .push(crate::state::StepFunctionExecution {
2125                            state_machine_arn: arn.clone(),
2126                            payload: body_str.clone(),
2127                            timestamp: Utc::now(),
2128                        });
2129                } else if arn.contains(":api-destination/") {
2130                    // ApiDestination target: look up destination + connection, then POST
2131                    let accounts = self.state.read();
2132                    let empty = EventBridgeState::new(&req.account_id, &req.region);
2133                    let state = accounts.get(&req.account_id).unwrap_or(&empty);
2134                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2135                    if let Some(dest) = dest {
2136                        let url = dest.invocation_endpoint.clone();
2137                        let method = dest.http_method.clone();
2138                        let conn = state
2139                            .connections
2140                            .values()
2141                            .find(|c| c.arn == dest.connection_arn)
2142                            .cloned();
2143                        drop(accounts);
2144
2145                        let payload = body_str.clone();
2146                        tokio::spawn(async move {
2147                            let client = reqwest::Client::new();
2148                            let mut req_builder = match method.as_str() {
2149                                "GET" => client.get(&url),
2150                                "PUT" => client.put(&url),
2151                                "DELETE" => client.delete(&url),
2152                                "PATCH" => client.patch(&url),
2153                                "HEAD" => client.head(&url),
2154                                _ => client.post(&url),
2155                            };
2156                            req_builder = req_builder.header("Content-Type", "application/json");
2157
2158                            // Apply auth from connection
2159                            if let Some(conn) = conn {
2160                                req_builder = apply_connection_auth(req_builder, &conn);
2161                            }
2162
2163                            let result = req_builder.body(payload).send().await;
2164                            if let Err(e) = result {
2165                                tracing::warn!(
2166                                    endpoint = %url,
2167                                    error = %e,
2168                                    "EventBridge ApiDestination delivery failed"
2169                                );
2170                            }
2171                        });
2172                    }
2173                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2174                    // HTTP target — fire-and-forget POST
2175                    let url = arn.clone();
2176                    let payload = body_str.clone();
2177                    tokio::spawn(async move {
2178                        let client = reqwest::Client::new();
2179                        let result = client
2180                            .post(&url)
2181                            .header("Content-Type", "application/json")
2182                            .body(payload)
2183                            .send()
2184                            .await;
2185                        if let Err(e) = result {
2186                            tracing::warn!(
2187                                endpoint = %url,
2188                                error = %e,
2189                                "EventBridge HTTP target delivery failed"
2190                            );
2191                        }
2192                    });
2193                }
2194            }
2195        }
2196
2197        let resp = json!({
2198            "FailedEntryCount": failed_count,
2199            "Entries": result_entries,
2200        });
2201
2202        Ok(AwsResponse::ok_json(resp))
2203    }
2204
2205    // ─── Tagging ────────────────────────────────────────────────────────
2206
2207    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2208        let body = req.json_body();
2209        validate_required("ResourceARN", &body["ResourceARN"])?;
2210        let arn = body["ResourceARN"]
2211            .as_str()
2212            .ok_or_else(|| missing("ResourceARN"))?;
2213        validate_string_length("resourceARN", arn, 1, 1600)?;
2214        validate_required("Tags", &body["Tags"])?;
2215
2216        let mut accounts = self.state.write();
2217        let state = accounts.get_or_create(&req.account_id);
2218        let tag_map = find_tags_mut(state, arn)?;
2219
2220        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2221            AwsServiceError::aws_error(
2222                StatusCode::BAD_REQUEST,
2223                "ValidationException",
2224                format!("{f} must be a list"),
2225            )
2226        })?;
2227
2228        Ok(AwsResponse::ok_json(json!({})))
2229    }
2230
2231    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2232        let body = req.json_body();
2233        validate_required("ResourceARN", &body["ResourceARN"])?;
2234        let arn = body["ResourceARN"]
2235            .as_str()
2236            .ok_or_else(|| missing("ResourceARN"))?;
2237        validate_string_length("resourceARN", arn, 1, 1600)?;
2238        validate_required("TagKeys", &body["TagKeys"])?;
2239
2240        let mut accounts = self.state.write();
2241        let state = accounts.get_or_create(&req.account_id);
2242        let tag_map = find_tags_mut(state, arn)?;
2243
2244        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2245            AwsServiceError::aws_error(
2246                StatusCode::BAD_REQUEST,
2247                "ValidationException",
2248                format!("{f} must be a list"),
2249            )
2250        })?;
2251
2252        Ok(AwsResponse::ok_json(json!({})))
2253    }
2254
2255    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2256        let body = req.json_body();
2257        validate_required("ResourceARN", &body["ResourceARN"])?;
2258        let arn = body["ResourceARN"]
2259            .as_str()
2260            .ok_or_else(|| missing("ResourceARN"))?;
2261        validate_string_length("resourceARN", arn, 1, 1600)?;
2262
2263        let accounts = self.state.read();
2264        let empty = EventBridgeState::new(&req.account_id, &req.region);
2265        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2266        let tag_map = find_tags(state, arn)?;
2267
2268        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2269
2270        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2271    }
2272
2273    // ─── Archive Operations ─────────────────────────────────────────────
2274
2275    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2276        let body = req.json_body();
2277        validate_required("ArchiveName", &body["ArchiveName"])?;
2278        let name = body["ArchiveName"]
2279            .as_str()
2280            .ok_or_else(|| missing("ArchiveName"))?
2281            .to_string();
2282        validate_string_length("archiveName", &name, 1, 48)?;
2283        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2284        let event_source_arn = body["EventSourceArn"]
2285            .as_str()
2286            .ok_or_else(|| missing("EventSourceArn"))?
2287            .to_string();
2288        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2289        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2290        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2291        if let Some(rd) = body["RetentionDays"].as_i64() {
2292            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2293        }
2294        let description = body["Description"].as_str().map(|s| s.to_string());
2295        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2296        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2297
2298        // Validate event pattern if provided
2299        if let Some(ref pattern) = event_pattern {
2300            validate_event_pattern(pattern)?;
2301        }
2302
2303        let mut accounts = self.state.write();
2304        let state = accounts.get_or_create(&req.account_id);
2305
2306        // Validate event bus exists
2307        let bus_name = state.resolve_bus_name(&event_source_arn);
2308        if !state.buses.contains_key(&bus_name) {
2309            return Err(AwsServiceError::aws_error(
2310                StatusCode::BAD_REQUEST,
2311                "ResourceNotFoundException",
2312                format!("Event bus {bus_name} does not exist."),
2313            ));
2314        }
2315
2316        // Check duplicate
2317        if state.archives.contains_key(&name) {
2318            return Err(AwsServiceError::aws_error(
2319                StatusCode::BAD_REQUEST,
2320                "ResourceAlreadyExistsException",
2321                format!("Archive {name} already exists."),
2322            ));
2323        }
2324
2325        let now = Utc::now();
2326        let arn = format!(
2327            "arn:aws:events:{}:{}:archive/{}",
2328            req.region, state.account_id, name
2329        );
2330
2331        let archive = Archive {
2332            name: name.clone(),
2333            arn: arn.clone(),
2334            event_source_arn: event_source_arn.clone(),
2335            description,
2336            event_pattern: event_pattern.clone(),
2337            retention_days,
2338            state: "ENABLED".to_string(),
2339            creation_time: now,
2340            event_count: 0,
2341            size_bytes: 0,
2342            events: Vec::new(),
2343        };
2344        state.archives.insert(name.clone(), archive);
2345
2346        // Create the archive rule
2347        let rule_name = format!("Events-Archive-{name}");
2348        let rule_arn = format!(
2349            "arn:aws:events:{}:{}:rule/{}",
2350            req.region, state.account_id, rule_name
2351        );
2352        // Merge archive event pattern with replay-name filter
2353        let rule_event_pattern = {
2354            let mut merged = if let Some(ref ep) = event_pattern {
2355                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2356            } else {
2357                json!({})
2358            };
2359            if let Some(obj) = merged.as_object_mut() {
2360                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2361            }
2362            serde_json::to_string(&merged).unwrap_or_default()
2363        };
2364
2365        // Build the archive target with InputTransformer
2366        let archive_target = EventTarget {
2367            id: name.clone(),
2368            arn: Arn::new("events", &req.region, "", "").to_string(),
2369            input: None,
2370            input_path: None,
2371            input_transformer: Some(json!({
2372                "InputPathsMap": {},
2373                "InputTemplate": format!(
2374                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2375                    arn
2376                )
2377            })),
2378            sqs_parameters: None,
2379        };
2380
2381        let archive_rule = EventRule {
2382            name: rule_name.clone(),
2383            arn: rule_arn,
2384            event_bus_name: bus_name.clone(),
2385            event_pattern: Some(rule_event_pattern),
2386            schedule_expression: None,
2387            state: "ENABLED".to_string(),
2388            description: None,
2389            role_arn: None,
2390            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2391            created_by: Some(state.account_id.clone()),
2392            targets: vec![archive_target],
2393            tags: HashMap::new(),
2394            last_fired: None,
2395        };
2396        let key = (bus_name, rule_name);
2397        state.rules.insert(key, archive_rule);
2398
2399        Ok(AwsResponse::ok_json(json!({
2400            "ArchiveArn": arn,
2401            "CreationTime": now.timestamp() as f64,
2402            "State": "ENABLED",
2403        })))
2404    }
2405
2406    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2407        let body = req.json_body();
2408        validate_required("ArchiveName", &body["ArchiveName"])?;
2409        let name = body["ArchiveName"]
2410            .as_str()
2411            .ok_or_else(|| missing("ArchiveName"))?;
2412        validate_string_length("archiveName", name, 1, 48)?;
2413
2414        let accounts = self.state.read();
2415        let empty = EventBridgeState::new(&req.account_id, &req.region);
2416        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2417        let archive = state.archives.get(name).ok_or_else(|| {
2418            AwsServiceError::aws_error(
2419                StatusCode::BAD_REQUEST,
2420                "ResourceNotFoundException",
2421                format!("Archive {name} does not exist."),
2422            )
2423        })?;
2424
2425        let mut resp = json!({
2426            "ArchiveArn": archive.arn,
2427            "ArchiveName": archive.name,
2428            "CreationTime": archive.creation_time.timestamp() as f64,
2429            "EventCount": archive.event_count,
2430            "EventSourceArn": archive.event_source_arn,
2431            "RetentionDays": archive.retention_days,
2432            "SizeBytes": archive.size_bytes,
2433            "State": archive.state,
2434        });
2435        if let Some(ref desc) = archive.description {
2436            resp["Description"] = json!(desc);
2437        }
2438        if let Some(ref ep) = archive.event_pattern {
2439            resp["EventPattern"] = json!(ep);
2440        }
2441
2442        Ok(AwsResponse::ok_json(resp))
2443    }
2444
2445    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2446        let body = req.json_body();
2447        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2448        validate_optional_string_length(
2449            "eventSourceArn",
2450            body["EventSourceArn"].as_str(),
2451            1,
2452            1600,
2453        )?;
2454        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2455        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2456        let name_prefix = body["NamePrefix"].as_str();
2457        let source_arn = body["EventSourceArn"].as_str();
2458        let archive_state = body["State"].as_str();
2459
2460        // Validate at most one filter
2461        let filter_count = [
2462            name_prefix.is_some(),
2463            source_arn.is_some(),
2464            archive_state.is_some(),
2465        ]
2466        .iter()
2467        .filter(|&&x| x)
2468        .count();
2469        if filter_count > 1 {
2470            return Err(AwsServiceError::aws_error(
2471                StatusCode::BAD_REQUEST,
2472                "ValidationException",
2473                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2474            ));
2475        }
2476
2477        // Validate state
2478        if let Some(s) = archive_state {
2479            let valid = [
2480                "ENABLED",
2481                "DISABLED",
2482                "CREATING",
2483                "UPDATING",
2484                "CREATE_FAILED",
2485                "UPDATE_FAILED",
2486            ];
2487            if !valid.contains(&s) {
2488                return Err(AwsServiceError::aws_error(
2489                    StatusCode::BAD_REQUEST,
2490                    "ValidationException",
2491                    format!(
2492                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2493                        s
2494                    ),
2495                ));
2496            }
2497        }
2498
2499        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2500
2501        let accounts = self.state.read();
2502        let empty = EventBridgeState::new(&req.account_id, &req.region);
2503        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2504        let all: Vec<Value> = state
2505            .archives
2506            .values()
2507            .filter(|a| {
2508                if let Some(prefix) = name_prefix {
2509                    a.name.starts_with(prefix)
2510                } else if let Some(arn) = source_arn {
2511                    a.event_source_arn == arn
2512                } else if let Some(s) = archive_state {
2513                    a.state == s
2514                } else {
2515                    true
2516                }
2517            })
2518            .map(|a| {
2519                json!({
2520                    "ArchiveName": a.name,
2521                    "CreationTime": a.creation_time.timestamp() as f64,
2522                    "EventCount": a.event_count,
2523                    "EventSourceArn": a.event_source_arn,
2524                    "RetentionDays": a.retention_days,
2525                    "SizeBytes": a.size_bytes,
2526                    "State": a.state,
2527                })
2528            })
2529            .collect();
2530
2531        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2532        let mut resp = json!({ "Archives": archives });
2533        if let Some(token) = next_token {
2534            resp["NextToken"] = json!(token);
2535        }
2536
2537        Ok(AwsResponse::ok_json(resp))
2538    }
2539
2540    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2541        let body = req.json_body();
2542        validate_required("ArchiveName", &body["ArchiveName"])?;
2543        let name = body["ArchiveName"]
2544            .as_str()
2545            .ok_or_else(|| missing("ArchiveName"))?;
2546        validate_string_length("archiveName", name, 1, 48)?;
2547        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2548        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2549        if let Some(rd) = body["RetentionDays"].as_i64() {
2550            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2551        }
2552
2553        // Validate event pattern if provided
2554        if let Some(pattern) = body["EventPattern"].as_str() {
2555            validate_event_pattern(pattern)?;
2556        }
2557
2558        let mut accounts = self.state.write();
2559        let state = accounts.get_or_create(&req.account_id);
2560        let archive = state.archives.get_mut(name).ok_or_else(|| {
2561            AwsServiceError::aws_error(
2562                StatusCode::BAD_REQUEST,
2563                "ResourceNotFoundException",
2564                format!("Archive {name} does not exist."),
2565            )
2566        })?;
2567
2568        if let Some(desc) = body["Description"].as_str() {
2569            archive.description = Some(desc.to_string());
2570        }
2571        if let Some(pattern) = body["EventPattern"].as_str() {
2572            archive.event_pattern = Some(pattern.to_string());
2573        }
2574        if let Some(days) = body["RetentionDays"].as_i64() {
2575            archive.retention_days = days;
2576        }
2577
2578        Ok(AwsResponse::ok_json(json!({
2579            "ArchiveArn": archive.arn,
2580            "CreationTime": archive.creation_time.timestamp() as f64,
2581            "State": archive.state,
2582        })))
2583    }
2584
2585    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2586        let body = req.json_body();
2587        validate_required("ArchiveName", &body["ArchiveName"])?;
2588        let name = body["ArchiveName"]
2589            .as_str()
2590            .ok_or_else(|| missing("ArchiveName"))?;
2591        validate_string_length("archiveName", name, 1, 48)?;
2592
2593        let mut accounts = self.state.write();
2594        let state = accounts.get_or_create(&req.account_id);
2595        if !state.archives.contains_key(name) {
2596            return Err(AwsServiceError::aws_error(
2597                StatusCode::BAD_REQUEST,
2598                "ResourceNotFoundException",
2599                format!("Archive {name} does not exist."),
2600            ));
2601        }
2602
2603        state.archives.remove(name);
2604
2605        // Remove the archive rule
2606        let rule_name = format!("Events-Archive-{name}");
2607        state.rules.retain(|k, _| k.1 != rule_name);
2608
2609        Ok(AwsResponse::ok_json(json!({})))
2610    }
2611
2612    // ─── Connection Operations ──────────────────────────────────────────
2613
2614    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615        let body = req.json_body();
2616        validate_required("Name", &body["Name"])?;
2617        let name = body["Name"]
2618            .as_str()
2619            .ok_or_else(|| missing("Name"))?
2620            .to_string();
2621        validate_string_length("name", &name, 1, 64)?;
2622        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2623        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2624        let description = body["Description"].as_str().map(|s| s.to_string());
2625        let auth_type = body["AuthorizationType"]
2626            .as_str()
2627            .ok_or_else(|| missing("AuthorizationType"))?
2628            .to_string();
2629        validate_enum(
2630            "authorizationType",
2631            &auth_type,
2632            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2633        )?;
2634        validate_optional_string_length(
2635            "kmsKeyIdentifier",
2636            body["KmsKeyIdentifier"].as_str(),
2637            0,
2638            2048,
2639        )?;
2640        validate_required("AuthParameters", &body["AuthParameters"])?;
2641        let auth_params = body["AuthParameters"].clone();
2642
2643        let mut accounts = self.state.write();
2644        let state = accounts.get_or_create(&req.account_id);
2645        let now = Utc::now();
2646        let conn_uuid = uuid::Uuid::new_v4();
2647        let arn = format!(
2648            "arn:aws:events:{}:{}:connection/{}/{}",
2649            req.region, state.account_id, name, conn_uuid
2650        );
2651        let secret_arn = format!(
2652            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2653            req.region, state.account_id, name, conn_uuid
2654        );
2655
2656        let conn = Connection {
2657            name: name.clone(),
2658            arn: arn.clone(),
2659            description,
2660            authorization_type: auth_type.clone(),
2661            auth_parameters: auth_params,
2662            connection_state: "AUTHORIZED".to_string(),
2663            secret_arn: secret_arn.clone(),
2664            creation_time: now,
2665            last_modified_time: now,
2666            last_authorized_time: now,
2667        };
2668        state.connections.insert(name, conn);
2669
2670        Ok(AwsResponse::ok_json(json!({
2671            "ConnectionArn": arn,
2672            "ConnectionState": "AUTHORIZED",
2673            "CreationTime": now.timestamp() as f64,
2674            "LastModifiedTime": now.timestamp() as f64,
2675        })))
2676    }
2677
2678    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2679        let body = req.json_body();
2680        validate_required("Name", &body["Name"])?;
2681        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2682        validate_string_length("name", name, 1, 64)?;
2683
2684        let accounts = self.state.read();
2685        let empty = EventBridgeState::new(&req.account_id, &req.region);
2686        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2687        let conn = state.connections.get(name).ok_or_else(|| {
2688            AwsServiceError::aws_error(
2689                StatusCode::BAD_REQUEST,
2690                "ResourceNotFoundException",
2691                format!("Connection '{name}' does not exist."),
2692            )
2693        })?;
2694
2695        // Build auth parameters response - strip secrets
2696        let auth_params_response =
2697            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2698
2699        let mut resp = json!({
2700            "ConnectionArn": conn.arn,
2701            "Name": conn.name,
2702            "AuthorizationType": conn.authorization_type,
2703            "AuthParameters": auth_params_response,
2704            "ConnectionState": conn.connection_state,
2705            "SecretArn": conn.secret_arn,
2706            "CreationTime": conn.creation_time.timestamp() as f64,
2707            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2708            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2709        });
2710        if let Some(ref desc) = conn.description {
2711            resp["Description"] = json!(desc);
2712        }
2713
2714        Ok(AwsResponse::ok_json(resp))
2715    }
2716
2717    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2718        let body = req.json_body();
2719        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2720        validate_optional_enum(
2721            "connectionState",
2722            body["ConnectionState"].as_str(),
2723            &[
2724                "CREATING",
2725                "UPDATING",
2726                "DELETING",
2727                "AUTHORIZED",
2728                "DEAUTHORIZED",
2729                "AUTHORIZING",
2730                "DEAUTHORIZING",
2731                "ACTIVE",
2732                "FAILED_CONNECTIVITY",
2733            ],
2734        )?;
2735        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2736        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2737
2738        let name_prefix = body["NamePrefix"].as_str();
2739        let connection_state = body["ConnectionState"].as_str();
2740        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2741
2742        let accounts = self.state.read();
2743        let empty = EventBridgeState::new(&req.account_id, &req.region);
2744        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2745        let all: Vec<Value> = state
2746            .connections
2747            .values()
2748            .filter(|c| {
2749                if let Some(prefix) = name_prefix {
2750                    if !c.name.starts_with(prefix) {
2751                        return false;
2752                    }
2753                }
2754                if let Some(cs) = connection_state {
2755                    if c.connection_state != cs {
2756                        return false;
2757                    }
2758                }
2759                true
2760            })
2761            .map(|c| {
2762                json!({
2763                    "ConnectionArn": c.arn,
2764                    "Name": c.name,
2765                    "AuthorizationType": c.authorization_type,
2766                    "ConnectionState": c.connection_state,
2767                    "CreationTime": c.creation_time.timestamp() as f64,
2768                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2769                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2770                })
2771            })
2772            .collect();
2773
2774        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2775        let mut resp = json!({ "Connections": conns });
2776        if let Some(token) = next_token {
2777            resp["NextToken"] = json!(token);
2778        }
2779
2780        Ok(AwsResponse::ok_json(resp))
2781    }
2782
2783    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2784        let body = req.json_body();
2785        validate_required("Name", &body["Name"])?;
2786        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2787        validate_string_length("name", name, 1, 64)?;
2788        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2789        validate_optional_enum(
2790            "authorizationType",
2791            body["AuthorizationType"].as_str(),
2792            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2793        )?;
2794
2795        let mut accounts = self.state.write();
2796        let state = accounts.get_or_create(&req.account_id);
2797        let conn = state.connections.get_mut(name).ok_or_else(|| {
2798            AwsServiceError::aws_error(
2799                StatusCode::BAD_REQUEST,
2800                "ResourceNotFoundException",
2801                format!("Connection '{name}' does not exist."),
2802            )
2803        })?;
2804
2805        if let Some(desc) = body["Description"].as_str() {
2806            conn.description = Some(desc.to_string());
2807        }
2808        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2809            conn.authorization_type = auth_type.to_string();
2810        }
2811        if body.get("AuthParameters").is_some() {
2812            conn.auth_parameters = body["AuthParameters"].clone();
2813        }
2814        conn.last_modified_time = Utc::now();
2815
2816        Ok(AwsResponse::ok_json(json!({
2817            "ConnectionArn": conn.arn,
2818            "ConnectionState": conn.connection_state,
2819            "CreationTime": conn.creation_time.timestamp() as f64,
2820            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2821            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2822        })))
2823    }
2824
2825    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2826        let body = req.json_body();
2827        validate_required("Name", &body["Name"])?;
2828        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2829        validate_string_length("name", name, 1, 64)?;
2830
2831        let mut accounts = self.state.write();
2832        let state = accounts.get_or_create(&req.account_id);
2833        let conn = state.connections.remove(name).ok_or_else(|| {
2834            AwsServiceError::aws_error(
2835                StatusCode::BAD_REQUEST,
2836                "ResourceNotFoundException",
2837                format!("Connection '{name}' does not exist."),
2838            )
2839        })?;
2840
2841        Ok(AwsResponse::ok_json(json!({
2842            "ConnectionArn": conn.arn,
2843            "ConnectionState": conn.connection_state,
2844            "CreationTime": conn.creation_time.timestamp() as f64,
2845            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2846            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2847        })))
2848    }
2849
2850    // ─── API Destination Operations ─────────────────────────────────────
2851
2852    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2853        let body = req.json_body();
2854        validate_required("Name", &body["Name"])?;
2855        let name = body["Name"]
2856            .as_str()
2857            .ok_or_else(|| missing("Name"))?
2858            .to_string();
2859        validate_string_length("name", &name, 1, 64)?;
2860        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2862        let description = body["Description"].as_str().map(|s| s.to_string());
2863        let connection_arn = body["ConnectionArn"]
2864            .as_str()
2865            .ok_or_else(|| missing("ConnectionArn"))?
2866            .to_string();
2867        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2868        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2869        let endpoint = body["InvocationEndpoint"]
2870            .as_str()
2871            .ok_or_else(|| missing("InvocationEndpoint"))?
2872            .to_string();
2873        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2874        validate_required("HttpMethod", &body["HttpMethod"])?;
2875        let http_method = body["HttpMethod"]
2876            .as_str()
2877            .ok_or_else(|| missing("HttpMethod"))?
2878            .to_string();
2879        validate_enum(
2880            "httpMethod",
2881            &http_method,
2882            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2883        )?;
2884        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2885        if let Some(r) = rate_limit {
2886            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2887        }
2888
2889        let mut accounts = self.state.write();
2890        let state = accounts.get_or_create(&req.account_id);
2891        let now = Utc::now();
2892        let dest_uuid = uuid::Uuid::new_v4();
2893        let arn = format!(
2894            "arn:aws:events:{}:{}:api-destination/{}/{}",
2895            req.region, state.account_id, name, dest_uuid
2896        );
2897
2898        let dest = ApiDestination {
2899            name: name.clone(),
2900            arn: arn.clone(),
2901            description,
2902            connection_arn,
2903            invocation_endpoint: endpoint,
2904            http_method,
2905            invocation_rate_limit_per_second: rate_limit,
2906            state: "ACTIVE".to_string(),
2907            creation_time: now,
2908            last_modified_time: now,
2909        };
2910        state.api_destinations.insert(name, dest);
2911
2912        Ok(AwsResponse::ok_json(json!({
2913            "ApiDestinationArn": arn,
2914            "ApiDestinationState": "ACTIVE",
2915            "CreationTime": now.timestamp() as f64,
2916            "LastModifiedTime": now.timestamp() as f64,
2917        })))
2918    }
2919
2920    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2921        let body = req.json_body();
2922        validate_required("Name", &body["Name"])?;
2923        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2924        validate_string_length("name", name, 1, 64)?;
2925
2926        let accounts = self.state.read();
2927        let empty = EventBridgeState::new(&req.account_id, &req.region);
2928        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2929        let dest = state.api_destinations.get(name).ok_or_else(|| {
2930            AwsServiceError::aws_error(
2931                StatusCode::BAD_REQUEST,
2932                "ResourceNotFoundException",
2933                format!("An api-destination '{name}' does not exist."),
2934            )
2935        })?;
2936
2937        let mut resp = json!({
2938            "ApiDestinationArn": dest.arn,
2939            "Name": dest.name,
2940            "ConnectionArn": dest.connection_arn,
2941            "InvocationEndpoint": dest.invocation_endpoint,
2942            "HttpMethod": dest.http_method,
2943            "ApiDestinationState": dest.state,
2944            "CreationTime": dest.creation_time.timestamp() as f64,
2945            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2946        });
2947        if let Some(ref desc) = dest.description {
2948            resp["Description"] = json!(desc);
2949        }
2950        if let Some(rate) = dest.invocation_rate_limit_per_second {
2951            resp["InvocationRateLimitPerSecond"] = json!(rate);
2952        }
2953
2954        Ok(AwsResponse::ok_json(resp))
2955    }
2956
2957    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2958        let body = req.json_body();
2959        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2960        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2961        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2962        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2963
2964        let name_prefix = body["NamePrefix"].as_str();
2965        let connection_arn = body["ConnectionArn"].as_str();
2966        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2967
2968        let accounts = self.state.read();
2969        let empty = EventBridgeState::new(&req.account_id, &req.region);
2970        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2971        let all: Vec<Value> = state
2972            .api_destinations
2973            .values()
2974            .filter(|d| {
2975                if let Some(prefix) = name_prefix {
2976                    if !d.name.starts_with(prefix) {
2977                        return false;
2978                    }
2979                }
2980                if let Some(arn) = connection_arn {
2981                    if d.connection_arn != arn {
2982                        return false;
2983                    }
2984                }
2985                true
2986            })
2987            .map(|d| {
2988                let mut obj = json!({
2989                    "ApiDestinationArn": d.arn,
2990                    "Name": d.name,
2991                    "ConnectionArn": d.connection_arn,
2992                    "InvocationEndpoint": d.invocation_endpoint,
2993                    "HttpMethod": d.http_method,
2994                    "ApiDestinationState": d.state,
2995                    "CreationTime": d.creation_time.timestamp() as f64,
2996                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2997                });
2998                if let Some(rate) = d.invocation_rate_limit_per_second {
2999                    obj["InvocationRateLimitPerSecond"] = json!(rate);
3000                }
3001                obj
3002            })
3003            .collect();
3004
3005        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3006        let mut resp = json!({ "ApiDestinations": dests });
3007        if let Some(token) = next_token {
3008            resp["NextToken"] = json!(token);
3009        }
3010
3011        Ok(AwsResponse::ok_json(resp))
3012    }
3013
3014    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3015        let body = req.json_body();
3016        validate_required("Name", &body["Name"])?;
3017        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3018        validate_string_length("name", name, 1, 64)?;
3019        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3020        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
3021        validate_optional_string_length(
3022            "invocationEndpoint",
3023            body["InvocationEndpoint"].as_str(),
3024            1,
3025            2048,
3026        )?;
3027        validate_optional_enum(
3028            "httpMethod",
3029            body["HttpMethod"].as_str(),
3030            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
3031        )?;
3032        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
3033            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
3034        }
3035
3036        let mut accounts = self.state.write();
3037        let state = accounts.get_or_create(&req.account_id);
3038        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
3039            AwsServiceError::aws_error(
3040                StatusCode::BAD_REQUEST,
3041                "ResourceNotFoundException",
3042                format!("An api-destination '{name}' does not exist."),
3043            )
3044        })?;
3045
3046        if let Some(desc) = body["Description"].as_str() {
3047            dest.description = Some(desc.to_string());
3048        }
3049        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
3050            dest.invocation_endpoint = endpoint.to_string();
3051        }
3052        if let Some(method) = body["HttpMethod"].as_str() {
3053            dest.http_method = method.to_string();
3054        }
3055        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
3056            dest.invocation_rate_limit_per_second = Some(rate);
3057        }
3058        if let Some(conn) = body["ConnectionArn"].as_str() {
3059            dest.connection_arn = conn.to_string();
3060        }
3061        dest.last_modified_time = Utc::now();
3062
3063        Ok(AwsResponse::ok_json(json!({
3064            "ApiDestinationArn": dest.arn,
3065            "ApiDestinationState": dest.state,
3066            "CreationTime": dest.creation_time.timestamp() as f64,
3067            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
3068        })))
3069    }
3070
3071    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3072        let body = req.json_body();
3073        validate_required("Name", &body["Name"])?;
3074        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3075        validate_string_length("name", name, 1, 64)?;
3076
3077        let mut accounts = self.state.write();
3078        let state = accounts.get_or_create(&req.account_id);
3079        if !state.api_destinations.contains_key(name) {
3080            return Err(AwsServiceError::aws_error(
3081                StatusCode::BAD_REQUEST,
3082                "ResourceNotFoundException",
3083                format!("An api-destination '{name}' does not exist."),
3084            ));
3085        }
3086        state.api_destinations.remove(name);
3087
3088        Ok(AwsResponse::ok_json(json!({})))
3089    }
3090
3091    // ─── Replay Operations ──────────────────────────────────────────────
3092
3093    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3094        let input = StartReplayInput::from_body(&req.json_body())?;
3095
3096        let mut accounts = self.state.write();
3097        let state = accounts.get_or_create(&req.account_id);
3098
3099        // Validate event bus + archive, in the order the real service validates them.
3100        let bus_name = state.resolve_bus_name(&input.destination_arn);
3101        if !state.buses.contains_key(&bus_name) {
3102            return Err(AwsServiceError::aws_error(
3103                StatusCode::BAD_REQUEST,
3104                "ResourceNotFoundException",
3105                format!("Event bus {bus_name} does not exist."),
3106            ));
3107        }
3108
3109        let archive_name = input
3110            .event_source_arn
3111            .rsplit_once("archive/")
3112            .map(|(_, n)| n.to_string())
3113            .unwrap_or_default();
3114        let archive = state.archives.get(&archive_name).ok_or_else(|| {
3115            AwsServiceError::aws_error(
3116                StatusCode::BAD_REQUEST,
3117                "ValidationException",
3118                format!(
3119                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3120                ),
3121            )
3122        })?;
3123        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3124        if archive_bus != bus_name {
3125            return Err(AwsServiceError::aws_error(
3126                StatusCode::BAD_REQUEST,
3127                "ValidationException",
3128                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3129            ));
3130        }
3131
3132        if input.event_end_time <= input.event_start_time {
3133            return Err(AwsServiceError::aws_error(
3134                StatusCode::BAD_REQUEST,
3135                "ValidationException",
3136                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3137            ));
3138        }
3139
3140        if state.replays.contains_key(&input.name) {
3141            return Err(AwsServiceError::aws_error(
3142                StatusCode::BAD_REQUEST,
3143                "ResourceAlreadyExistsException",
3144                format!("Replay {} already exists.", input.name),
3145            ));
3146        }
3147
3148        let now = Utc::now();
3149        let arn = format!(
3150            "arn:aws:events:{}:{}:replay/{}",
3151            req.region, state.account_id, input.name
3152        );
3153
3154        let events_to_deliver = collect_replay_events_with_targets(
3155            state,
3156            &archive_name,
3157            &bus_name,
3158            input.event_start_time,
3159            input.event_end_time,
3160            &req.account_id,
3161            &req.region,
3162        );
3163
3164        let replay = Replay {
3165            name: input.name.clone(),
3166            arn: arn.clone(),
3167            description: input.description,
3168            event_source_arn: input.event_source_arn,
3169            destination: input.destination,
3170            event_start_time: input.event_start_time,
3171            event_end_time: input.event_end_time,
3172            state: "COMPLETED".to_string(),
3173            replay_start_time: now,
3174            replay_end_time: Some(now),
3175        };
3176        state.replays.insert(input.name, replay);
3177
3178        drop(accounts);
3179
3180        for (event, targets) in events_to_deliver {
3181            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3182            let event_json = json!({
3183                "version": "0",
3184                "id": event.event_id,
3185                "source": event.source,
3186                "account": req.account_id,
3187                "detail-type": event.detail_type,
3188                "detail": detail_value,
3189                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3190                "region": req.region,
3191                "resources": event.resources,
3192                "replay-name": arn,
3193            });
3194            let event_str = event_json.to_string();
3195
3196            for target in targets {
3197                self.deliver_replay_event_to_target(
3198                    &target,
3199                    &event,
3200                    &event_json,
3201                    &event_str,
3202                    &req.account_id,
3203                );
3204            }
3205        }
3206
3207        Ok(AwsResponse::ok_json(json!({
3208            "ReplayArn": arn,
3209            "ReplayStartTime": now.timestamp() as f64,
3210            "State": "STARTING",
3211        })))
3212    }
3213
3214    fn deliver_replay_event_to_target(
3215        &self,
3216        target: &EventTarget,
3217        event: &PutEvent,
3218        event_json: &Value,
3219        event_str: &str,
3220        account_id: &str,
3221    ) {
3222        let target_arn = &target.arn;
3223        let body_str = if let Some(ref transformer) = target.input_transformer {
3224            apply_input_transformer(transformer, event_json)
3225        } else if let Some(ref input) = target.input {
3226            input.clone()
3227        } else if let Some(ref input_path) = target.input_path {
3228            resolve_json_path(event_json, input_path)
3229                .map(|v| v.to_string())
3230                .unwrap_or_else(|| event_str.to_string())
3231        } else {
3232            event_str.to_string()
3233        };
3234
3235        if target_arn.contains(":sqs:") {
3236            let group_id = target
3237                .sqs_parameters
3238                .as_ref()
3239                .and_then(|p| p["MessageGroupId"].as_str())
3240                .map(|s| s.to_string());
3241            if group_id.is_some() {
3242                self.delivery.send_to_sqs_with_attrs(
3243                    target_arn,
3244                    &body_str,
3245                    &HashMap::new(),
3246                    group_id.as_deref(),
3247                    None,
3248                );
3249            } else {
3250                self.delivery
3251                    .send_to_sqs(target_arn, &body_str, &HashMap::new());
3252            }
3253        } else if target_arn.contains(":sns:") {
3254            self.delivery
3255                .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3256        } else if target_arn.contains(":lambda:") {
3257            let mut accounts = self.state.write();
3258            let state = accounts.get_or_create(account_id);
3259            state
3260                .lambda_invocations
3261                .push(crate::state::LambdaInvocation {
3262                    function_arn: target_arn.clone(),
3263                    payload: body_str.clone(),
3264                    timestamp: Utc::now(),
3265                });
3266            drop(accounts);
3267            if let Some(ref ls) = self.lambda_state {
3268                ls.write()
3269                    .get_or_create(account_id)
3270                    .invocations
3271                    .push(LambdaInvocation {
3272                        function_arn: target_arn.clone(),
3273                        payload: body_str.clone(),
3274                        timestamp: Utc::now(),
3275                        source: "aws:events".to_string(),
3276                    });
3277            }
3278            invoke_lambda_async(
3279                &self.container_runtime,
3280                &self.lambda_state,
3281                target_arn,
3282                &body_str,
3283            );
3284        } else if target_arn.contains(":logs:") {
3285            let mut accounts = self.state.write();
3286            let state = accounts.get_or_create(account_id);
3287            state.log_deliveries.push(crate::state::LogDelivery {
3288                log_group_arn: target_arn.clone(),
3289                payload: body_str.clone(),
3290                timestamp: Utc::now(),
3291            });
3292            drop(accounts);
3293            if let Some(ref log_state) = self.logs_state {
3294                deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3295            }
3296        } else if target_arn.contains(":states:") {
3297            self.delivery
3298                .start_stepfunctions_execution(target_arn, &body_str);
3299            let mut accounts = self.state.write();
3300            let state = accounts.get_or_create(account_id);
3301            state
3302                .step_function_executions
3303                .push(crate::state::StepFunctionExecution {
3304                    state_machine_arn: target_arn.clone(),
3305                    payload: body_str.clone(),
3306                    timestamp: Utc::now(),
3307                });
3308        } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3309            let url = target_arn.clone();
3310            let payload = body_str.clone();
3311            tokio::spawn(async move {
3312                let client = reqwest::Client::new();
3313                let _ = client
3314                    .post(&url)
3315                    .header("Content-Type", "application/json")
3316                    .body(payload)
3317                    .send()
3318                    .await;
3319            });
3320        }
3321    }
3322
3323    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3324        let body = req.json_body();
3325        validate_required("ReplayName", &body["ReplayName"])?;
3326        let name = body["ReplayName"]
3327            .as_str()
3328            .ok_or_else(|| missing("ReplayName"))?;
3329        validate_string_length("replayName", name, 1, 64)?;
3330
3331        let accounts = self.state.read();
3332        let empty = EventBridgeState::new(&req.account_id, &req.region);
3333        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3334        let replay = state.replays.get(name).ok_or_else(|| {
3335            AwsServiceError::aws_error(
3336                StatusCode::BAD_REQUEST,
3337                "ResourceNotFoundException",
3338                format!("Replay {name} does not exist."),
3339            )
3340        })?;
3341
3342        let mut resp = json!({
3343            "Destination": replay.destination,
3344            "EventSourceArn": replay.event_source_arn,
3345            "EventStartTime": replay.event_start_time.timestamp() as f64,
3346            "EventEndTime": replay.event_end_time.timestamp() as f64,
3347            "ReplayArn": replay.arn,
3348            "ReplayName": replay.name,
3349            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3350            "State": replay.state,
3351        });
3352        if let Some(ref desc) = replay.description {
3353            resp["Description"] = json!(desc);
3354        }
3355        if let Some(ref end) = replay.replay_end_time {
3356            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3357        }
3358
3359        Ok(AwsResponse::ok_json(resp))
3360    }
3361
3362    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3363        let body = req.json_body();
3364        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3365        validate_optional_string_length(
3366            "eventSourceArn",
3367            body["EventSourceArn"].as_str(),
3368            1,
3369            1600,
3370        )?;
3371        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3372        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3373        let name_prefix = body["NamePrefix"].as_str();
3374        let source_arn = body["EventSourceArn"].as_str();
3375        let replay_state = body["State"].as_str();
3376
3377        // Validate at most one filter
3378        let filter_count = [
3379            name_prefix.is_some(),
3380            source_arn.is_some(),
3381            replay_state.is_some(),
3382        ]
3383        .iter()
3384        .filter(|&&x| x)
3385        .count();
3386        if filter_count > 1 {
3387            return Err(AwsServiceError::aws_error(
3388                StatusCode::BAD_REQUEST,
3389                "ValidationException",
3390                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3391            ));
3392        }
3393
3394        // Validate state
3395        if let Some(s) = replay_state {
3396            let valid = [
3397                "CANCELLED",
3398                "CANCELLING",
3399                "COMPLETED",
3400                "FAILED",
3401                "RUNNING",
3402                "STARTING",
3403            ];
3404            if !valid.contains(&s) {
3405                return Err(AwsServiceError::aws_error(
3406                    StatusCode::BAD_REQUEST,
3407                    "ValidationException",
3408                    format!(
3409                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3410                        s
3411                    ),
3412                ));
3413            }
3414        }
3415
3416        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3417
3418        let accounts = self.state.read();
3419        let empty = EventBridgeState::new(&req.account_id, &req.region);
3420        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3421        let all: Vec<Value> = state
3422            .replays
3423            .values()
3424            .filter(|r| {
3425                if let Some(prefix) = name_prefix {
3426                    r.name.starts_with(prefix)
3427                } else if let Some(arn) = source_arn {
3428                    r.event_source_arn == arn
3429                } else if let Some(s) = replay_state {
3430                    r.state == s
3431                } else {
3432                    true
3433                }
3434            })
3435            .map(|r| {
3436                let mut obj = json!({
3437                    "EventSourceArn": r.event_source_arn,
3438                    "EventStartTime": r.event_start_time.timestamp() as f64,
3439                    "EventEndTime": r.event_end_time.timestamp() as f64,
3440                    "ReplayName": r.name,
3441                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3442                    "State": r.state,
3443                });
3444                if let Some(ref end) = r.replay_end_time {
3445                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3446                }
3447                obj
3448            })
3449            .collect();
3450
3451        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3452        let mut resp = json!({ "Replays": replays });
3453        if let Some(token) = next_token {
3454            resp["NextToken"] = json!(token);
3455        }
3456
3457        Ok(AwsResponse::ok_json(resp))
3458    }
3459
3460    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3461        let body = req.json_body();
3462        validate_required("ReplayName", &body["ReplayName"])?;
3463        let name = body["ReplayName"]
3464            .as_str()
3465            .ok_or_else(|| missing("ReplayName"))?;
3466        validate_string_length("replayName", name, 1, 64)?;
3467
3468        let mut accounts = self.state.write();
3469        let state = accounts.get_or_create(&req.account_id);
3470        let replay = state.replays.get_mut(name).ok_or_else(|| {
3471            AwsServiceError::aws_error(
3472                StatusCode::BAD_REQUEST,
3473                "ResourceNotFoundException",
3474                format!("Replay {name} does not exist."),
3475            )
3476        })?;
3477
3478        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3479        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3480            return Err(AwsServiceError::aws_error(
3481                StatusCode::BAD_REQUEST,
3482                "IllegalStatusException",
3483                format!("Replay {name} is not in a valid state for this operation."),
3484            ));
3485        }
3486
3487        let arn = replay.arn.clone();
3488        replay.state = "CANCELLED".to_string();
3489
3490        Ok(AwsResponse::ok_json(json!({
3491            "ReplayArn": arn,
3492            "State": "CANCELLING",
3493        })))
3494    }
3495}
3496
3497// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3498
3499fn find_tags_mut<'a>(
3500    state: &'a mut crate::state::EventBridgeState,
3501    arn: &str,
3502) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3503    // Check buses
3504    for bus in state.buses.values_mut() {
3505        if bus.arn == arn {
3506            return Ok(&mut bus.tags);
3507        }
3508    }
3509    // Check rules
3510    for rule in state.rules.values_mut() {
3511        if rule.arn == arn {
3512            return Ok(&mut rule.tags);
3513        }
3514    }
3515
3516    // Parse ARN to give better error messages
3517    let error_msg = if arn.contains(":rule/") {
3518        // Extract rule name and bus from ARN
3519        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3520        if let Some(rule_path) = parts.first() {
3521            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3522                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3523            } else {
3524                format!("Rule {} does not exist on EventBus default.", rule_path)
3525            }
3526        } else {
3527            format!("Resource {arn} not found.")
3528        }
3529    } else {
3530        format!("Resource {arn} not found.")
3531    };
3532
3533    Err(AwsServiceError::aws_error(
3534        StatusCode::BAD_REQUEST,
3535        "ResourceNotFoundException",
3536        error_msg,
3537    ))
3538}
3539
3540fn find_tags<'a>(
3541    state: &'a crate::state::EventBridgeState,
3542    arn: &str,
3543) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3544    for bus in state.buses.values() {
3545        if bus.arn == arn {
3546            return Ok(&bus.tags);
3547        }
3548    }
3549    for rule in state.rules.values() {
3550        if rule.arn == arn {
3551            return Ok(&rule.tags);
3552        }
3553    }
3554
3555    let error_msg = if arn.contains(":rule/") {
3556        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3557        if let Some(rule_path) = parts.first() {
3558            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3559                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3560            } else {
3561                format!("Rule {} does not exist on EventBus default.", rule_path)
3562            }
3563        } else {
3564            format!("Resource {arn} not found.")
3565        }
3566    } else {
3567        format!("Resource {arn} not found.")
3568    };
3569
3570    Err(AwsServiceError::aws_error(
3571        StatusCode::BAD_REQUEST,
3572        "ResourceNotFoundException",
3573        error_msg,
3574    ))
3575}
3576
3577// ─── Event Pattern Validation ────────────────────────────────────────
3578
3579fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3580    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3581        AwsServiceError::aws_error(
3582            StatusCode::BAD_REQUEST,
3583            "InvalidEventPatternException",
3584            "Event pattern is not valid. Reason: Invalid JSON",
3585        )
3586    })?;
3587
3588    validate_pattern_values(&parsed, "")?;
3589    Ok(())
3590}
3591
3592fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3593    match value {
3594        Value::Object(obj) => {
3595            for (key, val) in obj {
3596                let new_path = if path.is_empty() {
3597                    key.clone()
3598                } else {
3599                    format!("{path}.{key}")
3600                };
3601                match val {
3602                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3603                    Value::Array(_) => {} // Arrays are fine at leaf level
3604                    _ => {
3605                        return Err(AwsServiceError::aws_error(
3606                            StatusCode::BAD_REQUEST,
3607                            "InvalidEventPatternException",
3608                            format!(
3609                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3610                                key
3611                            ),
3612                        ));
3613                    }
3614                }
3615            }
3616            Ok(())
3617        }
3618        _ => Ok(()),
3619    }
3620}
3621
3622// ─── Connection Auth Params Response Builder ────────────────────────
3623
3624fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3625    match auth_type {
3626        "API_KEY" => {
3627            let mut resp = json!({});
3628            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3629                resp["ApiKeyAuthParameters"] = json!({
3630                    "ApiKeyName": api_key["ApiKeyName"],
3631                });
3632            }
3633            resp
3634        }
3635        "BASIC" => {
3636            let mut resp = json!({});
3637            if let Some(basic) = params.get("BasicAuthParameters") {
3638                resp["BasicAuthParameters"] = json!({
3639                    "Username": basic["Username"],
3640                });
3641            }
3642            resp
3643        }
3644        "OAUTH_CLIENT_CREDENTIALS" => {
3645            let mut resp = json!({});
3646            if let Some(oauth) = params.get("OAuthParameters") {
3647                resp["OAuthParameters"] = json!({
3648                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3649                    "HttpMethod": oauth["HttpMethod"],
3650                    "ClientParameters": {
3651                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3652                    },
3653                });
3654            }
3655            resp
3656        }
3657        _ => params.clone(),
3658    }
3659}
3660
3661// ─── Event Pattern Matching ─────────────────────────────────────────
3662
3663/// Match an event against an EventBridge event pattern.
3664pub fn matches_pattern(
3665    pattern_json: Option<&str>,
3666    source: &str,
3667    detail_type: &str,
3668    detail: &str,
3669    account: &str,
3670    region: &str,
3671    resources: &[String],
3672) -> bool {
3673    let pattern_json = match pattern_json {
3674        Some(p) => p,
3675        None => return true,
3676    };
3677
3678    let pattern: Value = match serde_json::from_str(pattern_json) {
3679        Ok(v) => v,
3680        Err(_) => return false,
3681    };
3682
3683    let pattern_obj = match pattern.as_object() {
3684        Some(o) => o,
3685        None => return false,
3686    };
3687
3688    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3689    let event = json!({
3690        "source": source,
3691        "detail-type": detail_type,
3692        "detail": detail_value,
3693        "account": account,
3694        "region": region,
3695        "resources": resources,
3696    });
3697
3698    for (key, pattern_value) in pattern_obj {
3699        let event_value = &event[key];
3700        if !matches_value(pattern_value, event_value) {
3701            return false;
3702        }
3703    }
3704
3705    true
3706}
3707
3708fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3709    match pattern {
3710        Value::Object(obj) => {
3711            for (key, sub_pattern) in obj {
3712                let sub_value = &event_value[key];
3713                if !matches_value(sub_pattern, sub_value) {
3714                    return false;
3715                }
3716            }
3717            true
3718        }
3719        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3720        _ => false,
3721    }
3722}
3723
3724fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3725    match pattern_elem {
3726        Value::Object(obj) => {
3727            if let Some(prefix_val) = obj.get("prefix") {
3728                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3729                    return actual.starts_with(prefix);
3730                }
3731                return false;
3732            }
3733            if let Some(exists_val) = obj.get("exists") {
3734                let should_exist = exists_val.as_bool().unwrap_or(true);
3735                let does_exist = !event_value.is_null();
3736                return should_exist == does_exist;
3737            }
3738            if let Some(anything_but_val) = obj.get("anything-but") {
3739                return match anything_but_val {
3740                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3741                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3742                    Value::Number(_) => event_value != anything_but_val,
3743                    _ => true,
3744                };
3745            }
3746            if let Some(numeric_val) = obj.get("numeric") {
3747                return matches_numeric(numeric_val, event_value);
3748            }
3749            false
3750        }
3751        _ => values_equal(pattern_elem, event_value),
3752    }
3753}
3754
3755/// For each archive on `event_bus_name` whose event pattern matches the
3756/// event, append a clone of it to the archive's stored events and bump
3757/// the archive's counters.
3758#[allow(clippy::too_many_arguments)]
3759fn archive_matching_event(
3760    state: &mut crate::state::EventBridgeState,
3761    event: &PutEvent,
3762    event_bus_name: &str,
3763    source: &str,
3764    detail_type: &str,
3765    detail: &str,
3766    account_id: &str,
3767    region: &str,
3768    resources: &[String],
3769) {
3770    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3771    for akey in archive_keys {
3772        let (archive_bus, archive_pattern, archive_enabled) = {
3773            let a = &state.archives[&akey];
3774            (
3775                state.resolve_bus_name(&a.event_source_arn),
3776                a.event_pattern.clone(),
3777                a.state == "ENABLED",
3778            )
3779        };
3780        if archive_bus != event_bus_name || !archive_enabled {
3781            continue;
3782        }
3783        let pattern_matches = matches_pattern(
3784            archive_pattern.as_deref(),
3785            source,
3786            detail_type,
3787            detail,
3788            account_id,
3789            region,
3790            resources,
3791        );
3792        if !pattern_matches {
3793            continue;
3794        }
3795        if let Some(archive) = state.archives.get_mut(&akey) {
3796            archive.event_count += 1;
3797            archive.size_bytes += detail.len() as i64;
3798            archive.events.push(event.clone());
3799        }
3800    }
3801}
3802
3803/// Parsed + validated inputs for `StartReplay`.
3804struct StartReplayInput {
3805    name: String,
3806    description: Option<String>,
3807    event_source_arn: String,
3808    destination: Value,
3809    destination_arn: String,
3810    event_start_time: DateTime<Utc>,
3811    event_end_time: DateTime<Utc>,
3812}
3813
3814impl StartReplayInput {
3815    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3816        validate_required("ReplayName", &body["ReplayName"])?;
3817        let name = body["ReplayName"]
3818            .as_str()
3819            .ok_or_else(|| missing("ReplayName"))?
3820            .to_string();
3821        validate_string_length("replayName", &name, 1, 64)?;
3822        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3823        validate_required("EventSourceArn", &body["EventSourceArn"])?;
3824        let description = body["Description"].as_str().map(|s| s.to_string());
3825        let event_source_arn = body["EventSourceArn"]
3826            .as_str()
3827            .ok_or_else(|| missing("EventSourceArn"))?
3828            .to_string();
3829        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3830        validate_required("EventStartTime", &body["EventStartTime"])?;
3831        validate_required("EventEndTime", &body["EventEndTime"])?;
3832        validate_required("Destination", &body["Destination"])?;
3833        let destination = body["Destination"].clone();
3834
3835        let event_start_time = body["EventStartTime"]
3836            .as_f64()
3837            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3838            .unwrap_or_else(Utc::now);
3839        let event_end_time = body["EventEndTime"]
3840            .as_f64()
3841            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3842            .unwrap_or_else(Utc::now);
3843
3844        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3845        if !destination_arn.contains(":event-bus/") {
3846            return Err(AwsServiceError::aws_error(
3847                StatusCode::BAD_REQUEST,
3848                "ValidationException",
3849                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3850            ));
3851        }
3852
3853        Ok(Self {
3854            name,
3855            description,
3856            event_source_arn,
3857            destination,
3858            destination_arn,
3859            event_start_time,
3860            event_end_time,
3861        })
3862    }
3863}
3864
3865/// Walk the named archive, filter events into the replay window, then
3866/// fan out each event against rules on `bus_name` to collect its
3867/// matching targets. Returns only events that matched at least one
3868/// target.
3869#[allow(clippy::too_many_arguments)]
3870fn collect_replay_events_with_targets(
3871    state: &crate::state::EventBridgeState,
3872    archive_name: &str,
3873    bus_name: &str,
3874    event_start_time: DateTime<Utc>,
3875    event_end_time: DateTime<Utc>,
3876    account_id: &str,
3877    region: &str,
3878) -> Vec<(PutEvent, Vec<EventTarget>)> {
3879    let Some(archive) = state.archives.get(archive_name) else {
3880        return Vec::new();
3881    };
3882
3883    let replay_events: Vec<PutEvent> = archive
3884        .events
3885        .iter()
3886        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3887        .cloned()
3888        .collect();
3889
3890    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3891    for event in replay_events {
3892        let matching_targets: Vec<EventTarget> = state
3893            .rules
3894            .values()
3895            .filter(|r| {
3896                r.event_bus_name == bus_name
3897                    && r.state == "ENABLED"
3898                    && matches_pattern(
3899                        r.event_pattern.as_deref(),
3900                        &event.source,
3901                        &event.detail_type,
3902                        &event.detail,
3903                        account_id,
3904                        region,
3905                        &event.resources,
3906                    )
3907            })
3908            .flat_map(|r| r.targets.clone())
3909            .collect();
3910
3911        if !matching_targets.is_empty() {
3912            events_to_deliver.push((event, matching_targets));
3913        }
3914    }
3915    events_to_deliver
3916}
3917
3918fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3919    let arr = match numeric_arr.as_array() {
3920        Some(a) => a,
3921        None => return false,
3922    };
3923    let actual = match event_value.as_f64() {
3924        Some(n) => n,
3925        None => return false,
3926    };
3927    let mut i = 0;
3928    while i + 1 < arr.len() {
3929        let op = match arr[i].as_str() {
3930            Some(s) => s,
3931            None => return false,
3932        };
3933        let threshold = match arr[i + 1].as_f64() {
3934            Some(n) => n,
3935            None => return false,
3936        };
3937        let ok = match op {
3938            ">" => actual > threshold,
3939            ">=" => actual >= threshold,
3940            "<" => actual < threshold,
3941            "<=" => actual <= threshold,
3942            "=" => (actual - threshold).abs() < f64::EPSILON,
3943            _ => return false,
3944        };
3945        if !ok {
3946            return false;
3947        }
3948        i += 2;
3949    }
3950    true
3951}
3952
3953fn values_equal(a: &Value, b: &Value) -> bool {
3954    a == b
3955}
3956
3957/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3958fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3959    let path = path.strip_prefix('$').unwrap_or(path);
3960    let mut current = event;
3961    for segment in path.split('.') {
3962        if segment.is_empty() {
3963            continue;
3964        }
3965        current = current.get(segment)?;
3966    }
3967    Some(current.clone())
3968}
3969
3970/// Apply an EventBridge InputTransformer to an event.
3971fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3972    let input_paths_map = transformer
3973        .get("InputPathsMap")
3974        .and_then(|v| v.as_object())
3975        .cloned()
3976        .unwrap_or_default();
3977    let template = transformer
3978        .get("InputTemplate")
3979        .and_then(|v| v.as_str())
3980        .unwrap_or("")
3981        .to_string();
3982
3983    // Resolve all input paths
3984    let mut resolved: HashMap<String, Value> = HashMap::new();
3985    for (var_name, path_val) in &input_paths_map {
3986        if let Some(path_str) = path_val.as_str() {
3987            if let Some(val) = resolve_json_path(event, path_str) {
3988                resolved.insert(var_name.clone(), val);
3989            }
3990        }
3991    }
3992
3993    // Replace <varName> placeholders in template
3994    let mut result = template;
3995    for (var_name, val) in &resolved {
3996        let placeholder = format!("<{var_name}>");
3997        let replacement = match val {
3998            Value::String(s) => s.clone(),
3999            other => other.to_string(),
4000        };
4001        result = result.replace(&placeholder, &replacement);
4002    }
4003
4004    result
4005}
4006
4007fn missing(name: &str) -> AwsServiceError {
4008    AwsServiceError::aws_error(
4009        StatusCode::BAD_REQUEST,
4010        "ValidationException",
4011        format!("The request must contain the parameter {name}"),
4012    )
4013}
4014
4015/// Extract a Lambda function name from its ARN.
4016///
4017/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
4018/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
4019fn function_name_from_arn(arn: &str) -> &str {
4020    let parts: Vec<&str> = arn.split(':').collect();
4021    if parts.len() >= 7 && parts[5] == "function" {
4022        parts[6]
4023    } else {
4024        arn
4025    }
4026}
4027
4028/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
4029/// This is fire-and-forget: EventBridge delivery is asynchronous.
4030pub fn invoke_lambda_async(
4031    container_runtime: &Option<Arc<ContainerRuntime>>,
4032    lambda_state: &Option<SharedLambdaState>,
4033    function_arn: &str,
4034    payload: &str,
4035) {
4036    let runtime = match container_runtime {
4037        Some(rt) => rt.clone(),
4038        None => return,
4039    };
4040    let lambda_state = match lambda_state {
4041        Some(ls) => ls.clone(),
4042        None => return,
4043    };
4044    let func_name = function_name_from_arn(function_arn).to_string();
4045    let payload = payload.as_bytes().to_vec();
4046
4047    tokio::spawn(async move {
4048        let resolved = {
4049            let accounts = lambda_state.read();
4050            let state = accounts.default_ref();
4051            state.functions.get(&func_name).cloned().map(|func| {
4052                let mut layer_zips: Vec<Vec<u8>> = Vec::with_capacity(func.layers.len());
4053                for attached in &func.layers {
4054                    if let Some(bytes) = fakecloud_lambda::extras::parse_layer_version_arn(
4055                        &attached.arn,
4056                    )
4057                    .and_then(|(acct, name, ver)| {
4058                        accounts
4059                            .get(&acct)
4060                            .and_then(|s| s.layers.get(&name))
4061                            .and_then(|l| l.versions.iter().find(|v| v.version == ver))
4062                            .and_then(|v| v.code_zip.clone())
4063                    }) {
4064                        layer_zips.push(bytes);
4065                    }
4066                }
4067                (func, layer_zips)
4068            })
4069        };
4070        let (func, layer_zips) = match resolved {
4071            Some(pair) => pair,
4072            None => {
4073                tracing::warn!(
4074                    function = %func_name,
4075                    "EventBridge Lambda target not found, skipping invocation"
4076                );
4077                return;
4078            }
4079        };
4080        match runtime.invoke(&func, &payload, &layer_zips).await {
4081            Ok(_) => {
4082                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
4083            }
4084            Err(e) => {
4085                tracing::warn!(
4086                    function = %func_name,
4087                    error = %e,
4088                    "EventBridge Lambda invocation failed"
4089                );
4090            }
4091        }
4092    });
4093}
4094
4095/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
4096/// to the appropriate log group and stream.
4097pub fn deliver_to_logs(
4098    logs_state: &SharedLogsState,
4099    log_group_arn: &str,
4100    payload: &str,
4101    timestamp: chrono::DateTime<chrono::Utc>,
4102) {
4103    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
4104    // or just the name if it's not an ARN
4105    let group_name = if log_group_arn.contains(":log-group:") {
4106        log_group_arn
4107            .split(":log-group:")
4108            .nth(1)
4109            .unwrap_or(log_group_arn)
4110            .trim_end_matches(":*")
4111    } else {
4112        log_group_arn
4113    };
4114
4115    let stream_name = "events".to_string();
4116    let ts_millis = timestamp.timestamp_millis();
4117
4118    let mut accounts = logs_state.write();
4119    let state = accounts.default_mut();
4120    let region = state.region.clone();
4121    let account_id = state.account_id.clone();
4122
4123    // Auto-create log group and stream if they don't exist
4124    let group = state
4125        .log_groups
4126        .entry(group_name.to_string())
4127        .or_insert_with(|| fakecloud_logs::state::LogGroup {
4128            name: group_name.to_string(),
4129            arn: Arn::new(
4130                "logs",
4131                &region,
4132                &account_id,
4133                &format!("log-group:{group_name}"),
4134            )
4135            .to_string(),
4136            creation_time: ts_millis,
4137            retention_in_days: None,
4138            kms_key_id: None,
4139            tags: std::collections::BTreeMap::new(),
4140            log_streams: std::collections::BTreeMap::new(),
4141            stored_bytes: 0,
4142            subscription_filters: Vec::new(),
4143            data_protection_policy: None,
4144            index_policies: Vec::new(),
4145            transformer: None,
4146            deletion_protection: false,
4147            log_group_class: Some("STANDARD".to_string()),
4148        });
4149
4150    let stream = group
4151        .log_streams
4152        .entry(stream_name.clone())
4153        .or_insert_with(|| fakecloud_logs::state::LogStream {
4154            name: stream_name,
4155            arn: format!("{}:log-stream:events", group.arn),
4156            creation_time: ts_millis,
4157            first_event_timestamp: None,
4158            last_event_timestamp: None,
4159            last_ingestion_time: None,
4160            upload_sequence_token: "1".to_string(),
4161            events: Vec::new(),
4162        });
4163
4164    stream.events.push(fakecloud_logs::state::LogEvent {
4165        timestamp: ts_millis,
4166        message: payload.to_string(),
4167        ingestion_time: ts_millis,
4168    });
4169    stream.last_event_timestamp = Some(ts_millis);
4170    stream.last_ingestion_time = Some(ts_millis);
4171    if stream.first_event_timestamp.is_none() {
4172        stream.first_event_timestamp = Some(ts_millis);
4173    }
4174}
4175
4176/// Apply connection auth parameters to an outgoing HTTP request.
4177fn apply_connection_auth(
4178    mut builder: reqwest::RequestBuilder,
4179    conn: &Connection,
4180) -> reqwest::RequestBuilder {
4181    match conn.authorization_type.as_str() {
4182        "API_KEY" => {
4183            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
4184                if let (Some(name), Some(value)) = (
4185                    params["ApiKeyName"].as_str(),
4186                    params["ApiKeyValue"].as_str(),
4187                ) {
4188                    builder = builder.header(name, value);
4189                }
4190            }
4191        }
4192        "BASIC" => {
4193            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
4194                if let (Some(user), Some(pass)) =
4195                    (params["Username"].as_str(), params["Password"].as_str())
4196                {
4197                    builder = builder.basic_auth(user, Some(pass));
4198                }
4199            }
4200        }
4201        "OAUTH_CLIENT_CREDENTIALS" => {
4202            // For OAuth, in a real implementation we'd exchange credentials for a token.
4203            // Here we pass client credentials as basic auth as a reasonable approximation.
4204            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4205                if let (Some(client_id), Some(client_secret)) = (
4206                    params["ClientParameters"]["ClientID"].as_str(),
4207                    params["ClientParameters"]["ClientSecret"].as_str(),
4208                ) {
4209                    builder = builder.basic_auth(client_id, Some(client_secret));
4210                }
4211            }
4212        }
4213        _ => {}
4214    }
4215    builder
4216}
4217
4218#[cfg(test)]
4219mod tests {
4220    use super::*;
4221
4222    /// Test helper that calls matches_pattern with default account/region/resources
4223    fn test_matches(
4224        pattern_json: Option<&str>,
4225        source: &str,
4226        detail_type: &str,
4227        detail: &str,
4228    ) -> bool {
4229        matches_pattern(
4230            pattern_json,
4231            source,
4232            detail_type,
4233            detail,
4234            "123456789012",
4235            "us-east-1",
4236            &[],
4237        )
4238    }
4239
4240    #[test]
4241    fn pattern_matches_source() {
4242        assert!(test_matches(
4243            Some(r#"{"source": ["my.app"]}"#),
4244            "my.app",
4245            "OrderPlaced",
4246            "{}"
4247        ));
4248        assert!(!test_matches(
4249            Some(r#"{"source": ["other.app"]}"#),
4250            "my.app",
4251            "OrderPlaced",
4252            "{}"
4253        ));
4254    }
4255
4256    #[test]
4257    fn pattern_matches_detail_type() {
4258        assert!(test_matches(
4259            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4260            "my.app",
4261            "OrderPlaced",
4262            "{}"
4263        ));
4264        assert!(!test_matches(
4265            Some(r#"{"detail-type": ["OrderShipped"]}"#),
4266            "my.app",
4267            "OrderPlaced",
4268            "{}"
4269        ));
4270    }
4271
4272    #[test]
4273    fn pattern_matches_detail_field() {
4274        assert!(test_matches(
4275            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4276            "my.app",
4277            "StatusChange",
4278            r#"{"status": "ACTIVE"}"#
4279        ));
4280        assert!(!test_matches(
4281            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4282            "my.app",
4283            "StatusChange",
4284            r#"{"status": "INACTIVE"}"#
4285        ));
4286    }
4287
4288    #[test]
4289    fn no_pattern_matches_everything() {
4290        assert!(test_matches(None, "any", "any", "{}"));
4291    }
4292
4293    #[test]
4294    fn combined_pattern() {
4295        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4296        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4297        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4298        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4299    }
4300
4301    #[test]
4302    fn nested_detail_pattern() {
4303        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4304        assert!(test_matches(
4305            Some(pattern),
4306            "my.app",
4307            "OrderEvent",
4308            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4309        ));
4310        assert!(!test_matches(
4311            Some(pattern),
4312            "my.app",
4313            "OrderEvent",
4314            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4315        ));
4316        assert!(!test_matches(
4317            Some(pattern),
4318            "my.app",
4319            "OrderEvent",
4320            r#"{"order": {"id": "123"}}"#
4321        ));
4322    }
4323
4324    #[test]
4325    fn deeply_nested_detail_pattern() {
4326        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4327        assert!(test_matches(
4328            Some(pattern),
4329            "src",
4330            "type",
4331            r#"{"a": {"b": {"c": "deep"}}}"#
4332        ));
4333        assert!(!test_matches(
4334            Some(pattern),
4335            "src",
4336            "type",
4337            r#"{"a": {"b": {"c": "shallow"}}}"#
4338        ));
4339    }
4340
4341    #[test]
4342    fn prefix_matcher() {
4343        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4344        assert!(test_matches(
4345            Some(pattern),
4346            "com.myapp.orders",
4347            "OrderPlaced",
4348            "{}"
4349        ));
4350        assert!(test_matches(
4351            Some(pattern),
4352            "com.myapp",
4353            "OrderPlaced",
4354            "{}"
4355        ));
4356        assert!(!test_matches(
4357            Some(pattern),
4358            "com.other",
4359            "OrderPlaced",
4360            "{}"
4361        ));
4362    }
4363
4364    #[test]
4365    fn prefix_matcher_in_detail() {
4366        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4367        assert!(test_matches(
4368            Some(pattern),
4369            "src",
4370            "type",
4371            r#"{"region": "us-east-1"}"#
4372        ));
4373        assert!(!test_matches(
4374            Some(pattern),
4375            "src",
4376            "type",
4377            r#"{"region": "eu-west-1"}"#
4378        ));
4379    }
4380
4381    #[test]
4382    fn exists_matcher() {
4383        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4384        assert!(test_matches(
4385            Some(pattern),
4386            "src",
4387            "type",
4388            r#"{"error": "something broke"}"#
4389        ));
4390        assert!(!test_matches(
4391            Some(pattern),
4392            "src",
4393            "type",
4394            r#"{"status": "ok"}"#
4395        ));
4396
4397        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4398        assert!(test_matches(
4399            Some(pattern),
4400            "src",
4401            "type",
4402            r#"{"status": "ok"}"#
4403        ));
4404        assert!(!test_matches(
4405            Some(pattern),
4406            "src",
4407            "type",
4408            r#"{"error": "something broke"}"#
4409        ));
4410    }
4411
4412    #[test]
4413    fn anything_but_matcher() {
4414        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4415        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4416        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4417
4418        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4419        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4420        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4421        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4422    }
4423
4424    #[test]
4425    fn anything_but_in_detail() {
4426        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4427        assert!(test_matches(
4428            Some(pattern),
4429            "src",
4430            "type",
4431            r#"{"env": "staging"}"#
4432        ));
4433        assert!(!test_matches(
4434            Some(pattern),
4435            "src",
4436            "type",
4437            r#"{"env": "prod"}"#
4438        ));
4439    }
4440
4441    #[test]
4442    fn numeric_greater_than() {
4443        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4444        assert!(test_matches(
4445            Some(pattern),
4446            "src",
4447            "type",
4448            r#"{"count": 150}"#
4449        ));
4450        assert!(!test_matches(
4451            Some(pattern),
4452            "src",
4453            "type",
4454            r#"{"count": 100}"#
4455        ));
4456        assert!(!test_matches(
4457            Some(pattern),
4458            "src",
4459            "type",
4460            r#"{"count": 50}"#
4461        ));
4462    }
4463
4464    #[test]
4465    fn numeric_less_than() {
4466        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4467        assert!(test_matches(
4468            Some(pattern),
4469            "src",
4470            "type",
4471            r#"{"count": 5}"#
4472        ));
4473        assert!(!test_matches(
4474            Some(pattern),
4475            "src",
4476            "type",
4477            r#"{"count": 10}"#
4478        ));
4479        assert!(!test_matches(
4480            Some(pattern),
4481            "src",
4482            "type",
4483            r#"{"count": 15}"#
4484        ));
4485    }
4486
4487    #[test]
4488    fn numeric_range() {
4489        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4490        assert!(test_matches(
4491            Some(pattern),
4492            "src",
4493            "type",
4494            r#"{"count": 50}"#
4495        ));
4496        assert!(test_matches(
4497            Some(pattern),
4498            "src",
4499            "type",
4500            r#"{"count": 100}"#
4501        ));
4502        assert!(!test_matches(
4503            Some(pattern),
4504            "src",
4505            "type",
4506            r#"{"count": 200}"#
4507        ));
4508        assert!(!test_matches(
4509            Some(pattern),
4510            "src",
4511            "type",
4512            r#"{"count": 49}"#
4513        ));
4514    }
4515
4516    #[test]
4517    fn mixed_matchers_and_literals() {
4518        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4519        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4520        assert!(test_matches(
4521            Some(pattern),
4522            "com.myapp.orders",
4523            "Event",
4524            "{}"
4525        ));
4526        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4527    }
4528
4529    // ---- list_connections / list_api_destinations filtering & pagination ----
4530
4531    use fakecloud_core::delivery::DeliveryBus;
4532    use parking_lot::RwLock;
4533
4534    fn make_service() -> EventBridgeService {
4535        let state = Arc::new(RwLock::new(
4536            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
4537        ));
4538        let delivery = Arc::new(DeliveryBus::new());
4539        EventBridgeService::new(state, delivery)
4540    }
4541
4542    fn make_request(action: &str, body: Value) -> AwsRequest {
4543        AwsRequest {
4544            service: "events".to_string(),
4545            action: action.to_string(),
4546            region: "us-east-1".to_string(),
4547            account_id: "123456789012".to_string(),
4548            request_id: "test-id".to_string(),
4549            headers: http::HeaderMap::new(),
4550            query_params: HashMap::new(),
4551            body: serde_json::to_vec(&body).unwrap().into(),
4552            body_stream: parking_lot::Mutex::new(None),
4553            path_segments: vec![],
4554            raw_path: "/".to_string(),
4555            raw_query: String::new(),
4556            method: http::Method::POST,
4557            is_query_protocol: false,
4558            access_key_id: None,
4559            principal: None,
4560        }
4561    }
4562
4563    fn create_connection(svc: &EventBridgeService, name: &str) {
4564        let req = make_request(
4565            "CreateConnection",
4566            json!({
4567                "Name": name,
4568                "AuthorizationType": "API_KEY",
4569                "AuthParameters": {
4570                    "ApiKeyAuthParameters": {
4571                        "ApiKeyName": "x-api-key",
4572                        "ApiKeyValue": "secret"
4573                    }
4574                }
4575            }),
4576        );
4577        svc.create_connection(&req).unwrap();
4578    }
4579
4580    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4581        let conn_arn_field = {
4582            let _mas = svc.state.read();
4583            let state = _mas.default_ref();
4584            state.connections.get(conn_name).unwrap().arn.clone()
4585        };
4586        let req = make_request(
4587            "CreateApiDestination",
4588            json!({
4589                "Name": name,
4590                "ConnectionArn": conn_arn_field,
4591                "InvocationEndpoint": "https://example.com",
4592                "HttpMethod": "POST"
4593            }),
4594        );
4595        svc.create_api_destination(&req).unwrap();
4596    }
4597
4598    // -- ListConnections tests --
4599
4600    #[test]
4601    fn list_connections_returns_all_by_default() {
4602        let svc = make_service();
4603        create_connection(&svc, "conn-alpha");
4604        create_connection(&svc, "conn-beta");
4605        create_connection(&svc, "conn-gamma");
4606
4607        let req = make_request("ListConnections", json!({}));
4608        let resp = svc.list_connections(&req).unwrap();
4609        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4610        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4611        assert!(body["NextToken"].is_null());
4612    }
4613
4614    #[test]
4615    fn list_connections_name_prefix_filter() {
4616        let svc = make_service();
4617        create_connection(&svc, "prod-conn-1");
4618        create_connection(&svc, "prod-conn-2");
4619        create_connection(&svc, "dev-conn-1");
4620
4621        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4622        let resp = svc.list_connections(&req).unwrap();
4623        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4624        let names: Vec<&str> = body["Connections"]
4625            .as_array()
4626            .unwrap()
4627            .iter()
4628            .map(|c| c["Name"].as_str().unwrap())
4629            .collect();
4630        assert_eq!(names.len(), 2);
4631        assert!(names.iter().all(|n| n.starts_with("prod-")));
4632    }
4633
4634    #[test]
4635    fn list_connections_state_filter() {
4636        let svc = make_service();
4637        create_connection(&svc, "conn-a");
4638        create_connection(&svc, "conn-b");
4639
4640        // All connections start as AUTHORIZED; change one
4641        {
4642            let mut _mas = svc.state.write();
4643            let state = _mas.default_mut();
4644            state
4645                .connections
4646                .get_mut("conn-b")
4647                .unwrap()
4648                .connection_state = "DEAUTHORIZED".to_string();
4649        }
4650
4651        let req = make_request(
4652            "ListConnections",
4653            json!({ "ConnectionState": "AUTHORIZED" }),
4654        );
4655        let resp = svc.list_connections(&req).unwrap();
4656        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4657        let conns = body["Connections"].as_array().unwrap();
4658        assert_eq!(conns.len(), 1);
4659        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4660    }
4661
4662    #[test]
4663    fn list_connections_pagination() {
4664        let svc = make_service();
4665        for i in 0..5 {
4666            create_connection(&svc, &format!("conn-{i:02}"));
4667        }
4668
4669        // First page: limit 2
4670        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4671        let resp = svc.list_connections(&req).unwrap();
4672        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4673        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4674        let token = body["NextToken"].as_str().unwrap();
4675        assert_eq!(token, "2");
4676
4677        // Second page
4678        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4679        let resp = svc.list_connections(&req).unwrap();
4680        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4681        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4682        let token = body["NextToken"].as_str().unwrap();
4683        assert_eq!(token, "4");
4684
4685        // Third page (only 1 remaining)
4686        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4687        let resp = svc.list_connections(&req).unwrap();
4688        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4689        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4690        assert!(body["NextToken"].is_null());
4691    }
4692
4693    #[test]
4694    fn list_connections_pagination_with_filter() {
4695        let svc = make_service();
4696        for i in 0..4 {
4697            create_connection(&svc, &format!("prod-{i:02}"));
4698        }
4699        create_connection(&svc, "dev-00");
4700
4701        let req = make_request(
4702            "ListConnections",
4703            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4704        );
4705        let resp = svc.list_connections(&req).unwrap();
4706        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4707        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4708        assert!(body["NextToken"].as_str().is_some());
4709    }
4710
4711    // -- ListApiDestinations tests --
4712
4713    #[test]
4714    fn list_api_destinations_returns_all_by_default() {
4715        let svc = make_service();
4716        create_connection(&svc, "my-conn");
4717        create_api_destination(&svc, "dest-alpha", "my-conn");
4718        create_api_destination(&svc, "dest-beta", "my-conn");
4719
4720        let req = make_request("ListApiDestinations", json!({}));
4721        let resp = svc.list_api_destinations(&req).unwrap();
4722        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4723        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4724        assert!(body["NextToken"].is_null());
4725    }
4726
4727    #[test]
4728    fn list_api_destinations_name_prefix_filter() {
4729        let svc = make_service();
4730        create_connection(&svc, "my-conn");
4731        create_api_destination(&svc, "prod-dest-1", "my-conn");
4732        create_api_destination(&svc, "prod-dest-2", "my-conn");
4733        create_api_destination(&svc, "dev-dest-1", "my-conn");
4734
4735        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4736        let resp = svc.list_api_destinations(&req).unwrap();
4737        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4738        let names: Vec<&str> = body["ApiDestinations"]
4739            .as_array()
4740            .unwrap()
4741            .iter()
4742            .map(|d| d["Name"].as_str().unwrap())
4743            .collect();
4744        assert_eq!(names.len(), 2);
4745        assert!(names.iter().all(|n| n.starts_with("prod-")));
4746    }
4747
4748    #[test]
4749    fn list_api_destinations_connection_arn_filter() {
4750        let svc = make_service();
4751        create_connection(&svc, "conn-a");
4752        create_connection(&svc, "conn-b");
4753        create_api_destination(&svc, "dest-1", "conn-a");
4754        create_api_destination(&svc, "dest-2", "conn-b");
4755        create_api_destination(&svc, "dest-3", "conn-a");
4756
4757        let conn_a_arn = {
4758            let _mas = svc.state.read();
4759            let state = _mas.default_ref();
4760            state.connections.get("conn-a").unwrap().arn.clone()
4761        };
4762
4763        let req = make_request(
4764            "ListApiDestinations",
4765            json!({ "ConnectionArn": conn_a_arn }),
4766        );
4767        let resp = svc.list_api_destinations(&req).unwrap();
4768        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4769        let names: Vec<&str> = body["ApiDestinations"]
4770            .as_array()
4771            .unwrap()
4772            .iter()
4773            .map(|d| d["Name"].as_str().unwrap())
4774            .collect();
4775        assert_eq!(names.len(), 2);
4776        assert!(names.contains(&"dest-1"));
4777        assert!(names.contains(&"dest-3"));
4778    }
4779
4780    #[test]
4781    fn list_api_destinations_pagination() {
4782        let svc = make_service();
4783        create_connection(&svc, "my-conn");
4784        for i in 0..5 {
4785            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4786        }
4787
4788        // First page
4789        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4790        let resp = svc.list_api_destinations(&req).unwrap();
4791        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4792        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4793        let token = body["NextToken"].as_str().unwrap();
4794        assert_eq!(token, "2");
4795
4796        // Second page
4797        let req = make_request(
4798            "ListApiDestinations",
4799            json!({ "Limit": 2, "NextToken": token }),
4800        );
4801        let resp = svc.list_api_destinations(&req).unwrap();
4802        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4803        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4804        let token = body["NextToken"].as_str().unwrap();
4805        assert_eq!(token, "4");
4806
4807        // Last page
4808        let req = make_request(
4809            "ListApiDestinations",
4810            json!({ "Limit": 2, "NextToken": token }),
4811        );
4812        let resp = svc.list_api_destinations(&req).unwrap();
4813        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4814        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4815        assert!(body["NextToken"].is_null());
4816    }
4817
4818    // -- ListEventBuses pagination tests --
4819
4820    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4821        let req = make_request("CreateEventBus", json!({ "Name": name }));
4822        svc.create_event_bus(&req).unwrap();
4823    }
4824
4825    #[test]
4826    fn list_event_buses_pagination() {
4827        let svc = make_service();
4828        // "default" bus already exists, create 4 more
4829        for i in 0..4 {
4830            create_event_bus(&svc, &format!("bus-{i:02}"));
4831        }
4832
4833        // First page: limit 2
4834        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4835        let resp = svc.list_event_buses(&req).unwrap();
4836        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4837        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4838        let token = body["NextToken"].as_str().unwrap();
4839        assert_eq!(token, "2");
4840
4841        // Second page
4842        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4843        let resp = svc.list_event_buses(&req).unwrap();
4844        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4845        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4846        let token = body["NextToken"].as_str().unwrap();
4847        assert_eq!(token, "4");
4848
4849        // Third page (only 1 remaining)
4850        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4851        let resp = svc.list_event_buses(&req).unwrap();
4852        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4853        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4854        assert!(body["NextToken"].is_null());
4855    }
4856
4857    #[test]
4858    fn list_event_buses_no_pagination_returns_all() {
4859        let svc = make_service();
4860        create_event_bus(&svc, "bus-alpha");
4861        create_event_bus(&svc, "bus-beta");
4862
4863        let req = make_request("ListEventBuses", json!({}));
4864        let resp = svc.list_event_buses(&req).unwrap();
4865        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4866        // default + 2 custom = 3
4867        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4868        assert!(body["NextToken"].is_null());
4869    }
4870
4871    // -- PutEvents EndpointId tests --
4872
4873    #[test]
4874    fn put_events_never_includes_endpoint_id_in_response() {
4875        let svc = make_service();
4876        // Even when EndpointId is provided in the request, it must not appear in the response
4877        let req = make_request(
4878            "PutEvents",
4879            json!({
4880                "EndpointId": "my-endpoint.abc123",
4881                "Entries": [{
4882                    "Source": "my.source",
4883                    "DetailType": "MyType",
4884                    "Detail": "{}",
4885                    "EventBusName": "default"
4886                }]
4887            }),
4888        );
4889        let resp = svc.put_events(&req).unwrap();
4890        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4891        assert!(
4892            !body.as_object().unwrap().contains_key("EndpointId"),
4893            "EndpointId should never be in the PutEvents response"
4894        );
4895        assert_eq!(body["FailedEntryCount"], 0);
4896    }
4897
4898    // -- ListArchives pagination tests --
4899
4900    fn create_archive(svc: &EventBridgeService, name: &str) {
4901        let req = make_request(
4902            "CreateArchive",
4903            json!({
4904                "ArchiveName": name,
4905                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4906            }),
4907        );
4908        svc.create_archive(&req).unwrap();
4909    }
4910
4911    #[test]
4912    fn list_archives_pagination() {
4913        let svc = make_service();
4914        for i in 0..5 {
4915            create_archive(&svc, &format!("archive-{i:02}"));
4916        }
4917
4918        // First page: limit 2
4919        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4920        let resp = svc.list_archives(&req).unwrap();
4921        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4922        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4923        let token = body["NextToken"].as_str().unwrap();
4924        assert_eq!(token, "2");
4925
4926        // Second page
4927        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4928        let resp = svc.list_archives(&req).unwrap();
4929        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4930        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4931        let token = body["NextToken"].as_str().unwrap();
4932        assert_eq!(token, "4");
4933
4934        // Third page (only 1 remaining)
4935        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4936        let resp = svc.list_archives(&req).unwrap();
4937        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4938        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4939        assert!(body["NextToken"].is_null());
4940    }
4941
4942    // -- ListReplays pagination tests --
4943
4944    fn create_replay(svc: &EventBridgeService, name: &str) {
4945        // Need an archive first for the replay's event source
4946        let archive_arn = {
4947            let guard = svc.state.read();
4948            let st = guard.default_ref();
4949            if st.archives.contains_key("replay-archive") {
4950                st.archives["replay-archive"].arn.clone()
4951            } else {
4952                drop(guard);
4953                create_archive(svc, "replay-archive");
4954                svc.state.read().default_ref().archives["replay-archive"]
4955                    .arn
4956                    .clone()
4957            }
4958        };
4959        let req = make_request(
4960            "StartReplay",
4961            json!({
4962                "ReplayName": name,
4963                "EventSourceArn": archive_arn,
4964                "EventStartTime": 1000000.0,
4965                "EventEndTime": 2000000.0,
4966                "Destination": {
4967                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4968                }
4969            }),
4970        );
4971        svc.start_replay(&req).unwrap();
4972    }
4973
4974    #[test]
4975    fn list_replays_pagination() {
4976        let svc = make_service();
4977        for i in 0..5 {
4978            create_replay(&svc, &format!("replay-{i:02}"));
4979        }
4980
4981        // First page: limit 2
4982        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4983        let resp = svc.list_replays(&req).unwrap();
4984        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4985        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4986        let token = body["NextToken"].as_str().unwrap();
4987        assert_eq!(token, "2");
4988
4989        // Second page
4990        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4991        let resp = svc.list_replays(&req).unwrap();
4992        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4993        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4994        let token = body["NextToken"].as_str().unwrap();
4995        assert_eq!(token, "4");
4996
4997        // Third page (only 1 remaining)
4998        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4999        let resp = svc.list_replays(&req).unwrap();
5000        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5001        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
5002        assert!(body["NextToken"].is_null());
5003    }
5004
5005    #[test]
5006    fn list_event_buses_invalid_next_token_returns_error() {
5007        let svc = make_service();
5008
5009        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
5010        let result = svc.list_event_buses(&req);
5011        assert!(
5012            result.is_err(),
5013            "non-numeric NextToken should return an error"
5014        );
5015    }
5016
5017    // ---- TestEventPattern tests ----
5018
5019    #[test]
5020    fn test_event_pattern_match() {
5021        let svc = make_service();
5022        let req = make_request(
5023            "TestEventPattern",
5024            json!({
5025                "EventPattern": r#"{"source": ["my.app"]}"#,
5026                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5027            }),
5028        );
5029        let resp = svc.test_event_pattern(&req).unwrap();
5030        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5031        assert_eq!(body["Result"], true);
5032    }
5033
5034    #[test]
5035    fn test_event_pattern_no_match() {
5036        let svc = make_service();
5037        let req = make_request(
5038            "TestEventPattern",
5039            json!({
5040                "EventPattern": r#"{"source": ["other.app"]}"#,
5041                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5042            }),
5043        );
5044        let resp = svc.test_event_pattern(&req).unwrap();
5045        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5046        assert_eq!(body["Result"], false);
5047    }
5048
5049    #[test]
5050    fn test_event_pattern_detail_match() {
5051        let svc = make_service();
5052        let req = make_request(
5053            "TestEventPattern",
5054            json!({
5055                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
5056                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
5057            }),
5058        );
5059        let resp = svc.test_event_pattern(&req).unwrap();
5060        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5061        assert_eq!(body["Result"], true);
5062    }
5063
5064    // ---- UpdateEventBus tests ----
5065
5066    #[test]
5067    fn update_event_bus_description() {
5068        let svc = make_service();
5069        create_event_bus(&svc, "my-bus");
5070
5071        let req = make_request(
5072            "UpdateEventBus",
5073            json!({ "Name": "my-bus", "Description": "Updated desc" }),
5074        );
5075        let resp = svc.update_event_bus(&req).unwrap();
5076        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5077        assert_eq!(body["Name"], "my-bus");
5078
5079        // Verify via describe
5080        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
5081        let resp = svc.describe_event_bus(&req).unwrap();
5082        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5083        assert_eq!(body["Description"], "Updated desc");
5084    }
5085
5086    #[test]
5087    fn update_event_bus_not_found() {
5088        let svc = make_service();
5089        let req = make_request(
5090            "UpdateEventBus",
5091            json!({ "Name": "ghost-bus", "Description": "nope" }),
5092        );
5093        assert!(svc.update_event_bus(&req).is_err());
5094    }
5095
5096    // ---- Endpoint CRUD tests ----
5097
5098    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
5099        let req = make_request(
5100            "CreateEndpoint",
5101            json!({
5102                "Name": name,
5103                "RoutingConfig": {
5104                    "FailoverConfig": {
5105                        "Primary": { "HealthCheck": "" },
5106                        "Secondary": { "Route": "us-west-2" }
5107                    }
5108                },
5109                "EventBuses": [
5110                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
5111                ]
5112            }),
5113        );
5114        svc.create_endpoint(&req).unwrap();
5115    }
5116
5117    #[test]
5118    fn endpoint_create_describe_delete() {
5119        let svc = make_service();
5120        create_endpoint_helper(&svc, "my-endpoint");
5121
5122        // Describe
5123        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5124        let resp = svc.describe_endpoint(&req).unwrap();
5125        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5126        assert_eq!(body["Name"], "my-endpoint");
5127        assert_eq!(body["State"], "ACTIVE");
5128        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
5129
5130        // Delete
5131        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
5132        svc.delete_endpoint(&req).unwrap();
5133
5134        // Verify gone
5135        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5136        assert!(svc.describe_endpoint(&req).is_err());
5137    }
5138
5139    #[test]
5140    fn endpoint_list_and_update() {
5141        let svc = make_service();
5142        create_endpoint_helper(&svc, "ep-alpha");
5143        create_endpoint_helper(&svc, "ep-beta");
5144
5145        // List all
5146        let req = make_request("ListEndpoints", json!({}));
5147        let resp = svc.list_endpoints(&req).unwrap();
5148        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5149        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
5150
5151        // Update
5152        let req = make_request(
5153            "UpdateEndpoint",
5154            json!({ "Name": "ep-alpha", "Description": "updated" }),
5155        );
5156        let resp = svc.update_endpoint(&req).unwrap();
5157        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5158        assert_eq!(body["Name"], "ep-alpha");
5159
5160        // Verify description
5161        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
5162        let resp = svc.describe_endpoint(&req).unwrap();
5163        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5164        assert_eq!(body["Description"], "updated");
5165    }
5166
5167    #[test]
5168    fn endpoint_duplicate_fails() {
5169        let svc = make_service();
5170        create_endpoint_helper(&svc, "dup-ep");
5171        let req = make_request(
5172            "CreateEndpoint",
5173            json!({
5174                "Name": "dup-ep",
5175                "RoutingConfig": {},
5176                "EventBuses": []
5177            }),
5178        );
5179        assert!(svc.create_endpoint(&req).is_err());
5180    }
5181
5182    // ---- DeauthorizeConnection tests ----
5183
5184    #[test]
5185    fn deauthorize_connection_sets_state() {
5186        let svc = make_service();
5187        create_connection(&svc, "deauth-conn");
5188
5189        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
5190        let resp = svc.deauthorize_connection(&req).unwrap();
5191        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5192        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5193        assert!(body["ConnectionArn"]
5194            .as_str()
5195            .unwrap()
5196            .contains("deauth-conn"));
5197
5198        // Verify via describe
5199        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
5200        let resp = svc.describe_connection(&req).unwrap();
5201        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5202        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5203    }
5204
5205    #[test]
5206    fn deauthorize_connection_not_found() {
5207        let svc = make_service();
5208        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5209        assert!(svc.deauthorize_connection(&req).is_err());
5210    }
5211
5212    // ---- Partner event source tests ----
5213
5214    #[test]
5215    fn partner_event_source_crud() {
5216        let svc = make_service();
5217
5218        // Create
5219        let req = make_request(
5220            "CreatePartnerEventSource",
5221            json!({ "Name": "partner/test", "Account": "123456789012" }),
5222        );
5223        svc.create_partner_event_source(&req).unwrap();
5224
5225        // Describe
5226        let req = make_request(
5227            "DescribePartnerEventSource",
5228            json!({ "Name": "partner/test" }),
5229        );
5230        let resp = svc.describe_partner_event_source(&req).unwrap();
5231        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5232        assert_eq!(body["Name"], "partner/test");
5233
5234        // List
5235        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5236        let resp = svc.list_partner_event_sources(&req).unwrap();
5237        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5238        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5239
5240        // ListPartnerEventSourceAccounts
5241        let req = make_request(
5242            "ListPartnerEventSourceAccounts",
5243            json!({ "EventSourceName": "partner/test" }),
5244        );
5245        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5246        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5247        assert_eq!(
5248            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5249            1
5250        );
5251
5252        // DescribeEventSource
5253        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5254        let resp = svc.describe_event_source(&req).unwrap();
5255        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5256        assert_eq!(body["Name"], "partner/test");
5257        assert_eq!(body["State"], "ACTIVE");
5258
5259        // ListEventSources
5260        let req = make_request("ListEventSources", json!({}));
5261        let resp = svc.list_event_sources(&req).unwrap();
5262        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5263        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5264
5265        // Delete
5266        let req = make_request(
5267            "DeletePartnerEventSource",
5268            json!({ "Name": "partner/test", "Account": "123456789012" }),
5269        );
5270        svc.delete_partner_event_source(&req).unwrap();
5271
5272        // Verify gone
5273        let req = make_request(
5274            "DescribePartnerEventSource",
5275            json!({ "Name": "partner/test" }),
5276        );
5277        assert!(svc.describe_partner_event_source(&req).is_err());
5278    }
5279
5280    #[test]
5281    fn activate_deactivate_event_source() {
5282        let svc = make_service();
5283
5284        // Create a partner event source first
5285        let req = make_request(
5286            "CreatePartnerEventSource",
5287            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5288        );
5289        svc.create_partner_event_source(&req).unwrap();
5290
5291        // Deactivate it
5292        let req = make_request(
5293            "DeactivateEventSource",
5294            json!({ "Name": "aws.partner/test" }),
5295        );
5296        svc.deactivate_event_source(&req).unwrap();
5297        {
5298            let _mas = svc.state.read();
5299            let state = _mas.default_ref();
5300            assert_eq!(
5301                state.partner_event_sources["aws.partner/test"].state,
5302                "INACTIVE"
5303            );
5304        }
5305
5306        // Activate it
5307        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5308        svc.activate_event_source(&req).unwrap();
5309        {
5310            let _mas = svc.state.read();
5311            let state = _mas.default_ref();
5312            assert_eq!(
5313                state.partner_event_sources["aws.partner/test"].state,
5314                "ACTIVE"
5315            );
5316        }
5317
5318        // Not-found returns error
5319        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5320        assert!(svc.activate_event_source(&req).is_err());
5321
5322        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5323        assert!(svc.deactivate_event_source(&req).is_err());
5324    }
5325
5326    #[test]
5327    fn delete_partner_event_source_verifies_account() {
5328        let svc = make_service();
5329
5330        // Create a partner event source
5331        let req = make_request(
5332            "CreatePartnerEventSource",
5333            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5334        );
5335        svc.create_partner_event_source(&req).unwrap();
5336
5337        // Deleting with wrong account fails
5338        let req = make_request(
5339            "DeletePartnerEventSource",
5340            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5341        );
5342        assert!(svc.delete_partner_event_source(&req).is_err());
5343        // Source still exists
5344        assert!(svc
5345            .state
5346            .read()
5347            .default_ref()
5348            .partner_event_sources
5349            .contains_key("aws.partner/test"));
5350
5351        // Deleting with correct account succeeds
5352        let req = make_request(
5353            "DeletePartnerEventSource",
5354            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5355        );
5356        svc.delete_partner_event_source(&req).unwrap();
5357        assert!(!svc
5358            .state
5359            .read()
5360            .default_ref()
5361            .partner_event_sources
5362            .contains_key("aws.partner/test"));
5363
5364        // Deleting non-existent source returns error
5365        let req = make_request(
5366            "DeletePartnerEventSource",
5367            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5368        );
5369        assert!(svc.delete_partner_event_source(&req).is_err());
5370    }
5371
5372    #[test]
5373    fn put_partner_events() {
5374        let svc = make_service();
5375        let req = make_request(
5376            "PutPartnerEvents",
5377            json!({
5378                "Entries": [
5379                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5380                ]
5381            }),
5382        );
5383        let resp = svc.put_partner_events(&req).unwrap();
5384        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5385        assert_eq!(body["FailedEntryCount"], 0);
5386        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5387        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5388    }
5389
5390    // ---- Archive + Replay delivery tests ----
5391
5392    /// Helper: create a service with a mock SQS delivery that records messages.
5393    #[allow(clippy::type_complexity)]
5394    fn make_service_with_sqs_recorder() -> (
5395        EventBridgeService,
5396        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5397    ) {
5398        use fakecloud_core::delivery::SqsDelivery;
5399
5400        struct RecordingSqsDelivery {
5401            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5402        }
5403
5404        impl SqsDelivery for RecordingSqsDelivery {
5405            fn deliver_to_queue(
5406                &self,
5407                queue_arn: &str,
5408                message_body: &str,
5409                _attributes: &HashMap<String, String>,
5410            ) {
5411                self.messages
5412                    .lock()
5413                    .push((queue_arn.to_string(), message_body.to_string()));
5414            }
5415        }
5416
5417        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5418            Arc::new(parking_lot::Mutex::new(Vec::new()));
5419        let state = Arc::new(RwLock::new(
5420            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5421        ));
5422        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5423            messages: messages.clone(),
5424        })));
5425        let svc = EventBridgeService::new(state, delivery);
5426        (svc, messages)
5427    }
5428
5429    #[test]
5430    fn start_replay_delivers_archived_events_to_sqs_target() {
5431        let (svc, messages) = make_service_with_sqs_recorder();
5432        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5433
5434        // Create a rule with an SQS target
5435        let req = make_request(
5436            "PutRule",
5437            json!({
5438                "Name": "replay-test-rule",
5439                "EventPattern": r#"{"source": ["my.app"]}"#,
5440                "State": "ENABLED"
5441            }),
5442        );
5443        svc.put_rule(&req).unwrap();
5444
5445        let req = make_request(
5446            "PutTargets",
5447            json!({
5448                "Rule": "replay-test-rule",
5449                "Targets": [{
5450                    "Id": "sqs-target",
5451                    "Arn": queue_arn
5452                }]
5453            }),
5454        );
5455        svc.put_targets(&req).unwrap();
5456
5457        // Create an archive on the default bus
5458        let req = make_request(
5459            "CreateArchive",
5460            json!({
5461                "ArchiveName": "test-archive",
5462                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5463            }),
5464        );
5465        svc.create_archive(&req).unwrap();
5466
5467        // PutEvents: these should get archived and delivered
5468        let req = make_request(
5469            "PutEvents",
5470            json!({
5471                "Entries": [
5472                    {
5473                        "Source": "my.app",
5474                        "DetailType": "OrderCreated",
5475                        "Detail": "{\"orderId\": \"1\"}",
5476                        "EventBusName": "default"
5477                    },
5478                    {
5479                        "Source": "my.app",
5480                        "DetailType": "OrderShipped",
5481                        "Detail": "{\"orderId\": \"2\"}",
5482                        "EventBusName": "default"
5483                    }
5484                ]
5485            }),
5486        );
5487        let resp = svc.put_events(&req).unwrap();
5488        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5489        assert_eq!(body["FailedEntryCount"], 0);
5490
5491        // Verify archive has 2 events
5492        {
5493            let _mas = svc.state.read();
5494            let state = _mas.default_ref();
5495            let archive = state.archives.get("test-archive").unwrap();
5496            assert_eq!(archive.events.len(), 2);
5497            assert_eq!(archive.event_count, 2);
5498        }
5499
5500        // Clear recorded messages from PutEvents delivery
5501        messages.lock().clear();
5502
5503        // StartReplay: should re-deliver the archived events
5504        let archive_arn = {
5505            let _mas = svc.state.read();
5506            let state = _mas.default_ref();
5507            state.archives.get("test-archive").unwrap().arn.clone()
5508        };
5509
5510        // Use a wide time range to capture all events
5511        let start_ts = 0.0_f64;
5512        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5513
5514        let req = make_request(
5515            "StartReplay",
5516            json!({
5517                "ReplayName": "my-replay",
5518                "EventSourceArn": archive_arn,
5519                "Destination": {
5520                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5521                },
5522                "EventStartTime": start_ts,
5523                "EventEndTime": end_ts
5524            }),
5525        );
5526        let resp = svc.start_replay(&req).unwrap();
5527        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5528        assert_eq!(body["State"], "STARTING");
5529
5530        // Verify the replay delivered events to SQS
5531        let delivered = messages.lock();
5532        assert_eq!(
5533            delivered.len(),
5534            2,
5535            "expected 2 replayed events delivered to SQS"
5536        );
5537        for (arn, msg) in delivered.iter() {
5538            assert_eq!(arn, queue_arn);
5539            let event: Value = serde_json::from_str(msg).unwrap();
5540            assert_eq!(event["source"], "my.app");
5541            // Replayed events should include replay-name
5542            assert!(event["replay-name"].as_str().is_some());
5543        }
5544
5545        // Verify replay is marked as COMPLETED
5546        let _mas = svc.state.read();
5547        let state = _mas.default_ref();
5548        let replay = state.replays.get("my-replay").unwrap();
5549        assert_eq!(replay.state, "COMPLETED");
5550    }
5551
5552    #[test]
5553    fn apply_connection_auth_api_key() {
5554        let conn = Connection {
5555            name: "test-conn".to_string(),
5556            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5557            description: None,
5558            authorization_type: "API_KEY".to_string(),
5559            auth_parameters: json!({
5560                "ApiKeyAuthParameters": {
5561                    "ApiKeyName": "x-api-key",
5562                    "ApiKeyValue": "my-secret"
5563                }
5564            }),
5565            connection_state: "AUTHORIZED".to_string(),
5566            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5567            creation_time: Utc::now(),
5568            last_modified_time: Utc::now(),
5569            last_authorized_time: Utc::now(),
5570        };
5571
5572        let client = reqwest::Client::new();
5573        let builder = client
5574            .post("http://localhost:12345/test")
5575            .header("Content-Type", "application/json");
5576        let builder = apply_connection_auth(builder, &conn);
5577
5578        // Build and verify the header was applied
5579        let request = builder.body("{}").build().unwrap();
5580        assert_eq!(
5581            request
5582                .headers()
5583                .get("x-api-key")
5584                .unwrap()
5585                .to_str()
5586                .unwrap(),
5587            "my-secret"
5588        );
5589    }
5590
5591    #[test]
5592    fn apply_connection_auth_basic() {
5593        let conn = Connection {
5594            name: "basic-conn".to_string(),
5595            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5596            description: None,
5597            authorization_type: "BASIC".to_string(),
5598            auth_parameters: json!({
5599                "BasicAuthParameters": {
5600                    "Username": "user",
5601                    "Password": "pass"
5602                }
5603            }),
5604            connection_state: "AUTHORIZED".to_string(),
5605            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5606            creation_time: Utc::now(),
5607            last_modified_time: Utc::now(),
5608            last_authorized_time: Utc::now(),
5609        };
5610
5611        let client = reqwest::Client::new();
5612        let builder = client.post("http://localhost:12345/test");
5613        let builder = apply_connection_auth(builder, &conn);
5614
5615        let request = builder.body("{}").build().unwrap();
5616        let auth_header = request
5617            .headers()
5618            .get("authorization")
5619            .unwrap()
5620            .to_str()
5621            .unwrap();
5622        assert!(
5623            auth_header.starts_with("Basic "),
5624            "Expected Basic auth header, got: {auth_header}"
5625        );
5626    }
5627
5628    #[tokio::test]
5629    async fn put_events_with_api_destination_target_resolves_destination() {
5630        // This test verifies that the PutEvents code path correctly identifies
5631        // api-destination ARN targets and resolves the destination metadata.
5632        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5633        let state = Arc::new(RwLock::new(
5634            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5635        ));
5636        let delivery = Arc::new(DeliveryBus::new());
5637        let svc = EventBridgeService::new(state, delivery);
5638
5639        // Create connection and api destination
5640        create_connection(&svc, "my-conn");
5641        let conn_arn = {
5642            let _mas = svc.state.read();
5643            let state = _mas.default_ref();
5644            state.connections.get("my-conn").unwrap().arn.clone()
5645        };
5646        let req = make_request(
5647            "CreateApiDestination",
5648            json!({
5649                "Name": "my-dest",
5650                "ConnectionArn": conn_arn,
5651                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5652                "HttpMethod": "POST"
5653            }),
5654        );
5655        svc.create_api_destination(&req).unwrap();
5656
5657        let dest_arn = {
5658            let _mas = svc.state.read();
5659            let state = _mas.default_ref();
5660            state.api_destinations.get("my-dest").unwrap().arn.clone()
5661        };
5662
5663        // Create a rule that targets the api-destination
5664        let req = make_request(
5665            "PutRule",
5666            json!({
5667                "Name": "api-dest-rule",
5668                "EventPattern": r#"{"source":["test.app"]}"#,
5669                "State": "ENABLED"
5670            }),
5671        );
5672        svc.put_rule(&req).unwrap();
5673
5674        let req = make_request(
5675            "PutTargets",
5676            json!({
5677                "Rule": "api-dest-rule",
5678                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5679            }),
5680        );
5681        svc.put_targets(&req).unwrap();
5682
5683        // PutEvents - should match the rule and attempt delivery to ApiDestination
5684        let req = make_request(
5685            "PutEvents",
5686            json!({
5687                "Entries": [{
5688                    "Source": "test.app",
5689                    "DetailType": "TestEvent",
5690                    "Detail": r#"{"key":"value"}"#
5691                }]
5692            }),
5693        );
5694        let resp = svc.put_events(&req).unwrap();
5695        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5696        assert_eq!(body["FailedEntryCount"], 0);
5697        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5698        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5699    }
5700
5701    #[test]
5702    fn test_function_name_from_arn() {
5703        // Unqualified ARN
5704        assert_eq!(
5705            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5706            "my-func"
5707        );
5708        // Qualified ARN with alias
5709        assert_eq!(
5710            super::function_name_from_arn(
5711                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5712            ),
5713            "my-func"
5714        );
5715        // Qualified ARN with version
5716        assert_eq!(
5717            super::function_name_from_arn(
5718                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5719            ),
5720            "my-func"
5721        );
5722        // Plain function name (not an ARN)
5723        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5724    }
5725
5726    // ── Rules / targets / tags handler tests ────────────────────────
5727
5728    fn put_rule_simple(svc: &EventBridgeService, name: &str) {
5729        let req = make_request(
5730            "PutRule",
5731            json!({ "Name": name, "EventPattern": r#"{"source":["a"]}"# }),
5732        );
5733        svc.put_rule(&req).unwrap();
5734    }
5735
5736    #[test]
5737    fn put_rule_persists_event_pattern_and_state() {
5738        let svc = make_service();
5739        put_rule_simple(&svc, "r1");
5740        let _mas = svc.state.read();
5741        let state = _mas.default_ref();
5742        let rule = state
5743            .rules
5744            .get(&("default".to_string(), "r1".to_string()))
5745            .unwrap();
5746        assert_eq!(rule.state, "ENABLED");
5747        assert!(rule.event_pattern.is_some());
5748        assert!(rule.arn.contains("rule/r1"));
5749    }
5750
5751    #[test]
5752    fn put_rule_rejects_schedule_on_non_default_bus() {
5753        let svc = make_service();
5754        // Create a custom bus first.
5755        let bus_req = make_request("CreateEventBus", json!({ "Name": "custom" }));
5756        svc.create_event_bus(&bus_req).unwrap();
5757
5758        let req = make_request(
5759            "PutRule",
5760            json!({
5761                "Name": "r1",
5762                "EventBusName": "custom",
5763                "ScheduleExpression": "rate(5 minutes)"
5764            }),
5765        );
5766        let err = svc.put_rule(&req).err().expect("expected error");
5767        assert_eq!(err.code(), "ValidationException");
5768    }
5769
5770    #[test]
5771    fn put_rule_rejects_unknown_event_bus() {
5772        let svc = make_service();
5773        let req = make_request(
5774            "PutRule",
5775            json!({ "Name": "r1", "EventBusName": "ghost", "EventPattern": r#"{"source":["a"]}"# }),
5776        );
5777        let err = svc.put_rule(&req).err().expect("expected error");
5778        assert_eq!(err.code(), "ResourceNotFoundException");
5779    }
5780
5781    #[test]
5782    fn put_rule_overlay_preserves_existing_targets() {
5783        let svc = make_service();
5784        put_rule_simple(&svc, "r1");
5785        // Inject a target directly.
5786        {
5787            let mut _mas = svc.state.write();
5788            let state = _mas.default_mut();
5789            let rule = state
5790                .rules
5791                .get_mut(&("default".to_string(), "r1".to_string()))
5792                .unwrap();
5793            rule.targets.push(crate::state::EventTarget {
5794                id: "t1".to_string(),
5795                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
5796                input: None,
5797                input_path: None,
5798                input_transformer: None,
5799                sqs_parameters: None,
5800            });
5801        }
5802
5803        // Re-PutRule with new description; targets should survive.
5804        let req = make_request(
5805            "PutRule",
5806            json!({ "Name": "r1", "Description": "updated", "EventPattern": r#"{"source":["a"]}"# }),
5807        );
5808        svc.put_rule(&req).unwrap();
5809        let _mas = svc.state.read();
5810        let state = _mas.default_ref();
5811        let rule = state
5812            .rules
5813            .get(&("default".to_string(), "r1".to_string()))
5814            .unwrap();
5815        assert_eq!(rule.description.as_deref(), Some("updated"));
5816        assert_eq!(rule.targets.len(), 1);
5817    }
5818
5819    #[test]
5820    fn delete_rule_with_targets_errors() {
5821        let svc = make_service();
5822        put_rule_simple(&svc, "r1");
5823        let put_targets_req = make_request(
5824            "PutTargets",
5825            json!({
5826                "Rule": "r1",
5827                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5828            }),
5829        );
5830        svc.put_targets(&put_targets_req).unwrap();
5831
5832        let req = make_request("DeleteRule", json!({ "Name": "r1" }));
5833        let err = svc.delete_rule(&req).err().expect("expected error");
5834        assert_eq!(err.code(), "ValidationException");
5835    }
5836
5837    #[test]
5838    fn delete_rule_after_remove_targets_succeeds() {
5839        let svc = make_service();
5840        put_rule_simple(&svc, "r1");
5841        let put_t = make_request(
5842            "PutTargets",
5843            json!({
5844                "Rule": "r1",
5845                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5846            }),
5847        );
5848        svc.put_targets(&put_t).unwrap();
5849        let rm_t = make_request("RemoveTargets", json!({ "Rule": "r1", "Ids": ["t1"] }));
5850        svc.remove_targets(&rm_t).unwrap();
5851        let del = make_request("DeleteRule", json!({ "Name": "r1" }));
5852        svc.delete_rule(&del).unwrap();
5853        assert!(!svc
5854            .state
5855            .read()
5856            .default_ref()
5857            .rules
5858            .contains_key(&("default".to_string(), "r1".to_string())));
5859    }
5860
5861    #[test]
5862    fn enable_disable_rule_toggles_state() {
5863        let svc = make_service();
5864        put_rule_simple(&svc, "r1");
5865        let dis = make_request("DisableRule", json!({ "Name": "r1" }));
5866        svc.disable_rule(&dis).unwrap();
5867        assert_eq!(
5868            svc.state
5869                .read()
5870                .default_ref()
5871                .rules
5872                .get(&("default".to_string(), "r1".to_string()))
5873                .unwrap()
5874                .state,
5875            "DISABLED"
5876        );
5877        let en = make_request("EnableRule", json!({ "Name": "r1" }));
5878        svc.enable_rule(&en).unwrap();
5879        assert_eq!(
5880            svc.state
5881                .read()
5882                .default_ref()
5883                .rules
5884                .get(&("default".to_string(), "r1".to_string()))
5885                .unwrap()
5886                .state,
5887            "ENABLED"
5888        );
5889    }
5890
5891    #[test]
5892    fn enable_rule_unknown_errors() {
5893        let svc = make_service();
5894        let req = make_request("EnableRule", json!({ "Name": "ghost" }));
5895        let err = svc.enable_rule(&req).err().expect("expected error");
5896        assert_eq!(err.code(), "ResourceNotFoundException");
5897    }
5898
5899    #[test]
5900    fn list_rules_with_name_prefix_filter() {
5901        let svc = make_service();
5902        put_rule_simple(&svc, "prod-orders");
5903        put_rule_simple(&svc, "prod-shipping");
5904        put_rule_simple(&svc, "dev-orders");
5905
5906        let req = make_request("ListRules", json!({ "NamePrefix": "prod-" }));
5907        let resp = svc.list_rules(&req).unwrap();
5908        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5909        let names: Vec<&str> = body["Rules"]
5910            .as_array()
5911            .unwrap()
5912            .iter()
5913            .map(|r| r["Name"].as_str().unwrap())
5914            .collect();
5915        assert_eq!(names.len(), 2);
5916        assert!(names.iter().all(|n| n.starts_with("prod-")));
5917    }
5918
5919    #[test]
5920    fn list_rules_pagination_emits_next_token() {
5921        let svc = make_service();
5922        for i in 0..5 {
5923            put_rule_simple(&svc, &format!("r{i}"));
5924        }
5925        let req = make_request("ListRules", json!({ "Limit": 2 }));
5926        let resp = svc.list_rules(&req).unwrap();
5927        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5928        assert_eq!(body["Rules"].as_array().unwrap().len(), 2);
5929        assert!(body["NextToken"].is_string());
5930    }
5931
5932    #[test]
5933    fn describe_rule_returns_persisted_fields() {
5934        let svc = make_service();
5935        let put = make_request(
5936            "PutRule",
5937            json!({
5938                "Name": "r1",
5939                "EventPattern": r#"{"source":["a"]}"#,
5940                "Description": "hi",
5941                "State": "DISABLED"
5942            }),
5943        );
5944        svc.put_rule(&put).unwrap();
5945        let desc = make_request("DescribeRule", json!({ "Name": "r1" }));
5946        let resp = svc.describe_rule(&desc).unwrap();
5947        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5948        assert_eq!(body["Name"], json!("r1"));
5949        assert_eq!(body["State"], json!("DISABLED"));
5950        assert_eq!(body["Description"], json!("hi"));
5951    }
5952
5953    #[test]
5954    fn describe_rule_unknown_errors() {
5955        let svc = make_service();
5956        let req = make_request("DescribeRule", json!({ "Name": "ghost" }));
5957        let err = svc.describe_rule(&req).err().expect("expected error");
5958        assert_eq!(err.code(), "ResourceNotFoundException");
5959    }
5960
5961    #[test]
5962    fn put_targets_rejects_fifo_without_sqs_parameters() {
5963        let svc = make_service();
5964        put_rule_simple(&svc, "r1");
5965        let req = make_request(
5966            "PutTargets",
5967            json!({
5968                "Rule": "r1",
5969                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q.fifo" }]
5970            }),
5971        );
5972        let err = svc.put_targets(&req).err().expect("expected error");
5973        assert_eq!(err.code(), "ValidationException");
5974    }
5975
5976    #[test]
5977    fn put_targets_rejects_invalid_arn() {
5978        let svc = make_service();
5979        put_rule_simple(&svc, "r1");
5980        let req = make_request(
5981            "PutTargets",
5982            json!({
5983                "Rule": "r1",
5984                "Targets": [{ "Id": "t1", "Arn": "not-an-arn" }]
5985            }),
5986        );
5987        let err = svc.put_targets(&req).err().expect("expected error");
5988        assert_eq!(err.code(), "ValidationException");
5989    }
5990
5991    #[test]
5992    fn put_targets_unknown_rule_errors() {
5993        let svc = make_service();
5994        let req = make_request(
5995            "PutTargets",
5996            json!({
5997                "Rule": "ghost",
5998                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5999            }),
6000        );
6001        let err = svc.put_targets(&req).err().expect("expected error");
6002        assert_eq!(err.code(), "ResourceNotFoundException");
6003    }
6004
6005    #[test]
6006    fn put_targets_replaces_existing_with_same_id() {
6007        let svc = make_service();
6008        put_rule_simple(&svc, "r1");
6009        let first = make_request(
6010            "PutTargets",
6011            json!({
6012                "Rule": "r1",
6013                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1" }]
6014            }),
6015        );
6016        svc.put_targets(&first).unwrap();
6017        let second = make_request(
6018            "PutTargets",
6019            json!({
6020                "Rule": "r1",
6021                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q2" }]
6022            }),
6023        );
6024        svc.put_targets(&second).unwrap();
6025
6026        let _mas = svc.state.read();
6027        let state = _mas.default_ref();
6028        let rule = state
6029            .rules
6030            .get(&("default".to_string(), "r1".to_string()))
6031            .unwrap();
6032        assert_eq!(rule.targets.len(), 1);
6033        assert!(rule.targets[0].arn.ends_with("q2"));
6034    }
6035
6036    #[test]
6037    fn list_targets_by_rule_returns_pagination_token() {
6038        let svc = make_service();
6039        put_rule_simple(&svc, "r1");
6040        for i in 0..4 {
6041            let req = make_request(
6042                "PutTargets",
6043                json!({
6044                    "Rule": "r1",
6045                    "Targets": [{
6046                        "Id": format!("t{i}"),
6047                        "Arn": format!("arn:aws:sqs:us-east-1:123456789012:q{i}")
6048                    }]
6049                }),
6050            );
6051            svc.put_targets(&req).unwrap();
6052        }
6053        let req = make_request("ListTargetsByRule", json!({ "Rule": "r1", "Limit": 2 }));
6054        let resp = svc.list_targets_by_rule(&req).unwrap();
6055        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6056        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6057        assert!(body["NextToken"].is_string());
6058    }
6059
6060    #[test]
6061    fn list_rule_names_by_target_groups_by_arn() {
6062        let svc = make_service();
6063        put_rule_simple(&svc, "r1");
6064        put_rule_simple(&svc, "r2");
6065        for rule in ["r1", "r2"] {
6066            let req = make_request(
6067                "PutTargets",
6068                json!({
6069                    "Rule": rule,
6070                    "Targets": [{
6071                        "Id": "t1",
6072                        "Arn": "arn:aws:sqs:us-east-1:123456789012:shared"
6073                    }]
6074                }),
6075            );
6076            svc.put_targets(&req).unwrap();
6077        }
6078        let req = make_request(
6079            "ListRuleNamesByTarget",
6080            json!({ "TargetArn": "arn:aws:sqs:us-east-1:123456789012:shared" }),
6081        );
6082        let resp = svc.list_rule_names_by_target(&req).unwrap();
6083        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6084        let names: Vec<&str> = body["RuleNames"]
6085            .as_array()
6086            .unwrap()
6087            .iter()
6088            .map(|v| v.as_str().unwrap())
6089            .collect();
6090        assert_eq!(names, vec!["r1", "r2"]);
6091    }
6092
6093    // ── Tag operations ───────────────────────────────────────────────
6094
6095    #[test]
6096    fn tag_then_list_tags_for_rule() {
6097        let svc = make_service();
6098        put_rule_simple(&svc, "r1");
6099        let arn = svc
6100            .state
6101            .read()
6102            .default_ref()
6103            .rules
6104            .get(&("default".to_string(), "r1".to_string()))
6105            .unwrap()
6106            .arn
6107            .clone();
6108
6109        let tag_req = make_request(
6110            "TagResource",
6111            json!({
6112                "ResourceARN": arn,
6113                "Tags": [{ "Key": "env", "Value": "prod" }]
6114            }),
6115        );
6116        svc.tag_resource(&tag_req).unwrap();
6117
6118        let list_req = make_request("ListTagsForResource", json!({ "ResourceARN": arn }));
6119        let resp = svc.list_tags_for_resource(&list_req).unwrap();
6120        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6121        let tags = body["Tags"].as_array().unwrap();
6122        assert_eq!(tags.len(), 1);
6123        assert_eq!(tags[0]["Key"], json!("env"));
6124        assert_eq!(tags[0]["Value"], json!("prod"));
6125    }
6126
6127    #[test]
6128    fn untag_resource_removes_listed_keys() {
6129        let svc = make_service();
6130        put_rule_simple(&svc, "r1");
6131        let arn = svc
6132            .state
6133            .read()
6134            .default_ref()
6135            .rules
6136            .get(&("default".to_string(), "r1".to_string()))
6137            .unwrap()
6138            .arn
6139            .clone();
6140        let tag_req = make_request(
6141            "TagResource",
6142            json!({
6143                "ResourceARN": &arn,
6144                "Tags": [{ "Key": "env", "Value": "prod" }, { "Key": "team", "Value": "core" }]
6145            }),
6146        );
6147        svc.tag_resource(&tag_req).unwrap();
6148
6149        let untag = make_request(
6150            "UntagResource",
6151            json!({ "ResourceARN": &arn, "TagKeys": ["env"] }),
6152        );
6153        svc.untag_resource(&untag).unwrap();
6154
6155        let _mas = svc.state.read();
6156        let state = _mas.default_ref();
6157        let rule = state
6158            .rules
6159            .get(&("default".to_string(), "r1".to_string()))
6160            .unwrap();
6161        assert!(!rule.tags.contains_key("env"));
6162        assert_eq!(rule.tags.get("team").map(String::as_str), Some("core"));
6163    }
6164
6165    // ── TestEventPattern ─────────────────────────────────────────────
6166
6167    #[test]
6168    fn test_event_pattern_returns_result_field() {
6169        let svc = make_service();
6170        let req = make_request(
6171            "TestEventPattern",
6172            json!({
6173                "EventPattern": r#"{"source":["my.app"]}"#,
6174                "Event": r#"{"source":"my.app","detail-type":"x","detail":{}}"#
6175            }),
6176        );
6177        let resp = svc.test_event_pattern(&req).unwrap();
6178        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6179        assert_eq!(body["Result"], json!(true));
6180    }
6181
6182    // ── Event bus describe / delete ──────────────────────────────────
6183
6184    #[test]
6185    fn describe_event_bus_default_returns_arn() {
6186        let svc = make_service();
6187        let req = make_request("DescribeEventBus", json!({}));
6188        let resp = svc.describe_event_bus(&req).unwrap();
6189        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6190        assert_eq!(body["Name"], json!("default"));
6191        assert!(body["Arn"].as_str().unwrap().contains("event-bus/default"));
6192    }
6193
6194    #[test]
6195    fn delete_event_bus_default_fails() {
6196        let svc = make_service();
6197        let req = make_request("DeleteEventBus", json!({ "Name": "default" }));
6198        let err = svc.delete_event_bus(&req).err().expect("expected error");
6199        assert_eq!(err.code(), "ValidationException");
6200    }
6201
6202    // ── Error branch tests ──
6203
6204    #[test]
6205    fn describe_rule_not_found() {
6206        let svc = make_service();
6207        let req = make_request("DescribeRule", json!({"Name": "nonexistent"}));
6208        let err = svc.describe_rule(&req).err().expect("expected error");
6209        assert_eq!(err.code(), "ResourceNotFoundException");
6210    }
6211
6212    #[test]
6213    fn delete_rule_nonexistent_is_noop() {
6214        let svc = make_service();
6215        let req = make_request("DeleteRule", json!({"Name": "nope"}));
6216        // EventBridge returns success for deleting nonexistent rules
6217        svc.delete_rule(&req).unwrap();
6218    }
6219
6220    #[test]
6221    fn put_targets_rule_not_found() {
6222        let svc = make_service();
6223        let req = make_request(
6224            "PutTargets",
6225            json!({"Rule": "ghost", "Targets": [{"Id": "t1", "Arn": "arn:a"}]}),
6226        );
6227        let err = svc.put_targets(&req).err().expect("expected error");
6228        assert_eq!(err.code(), "ResourceNotFoundException");
6229    }
6230
6231    #[test]
6232    fn remove_targets_rule_not_found() {
6233        let svc = make_service();
6234        let req = make_request("RemoveTargets", json!({"Rule": "ghost", "Ids": ["t1"]}));
6235        let err = svc.remove_targets(&req).err().expect("expected error");
6236        assert_eq!(err.code(), "ResourceNotFoundException");
6237    }
6238
6239    #[test]
6240    fn list_targets_by_rule_not_found() {
6241        let svc = make_service();
6242        let req = make_request("ListTargetsByRule", json!({"Rule": "ghost"}));
6243        let err = svc
6244            .list_targets_by_rule(&req)
6245            .err()
6246            .expect("expected error");
6247        assert_eq!(err.code(), "ResourceNotFoundException");
6248    }
6249
6250    #[test]
6251    fn enable_rule_not_found() {
6252        let svc = make_service();
6253        let req = make_request("EnableRule", json!({"Name": "ghost"}));
6254        let err = svc.enable_rule(&req).err().expect("expected error");
6255        assert_eq!(err.code(), "ResourceNotFoundException");
6256    }
6257
6258    #[test]
6259    fn disable_rule_not_found() {
6260        let svc = make_service();
6261        let req = make_request("DisableRule", json!({"Name": "ghost"}));
6262        let err = svc.disable_rule(&req).err().expect("expected error");
6263        assert_eq!(err.code(), "ResourceNotFoundException");
6264    }
6265
6266    #[test]
6267    fn describe_event_bus_not_found() {
6268        let svc = make_service();
6269        let req = make_request("DescribeEventBus", json!({"Name": "nonexistent-bus"}));
6270        let err = svc.describe_event_bus(&req).err().expect("expected error");
6271        assert_eq!(err.code(), "ResourceNotFoundException");
6272    }
6273
6274    #[test]
6275    fn tag_resource_not_found() {
6276        let svc = make_service();
6277        let req = make_request(
6278            "TagResource",
6279            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "Tags": [{"Key": "k", "Value": "v"}]}),
6280        );
6281        let err = svc.tag_resource(&req).err().expect("expected error");
6282        assert_eq!(err.code(), "ResourceNotFoundException");
6283    }
6284
6285    #[test]
6286    fn untag_resource_not_found() {
6287        let svc = make_service();
6288        let req = make_request(
6289            "UntagResource",
6290            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "TagKeys": ["k"]}),
6291        );
6292        let err = svc.untag_resource(&req).err().expect("expected error");
6293        assert_eq!(err.code(), "ResourceNotFoundException");
6294    }
6295
6296    #[test]
6297    fn describe_archive_not_found() {
6298        let svc = make_service();
6299        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
6300        let err = svc.describe_archive(&req).err().expect("expected error");
6301        assert_eq!(err.code(), "ResourceNotFoundException");
6302    }
6303
6304    #[test]
6305    fn delete_archive_not_found() {
6306        let svc = make_service();
6307        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6308        let err = svc.delete_archive(&req).err().expect("expected error");
6309        assert_eq!(err.code(), "ResourceNotFoundException");
6310    }
6311
6312    #[test]
6313    fn describe_connection_not_found() {
6314        let svc = make_service();
6315        let req = make_request("DescribeConnection", json!({"Name": "ghost"}));
6316        let err = svc.describe_connection(&req).err().expect("expected error");
6317        assert_eq!(err.code(), "ResourceNotFoundException");
6318    }
6319
6320    #[test]
6321    fn describe_api_destination_not_found() {
6322        let svc = make_service();
6323        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
6324        let err = svc
6325            .describe_api_destination(&req)
6326            .err()
6327            .expect("expected error");
6328        assert_eq!(err.code(), "ResourceNotFoundException");
6329    }
6330
6331    #[test]
6332    fn describe_replay_not_found() {
6333        let svc = make_service();
6334        let req = make_request("DescribeReplay", json!({"ReplayName": "ghost"}));
6335        let err = svc.describe_replay(&req).err().expect("expected error");
6336        assert_eq!(err.code(), "ResourceNotFoundException");
6337    }
6338
6339    #[test]
6340    fn create_event_bus_duplicate() {
6341        let svc = make_service();
6342        let req = make_request("CreateEventBus", json!({"Name": "dup-bus"}));
6343        svc.create_event_bus(&req).unwrap();
6344        let err = svc.create_event_bus(&req).err().expect("expected error");
6345        assert_eq!(err.code(), "ResourceAlreadyExistsException");
6346    }
6347
6348    // ── Rule lifecycle ──
6349
6350    #[test]
6351    fn rule_put_describe_enable_disable_delete() {
6352        let svc = make_service();
6353        svc.put_rule(&make_request(
6354            "PutRule",
6355            json!({"Name": "my-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}", "State": "ENABLED"}),
6356        ))
6357        .unwrap();
6358
6359        let resp = svc
6360            .describe_rule(&make_request("DescribeRule", json!({"Name": "my-rule"})))
6361            .unwrap();
6362        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6363        assert_eq!(body["State"], "ENABLED");
6364
6365        svc.disable_rule(&make_request("DisableRule", json!({"Name": "my-rule"})))
6366            .unwrap();
6367        svc.enable_rule(&make_request("EnableRule", json!({"Name": "my-rule"})))
6368            .unwrap();
6369        svc.delete_rule(&make_request("DeleteRule", json!({"Name": "my-rule"})))
6370            .unwrap();
6371    }
6372
6373    #[test]
6374    fn list_rules_returns_created() {
6375        let svc = make_service();
6376        for name in &["r1", "r2", "r3"] {
6377            svc.put_rule(&make_request(
6378                "PutRule",
6379                json!({"Name": name, "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6380            ))
6381            .unwrap();
6382        }
6383        let resp = svc
6384            .list_rules(&make_request("ListRules", json!({})))
6385            .unwrap();
6386        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6387        assert_eq!(body["Rules"].as_array().unwrap().len(), 3);
6388    }
6389
6390    // ── Targets ──
6391
6392    #[test]
6393    fn put_list_remove_targets() {
6394        let svc = make_service();
6395        svc.put_rule(&make_request(
6396            "PutRule",
6397            json!({"Name": "tr", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6398        ))
6399        .unwrap();
6400
6401        svc.put_targets(&make_request(
6402            "PutTargets",
6403            json!({
6404                "Rule": "tr",
6405                "Targets": [
6406                    {"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"},
6407                    {"Id": "t2", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:fn1"},
6408                ]
6409            }),
6410        ))
6411        .unwrap();
6412
6413        let resp = svc
6414            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6415            .unwrap();
6416        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6417        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6418
6419        svc.remove_targets(&make_request(
6420            "RemoveTargets",
6421            json!({"Rule": "tr", "Ids": ["t1"]}),
6422        ))
6423        .unwrap();
6424
6425        let resp = svc
6426            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6427            .unwrap();
6428        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6429        assert_eq!(body["Targets"].as_array().unwrap().len(), 1);
6430    }
6431
6432    // ── PutEvents ──
6433
6434    #[test]
6435    fn put_events_basic() {
6436        let svc = make_service();
6437        let resp = svc
6438            .put_events(&make_request(
6439                "PutEvents",
6440                json!({
6441                    "Entries": [
6442                        {"Source": "aws.s3", "DetailType": "Object Created", "Detail": "{}"},
6443                    ]
6444                }),
6445            ))
6446            .unwrap();
6447        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6448        assert_eq!(body["FailedEntryCount"], 0);
6449    }
6450
6451    // ── Archives ──
6452
6453    #[test]
6454    fn archive_create_describe_list_delete() {
6455        let svc = make_service();
6456
6457        svc.create_archive(&make_request(
6458            "CreateArchive",
6459            json!({
6460                "ArchiveName": "my-archive",
6461                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default",
6462            }),
6463        ))
6464        .unwrap();
6465
6466        let resp = svc
6467            .describe_archive(&make_request(
6468                "DescribeArchive",
6469                json!({"ArchiveName": "my-archive"}),
6470            ))
6471            .unwrap();
6472        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6473        assert_eq!(body["ArchiveName"], "my-archive");
6474
6475        let resp = svc
6476            .list_archives(&make_request("ListArchives", json!({})))
6477            .unwrap();
6478        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6479        assert!(!body["Archives"].as_array().unwrap().is_empty());
6480
6481        svc.delete_archive(&make_request(
6482            "DeleteArchive",
6483            json!({"ArchiveName": "my-archive"}),
6484        ))
6485        .unwrap();
6486    }
6487
6488    // ── Connections ──
6489
6490    #[test]
6491    fn connection_create_list_describe_deauthorize() {
6492        let svc = make_service();
6493
6494        svc.create_connection(&make_request(
6495            "CreateConnection",
6496            json!({
6497                "Name": "my-conn",
6498                "AuthorizationType": "API_KEY",
6499                "AuthParameters": {
6500                    "ApiKeyAuthParameters": {"ApiKeyName": "x-key", "ApiKeyValue": "secret"}
6501                }
6502            }),
6503        ))
6504        .unwrap();
6505
6506        let resp = svc
6507            .list_connections(&make_request("ListConnections", json!({})))
6508            .unwrap();
6509        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6510        assert!(!body["Connections"].as_array().unwrap().is_empty());
6511
6512        svc.describe_connection(&make_request(
6513            "DescribeConnection",
6514            json!({"Name": "my-conn"}),
6515        ))
6516        .unwrap();
6517        svc.deauthorize_connection(&make_request(
6518            "DeauthorizeConnection",
6519            json!({"Name": "my-conn"}),
6520        ))
6521        .unwrap();
6522    }
6523
6524    // ── Event bus list ──
6525
6526    #[test]
6527    fn list_event_buses_returns_default_and_custom() {
6528        let svc = make_service();
6529        svc.create_event_bus(&make_request(
6530            "CreateEventBus",
6531            json!({"Name": "custom-bus"}),
6532        ))
6533        .unwrap();
6534
6535        let resp = svc
6536            .list_event_buses(&make_request("ListEventBuses", json!({})))
6537            .unwrap();
6538        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6539        let names: Vec<&str> = body["EventBuses"]
6540            .as_array()
6541            .unwrap()
6542            .iter()
6543            .map(|v| v["Name"].as_str().unwrap())
6544            .collect();
6545        assert!(names.contains(&"default"));
6546        assert!(names.contains(&"custom-bus"));
6547    }
6548
6549    // ── Tags ──
6550
6551    #[test]
6552    fn tag_list_untag_rule_resource() {
6553        let svc = make_service();
6554        svc.put_rule(&make_request(
6555            "PutRule",
6556            json!({"Name": "tagged-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6557        ))
6558        .unwrap();
6559
6560        let arn = "arn:aws:events:us-east-1:123456789012:rule/tagged-rule";
6561
6562        svc.tag_resource(&make_request(
6563            "TagResource",
6564            json!({"ResourceARN": arn, "Tags": [{"Key": "env", "Value": "prod"}]}),
6565        ))
6566        .unwrap();
6567
6568        let resp = svc
6569            .list_tags_for_resource(&make_request(
6570                "ListTagsForResource",
6571                json!({"ResourceARN": arn}),
6572            ))
6573            .unwrap();
6574        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6575        assert_eq!(body["Tags"].as_array().unwrap().len(), 1);
6576
6577        svc.untag_resource(&make_request(
6578            "UntagResource",
6579            json!({"ResourceARN": arn, "TagKeys": ["env"]}),
6580        ))
6581        .unwrap();
6582    }
6583
6584    // ── put_permission / remove_permission ──
6585
6586    #[test]
6587    fn put_permission_with_policy_json() {
6588        let svc = make_service();
6589        let policy = r#"{"Version":"2012-10-17","Statement":[]}"#;
6590        let req = make_request("PutPermission", json!({"Policy": policy}));
6591        svc.put_permission(&req).unwrap();
6592    }
6593
6594    #[test]
6595    fn put_permission_invalid_action_errors() {
6596        let svc = make_service();
6597        let req = make_request(
6598            "PutPermission",
6599            json!({
6600                "Action": "events:NotARealAction",
6601                "Principal": "123456789012",
6602                "StatementId": "s1"
6603            }),
6604        );
6605        assert!(svc.put_permission(&req).is_err());
6606    }
6607
6608    #[test]
6609    fn put_permission_unknown_bus_errors() {
6610        let svc = make_service();
6611        let req = make_request(
6612            "PutPermission",
6613            json!({
6614                "EventBusName": "missing",
6615                "Action": "events:PutEvents",
6616                "Principal": "123456789012",
6617                "StatementId": "s1"
6618            }),
6619        );
6620        assert!(svc.put_permission(&req).is_err());
6621    }
6622
6623    #[test]
6624    fn put_permission_add_and_remove_statement() {
6625        let svc = make_service();
6626        let req = make_request(
6627            "PutPermission",
6628            json!({
6629                "Action": "events:PutEvents",
6630                "Principal": "123456789012",
6631                "StatementId": "s1"
6632            }),
6633        );
6634        svc.put_permission(&req).unwrap();
6635
6636        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6637        svc.remove_permission(&req).unwrap();
6638    }
6639
6640    #[test]
6641    fn remove_permission_remove_all_flag() {
6642        let svc = make_service();
6643        let req = make_request(
6644            "PutPermission",
6645            json!({
6646                "Action": "events:PutEvents",
6647                "Principal": "123456789012",
6648                "StatementId": "s1"
6649            }),
6650        );
6651        svc.put_permission(&req).unwrap();
6652
6653        let req = make_request("RemovePermission", json!({"RemoveAllPermissions": true}));
6654        svc.remove_permission(&req).unwrap();
6655    }
6656
6657    #[test]
6658    fn remove_permission_unknown_bus_errors() {
6659        let svc = make_service();
6660        let req = make_request(
6661            "RemovePermission",
6662            json!({"EventBusName": "missing", "StatementId": "s1"}),
6663        );
6664        assert!(svc.remove_permission(&req).is_err());
6665    }
6666
6667    #[test]
6668    fn remove_permission_no_policy_errors() {
6669        let svc = make_service();
6670        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6671        assert!(svc.remove_permission(&req).is_err());
6672    }
6673
6674    #[test]
6675    fn remove_permission_unknown_statement_errors() {
6676        let svc = make_service();
6677        svc.put_permission(&make_request(
6678            "PutPermission",
6679            json!({
6680                "Action": "events:PutEvents",
6681                "Principal": "123456789012",
6682                "StatementId": "s1"
6683            }),
6684        ))
6685        .unwrap();
6686
6687        let req = make_request("RemovePermission", json!({"StatementId": "ghost"}));
6688        assert!(svc.remove_permission(&req).is_err());
6689    }
6690
6691    // ── put_rule invalid schedule expression ──
6692
6693    #[test]
6694    fn put_rule_missing_name_errors() {
6695        let svc = make_service();
6696        let req = make_request("PutRule", json!({}));
6697        assert!(svc.put_rule(&req).is_err());
6698    }
6699
6700    #[test]
6701    fn put_rule_name_too_long_errors() {
6702        let svc = make_service();
6703        let name = "x".repeat(65);
6704        let req = make_request("PutRule", json!({"Name": name}));
6705        assert!(svc.put_rule(&req).is_err());
6706    }
6707
6708    #[test]
6709    fn put_rule_invalid_state_errors() {
6710        let svc = make_service();
6711        let req = make_request("PutRule", json!({"Name": "r1", "State": "BOGUS"}));
6712        assert!(svc.put_rule(&req).is_err());
6713    }
6714
6715    // ── create_connection variants ──
6716
6717    #[test]
6718    fn create_connection_api_key_auth() {
6719        let svc = make_service();
6720        let req = make_request(
6721            "CreateConnection",
6722            json!({
6723                "Name": "conn-apikey",
6724                "AuthorizationType": "API_KEY",
6725                "AuthParameters": {
6726                    "ApiKeyAuthParameters": {
6727                        "ApiKeyName": "X-Api-Key",
6728                        "ApiKeyValue": "secret"
6729                    }
6730                }
6731            }),
6732        );
6733        svc.create_connection(&req).unwrap();
6734    }
6735
6736    #[test]
6737    fn create_connection_basic_auth() {
6738        let svc = make_service();
6739        let req = make_request(
6740            "CreateConnection",
6741            json!({
6742                "Name": "conn-basic",
6743                "AuthorizationType": "BASIC",
6744                "AuthParameters": {
6745                    "BasicAuthParameters": {
6746                        "Username": "u",
6747                        "Password": "p"
6748                    }
6749                }
6750            }),
6751        );
6752        svc.create_connection(&req).unwrap();
6753    }
6754
6755    #[test]
6756    fn create_connection_missing_name_errors() {
6757        let svc = make_service();
6758        let req = make_request("CreateConnection", json!({"AuthorizationType": "API_KEY"}));
6759        assert!(svc.create_connection(&req).is_err());
6760    }
6761
6762    #[test]
6763    fn create_connection_missing_auth_type_errors() {
6764        let svc = make_service();
6765        let req = make_request("CreateConnection", json!({"Name": "c-noauth"}));
6766        assert!(svc.create_connection(&req).is_err());
6767    }
6768
6769    #[test]
6770    fn delete_connection_not_found() {
6771        let svc = make_service();
6772        let req = make_request("DeleteConnection", json!({"Name": "ghost"}));
6773        assert!(svc.delete_connection(&req).is_err());
6774    }
6775
6776    // ── api destination validation ──
6777
6778    #[test]
6779    fn create_api_destination_missing_name_errors() {
6780        let svc = make_service();
6781        let req = make_request(
6782            "CreateApiDestination",
6783            json!({
6784                "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/c",
6785                "InvocationEndpoint": "https://example.com",
6786                "HttpMethod": "POST"
6787            }),
6788        );
6789        assert!(svc.create_api_destination(&req).is_err());
6790    }
6791
6792    #[test]
6793    fn create_api_destination_invalid_method_errors() {
6794        let svc = make_service();
6795        create_connection(&svc, "conn-m");
6796        let guard = svc.state.read();
6797        let st = guard.default_ref();
6798        let conn_arn = st
6799            .connections
6800            .get("conn-m")
6801            .map(|c| c.arn.clone())
6802            .unwrap_or_default();
6803        drop(guard);
6804
6805        let req = make_request(
6806            "CreateApiDestination",
6807            json!({
6808                "Name": "d1",
6809                "ConnectionArn": conn_arn,
6810                "InvocationEndpoint": "https://example.com",
6811                "HttpMethod": "FLY"
6812            }),
6813        );
6814        assert!(svc.create_api_destination(&req).is_err());
6815    }
6816
6817    #[test]
6818    fn delete_api_destination_not_found() {
6819        let svc = make_service();
6820        let req = make_request("DeleteApiDestination", json!({"Name": "ghost"}));
6821        assert!(svc.delete_api_destination(&req).is_err());
6822    }
6823
6824    // ── archive error paths ──
6825
6826    #[test]
6827    fn create_archive_missing_name_errors() {
6828        let svc = make_service();
6829        let req = make_request(
6830            "CreateArchive",
6831            json!({"EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"}),
6832        );
6833        assert!(svc.create_archive(&req).is_err());
6834    }
6835
6836    #[test]
6837    fn create_archive_missing_source_arn_errors() {
6838        let svc = make_service();
6839        let req = make_request("CreateArchive", json!({"ArchiveName": "arc1"}));
6840        assert!(svc.create_archive(&req).is_err());
6841    }
6842
6843    #[test]
6844    fn delete_archive_missing_errors() {
6845        let svc = make_service();
6846        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6847        assert!(svc.delete_archive(&req).is_err());
6848    }
6849
6850    // ── replay error paths ──
6851
6852    #[test]
6853    fn cancel_replay_not_found() {
6854        let svc = make_service();
6855        let req = make_request("CancelReplay", json!({"ReplayName": "ghost"}));
6856        assert!(svc.cancel_replay(&req).is_err());
6857    }
6858
6859    // ── put_events empty ──
6860
6861    #[test]
6862    fn put_events_empty_entries_errors() {
6863        let svc = make_service();
6864        let req = make_request("PutEvents", json!({"Entries": []}));
6865        assert!(svc.put_events(&req).is_err());
6866    }
6867
6868    #[test]
6869    fn put_events_success_count() {
6870        let svc = make_service();
6871        let req = make_request(
6872            "PutEvents",
6873            json!({
6874                "Entries": [
6875                    {"Source": "my.app", "DetailType": "Test", "Detail": "{}"}
6876                ]
6877            }),
6878        );
6879        let resp = svc.put_events(&req).unwrap();
6880        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6881        assert_eq!(body["FailedEntryCount"], 0);
6882        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
6883    }
6884
6885    // ── list_tags_for_resource on unknown ARN ──
6886
6887    #[test]
6888    fn list_tags_for_resource_unknown_errors() {
6889        let svc = make_service();
6890        let req = make_request(
6891            "ListTagsForResource",
6892            json!({
6893                "ResourceARN": "arn:aws:events:us-east-1:123456789012:rule/ghost"
6894            }),
6895        );
6896        assert!(svc.list_tags_for_resource(&req).is_err());
6897    }
6898
6899    // ── describe_rule with EventBusName ──
6900
6901    #[test]
6902    fn describe_rule_custom_bus() {
6903        let svc = make_service();
6904        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "cb"})))
6905            .unwrap();
6906
6907        svc.put_rule(&make_request(
6908            "PutRule",
6909            json!({
6910                "Name": "r-cb",
6911                "EventPattern": "{\"source\":[\"aws.s3\"]}",
6912                "EventBusName": "cb"
6913            }),
6914        ))
6915        .unwrap();
6916
6917        let resp = svc
6918            .describe_rule(&make_request(
6919                "DescribeRule",
6920                json!({"Name": "r-cb", "EventBusName": "cb"}),
6921            ))
6922            .unwrap();
6923        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6924        assert_eq!(body["Name"], "r-cb");
6925    }
6926
6927    // ── enable/disable rule on custom bus ──
6928
6929    #[test]
6930    fn disable_rule_on_custom_bus() {
6931        let svc = make_service();
6932        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "dcb"})))
6933            .unwrap();
6934        svc.put_rule(&make_request(
6935            "PutRule",
6936            json!({
6937                "Name": "r-d",
6938                "EventPattern": "{\"source\":[\"s\"]}",
6939                "EventBusName": "dcb"
6940            }),
6941        ))
6942        .unwrap();
6943        svc.disable_rule(&make_request(
6944            "DisableRule",
6945            json!({"Name": "r-d", "EventBusName": "dcb"}),
6946        ))
6947        .unwrap();
6948    }
6949
6950    // ── describe_event_bus with custom bus ──
6951
6952    #[test]
6953    fn describe_event_bus_custom() {
6954        let svc = make_service();
6955        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "deb"})))
6956            .unwrap();
6957        let resp = svc
6958            .describe_event_bus(&make_request("DescribeEventBus", json!({"Name": "deb"})))
6959            .unwrap();
6960        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6961        assert_eq!(body["Name"], "deb");
6962    }
6963
6964    #[test]
6965    fn list_event_buses_with_name_prefix() {
6966        let svc = make_service();
6967        for name in &["dev-x", "dev-y", "prod-z"] {
6968            svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": name})))
6969                .unwrap();
6970        }
6971        let resp = svc
6972            .list_event_buses(&make_request(
6973                "ListEventBuses",
6974                json!({"NamePrefix": "dev-"}),
6975            ))
6976            .unwrap();
6977        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6978        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
6979    }
6980
6981    #[test]
6982    fn list_rules_on_custom_bus() {
6983        let svc = make_service();
6984        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "lrcb"})))
6985            .unwrap();
6986        svc.put_rule(&make_request(
6987            "PutRule",
6988            json!({
6989                "Name": "r1",
6990                "EventPattern": "{\"source\":[\"s\"]}",
6991                "EventBusName": "lrcb"
6992            }),
6993        ))
6994        .unwrap();
6995
6996        let resp = svc
6997            .list_rules(&make_request("ListRules", json!({"EventBusName": "lrcb"})))
6998            .unwrap();
6999        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7000        assert_eq!(body["Rules"].as_array().unwrap().len(), 1);
7001    }
7002
7003    // ── put_targets on custom bus ──
7004
7005    #[test]
7006    fn put_targets_on_custom_bus() {
7007        let svc = make_service();
7008        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "ptcb"})))
7009            .unwrap();
7010        svc.put_rule(&make_request(
7011            "PutRule",
7012            json!({
7013                "Name": "rt",
7014                "EventPattern": "{\"source\":[\"s\"]}",
7015                "EventBusName": "ptcb"
7016            }),
7017        ))
7018        .unwrap();
7019
7020        svc.put_targets(&make_request(
7021            "PutTargets",
7022            json!({
7023                "Rule": "rt",
7024                "EventBusName": "ptcb",
7025                "Targets": [{"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"}]
7026            }),
7027        ))
7028        .unwrap();
7029    }
7030
7031    // ── remove_targets unknown target ids ──
7032
7033    #[test]
7034    fn remove_targets_unknown_ids_returns_failed() {
7035        let svc = make_service();
7036        svc.put_rule(&make_request(
7037            "PutRule",
7038            json!({"Name": "rmt", "EventPattern": "{\"source\":[\"s\"]}"}),
7039        ))
7040        .unwrap();
7041
7042        let resp = svc
7043            .remove_targets(&make_request(
7044                "RemoveTargets",
7045                json!({"Rule": "rmt", "Ids": ["ghost1", "ghost2"]}),
7046            ))
7047            .unwrap();
7048        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7049        // Unknown ids are silently ok in many implementations; at least we hit the code path
7050        assert!(body.is_object());
7051    }
7052
7053    #[test]
7054    fn describe_event_source_unknown_errors() {
7055        let svc = make_service();
7056        let req = make_request("DescribeEventSource", json!({"Name": "ghost"}));
7057        assert!(svc.describe_event_source(&req).is_err());
7058    }
7059
7060    #[test]
7061    fn describe_partner_event_source_unknown_errors() {
7062        let svc = make_service();
7063        let req = make_request("DescribePartnerEventSource", json!({"Name": "ghost"}));
7064        assert!(svc.describe_partner_event_source(&req).is_err());
7065    }
7066
7067    #[test]
7068    fn list_partner_event_sources_empty_ok() {
7069        let svc = make_service();
7070        let req = make_request(
7071            "ListPartnerEventSources",
7072            json!({"NamePrefix": "aws.partner"}),
7073        );
7074        let resp = svc.list_partner_event_sources(&req).unwrap();
7075        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7076        assert!(body["PartnerEventSources"].is_array());
7077    }
7078
7079    #[test]
7080    fn list_event_sources_empty_ok() {
7081        let svc = make_service();
7082        let req = make_request("ListEventSources", json!({}));
7083        let resp = svc.list_event_sources(&req).unwrap();
7084        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7085        assert!(body["EventSources"].is_array());
7086    }
7087
7088    #[test]
7089    fn update_connection_unknown_errors() {
7090        let svc = make_service();
7091        let req = make_request(
7092            "UpdateConnection",
7093            json!({"Name": "ghost", "AuthorizationType": "API_KEY"}),
7094        );
7095        assert!(svc.update_connection(&req).is_err());
7096    }
7097
7098    #[test]
7099    fn describe_api_destination_unknown_errors() {
7100        let svc = make_service();
7101        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
7102        assert!(svc.describe_api_destination(&req).is_err());
7103    }
7104
7105    #[test]
7106    fn update_api_destination_unknown_errors() {
7107        let svc = make_service();
7108        let req = make_request("UpdateApiDestination", json!({"Name": "ghost"}));
7109        assert!(svc.update_api_destination(&req).is_err());
7110    }
7111
7112    #[test]
7113    fn update_archive_unknown_errors() {
7114        let svc = make_service();
7115        let req = make_request("UpdateArchive", json!({"ArchiveName": "ghost"}));
7116        assert!(svc.update_archive(&req).is_err());
7117    }
7118
7119    #[test]
7120    fn describe_archive_unknown_errors_b() {
7121        let svc = make_service();
7122        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
7123        assert!(svc.describe_archive(&req).is_err());
7124    }
7125
7126    #[test]
7127    fn list_archives_empty_ok() {
7128        let svc = make_service();
7129        let req = make_request("ListArchives", json!({}));
7130        let resp = svc.list_archives(&req).unwrap();
7131        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7132        assert!(body["Archives"].is_array());
7133    }
7134
7135    #[test]
7136    fn list_replays_empty_ok() {
7137        let svc = make_service();
7138        let req = make_request("ListReplays", json!({}));
7139        let resp = svc.list_replays(&req).unwrap();
7140        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7141        assert!(body["Replays"].is_array());
7142    }
7143
7144    #[test]
7145    fn describe_endpoint_unknown_errors() {
7146        let svc = make_service();
7147        let req = make_request("DescribeEndpoint", json!({"Name": "ghost"}));
7148        assert!(svc.describe_endpoint(&req).is_err());
7149    }
7150
7151    #[test]
7152    fn delete_endpoint_unknown_errors() {
7153        let svc = make_service();
7154        let req = make_request("DeleteEndpoint", json!({"Name": "ghost"}));
7155        assert!(svc.delete_endpoint(&req).is_err());
7156    }
7157
7158    #[test]
7159    fn list_endpoints_empty_ok() {
7160        let svc = make_service();
7161        let req = make_request("ListEndpoints", json!({}));
7162        let resp = svc.list_endpoints(&req).unwrap();
7163        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7164        assert!(body["Endpoints"].is_array());
7165    }
7166}