Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::state::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28/// Validate a single `PutEvents` entry's required fields (`Source`,
29/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
30/// object. Returns the JSON error body AWS surfaces in the matching
31/// `Entries[]` slot on failure.
32fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
33    if source.is_empty() {
34        return Err(json!({
35            "ErrorCode": "InvalidArgument",
36            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
37        }));
38    }
39    if detail_type.is_empty() {
40        return Err(json!({
41            "ErrorCode": "InvalidArgument",
42            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
43        }));
44    }
45    if detail.is_empty() {
46        return Err(json!({
47            "ErrorCode": "InvalidArgument",
48            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
49        }));
50    }
51    if serde_json::from_str::<Value>(detail).is_err() {
52        return Err(json!({
53            "ErrorCode": "MalformedDetail",
54            "ErrorMessage": "Detail is malformed.",
55        }));
56    }
57    Ok(())
58}
59
60/// Parse an entry's `Time` field, tolerating the three formats AWS
61/// accepts (RFC 3339 string, fractional seconds as a float, integer
62/// seconds). Falls back to "now" if the field is absent or
63/// unparseable, which matches the real service.
64fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
65    if let Some(s) = raw.as_str() {
66        return DateTime::parse_from_rfc3339(s)
67            .map(|dt| dt.with_timezone(&Utc))
68            .unwrap_or_else(|_| Utc::now());
69    }
70    if let Some(ts) = raw.as_f64() {
71        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
72            .unwrap_or_else(Utc::now);
73    }
74    if let Some(ts) = raw.as_i64() {
75        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
76    }
77    Utc::now()
78}
79
/// Reports whether `action` is one of the EventBridge operations that
/// mutate state. The match is exact and case-sensitive; any name not in
/// the table (including read-only and unknown actions) yields `false`.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateEventBus",
        "DeleteEventBus",
        "UpdateEventBus",
        "PutRule",
        "DeleteRule",
        "EnableRule",
        "DisableRule",
        "PutTargets",
        "RemoveTargets",
        "PutEvents",
        "PutPermission",
        "RemovePermission",
        "TagResource",
        "UntagResource",
        "CreateArchive",
        "UpdateArchive",
        "DeleteArchive",
        "CreateConnection",
        "UpdateConnection",
        "DeleteConnection",
        "DeauthorizeConnection",
        "CreateApiDestination",
        "UpdateApiDestination",
        "DeleteApiDestination",
        "StartReplay",
        "CancelReplay",
        "CreatePartnerEventSource",
        "DeletePartnerEventSource",
        "ActivateEventSource",
        "DeactivateEventSource",
        "PutPartnerEvents",
        "CreateEndpoint",
        "DeleteEndpoint",
        "UpdateEndpoint",
    ];
    MUTATING_ACTIONS.contains(&action)
}
120
/// EventBridge service handler: dispatches `events` API actions against
/// the shared per-account EventBridge state and, when a snapshot store is
/// configured, persists that state after successful mutating actions.
pub struct EventBridgeService {
    /// Shared EventBridge state; handlers take `read()` / `write()` guards
    /// on it and look up the caller's account inside.
    state: SharedEventBridgeState,
    /// Delivery bus supplied at construction.
    /// NOTE(review): presumably used to deliver matched events to targets;
    /// not exercised in this part of the file — confirm.
    delivery: Arc<DeliveryBus>,
    /// Lambda state, present only after `with_lambda`.
    lambda_state: Option<SharedLambdaState>,
    /// CloudWatch Logs state, present only after `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Container runtime, present only after `with_runtime`.
    container_runtime: Option<Arc<ContainerRuntime>>,
    /// Destination for state snapshots; `None` disables persistence.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Serializes `save_snapshot` runs so snapshot writes cannot overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
130
131impl EventBridgeService {
132    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
133        Self {
134            state,
135            delivery,
136            lambda_state: None,
137            logs_state: None,
138            container_runtime: None,
139            snapshot_store: None,
140            snapshot_lock: Arc::new(AsyncMutex::new(())),
141        }
142    }
143
144    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
145        self.lambda_state = Some(lambda_state);
146        self
147    }
148
149    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
150        self.logs_state = Some(logs_state);
151        self
152    }
153
154    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
155        self.container_runtime = Some(runtime);
156        self
157    }
158
159    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
160        self.snapshot_store = Some(store);
161        self
162    }
163
164    /// Persist current state as a snapshot. Held across the
165    /// clone-serialize-write sequence to prevent stale-last writes,
166    /// with serde + file I/O offloaded to the blocking pool.
167    async fn save_snapshot(&self) {
168        let Some(store) = self.snapshot_store.clone() else {
169            return;
170        };
171        let _guard = self.snapshot_lock.lock().await;
172        let snapshot = EventBridgeSnapshot {
173            schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
174            accounts: Some(self.state.read().clone()),
175            state: None,
176        };
177        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
178            let bytes = serde_json::to_vec(&snapshot)
179                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
180            store.save(&bytes)
181        })
182        .await;
183        match join {
184            Ok(Ok(())) => {}
185            Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
186            Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
187        }
188    }
189}
190
191#[async_trait]
192impl AwsService for EventBridgeService {
193    fn service_name(&self) -> &str {
194        "events"
195    }
196
197    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
198        let mutates = is_mutating_action(req.action.as_str());
199        let result = match req.action.as_str() {
200            "CreateEventBus" => self.create_event_bus(&req),
201            "DeleteEventBus" => self.delete_event_bus(&req),
202            "ListEventBuses" => self.list_event_buses(&req),
203            "DescribeEventBus" => self.describe_event_bus(&req),
204            "PutRule" => self.put_rule(&req),
205            "DeleteRule" => self.delete_rule(&req),
206            "ListRules" => self.list_rules(&req),
207            "DescribeRule" => self.describe_rule(&req),
208            "EnableRule" => self.enable_rule(&req),
209            "DisableRule" => self.disable_rule(&req),
210            "PutTargets" => self.put_targets(&req),
211            "RemoveTargets" => self.remove_targets(&req),
212            "ListTargetsByRule" => self.list_targets_by_rule(&req),
213            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
214            "PutEvents" => self.put_events(&req),
215            "PutPermission" => self.put_permission(&req),
216            "RemovePermission" => self.remove_permission(&req),
217            "TagResource" => self.tag_resource(&req),
218            "UntagResource" => self.untag_resource(&req),
219            "ListTagsForResource" => self.list_tags_for_resource(&req),
220            "CreateArchive" => self.create_archive(&req),
221            "DescribeArchive" => self.describe_archive(&req),
222            "ListArchives" => self.list_archives(&req),
223            "UpdateArchive" => self.update_archive(&req),
224            "DeleteArchive" => self.delete_archive(&req),
225            "CreateConnection" => self.create_connection(&req),
226            "DescribeConnection" => self.describe_connection(&req),
227            "ListConnections" => self.list_connections(&req),
228            "UpdateConnection" => self.update_connection(&req),
229            "DeleteConnection" => self.delete_connection(&req),
230            "CreateApiDestination" => self.create_api_destination(&req),
231            "DescribeApiDestination" => self.describe_api_destination(&req),
232            "ListApiDestinations" => self.list_api_destinations(&req),
233            "UpdateApiDestination" => self.update_api_destination(&req),
234            "DeleteApiDestination" => self.delete_api_destination(&req),
235            "StartReplay" => self.start_replay(&req),
236            "DescribeReplay" => self.describe_replay(&req),
237            "ListReplays" => self.list_replays(&req),
238            "CancelReplay" => self.cancel_replay(&req),
239            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
240            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
241            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
242            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
243            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
244            "ActivateEventSource" => self.activate_event_source(&req),
245            "DeactivateEventSource" => self.deactivate_event_source(&req),
246            "DescribeEventSource" => self.describe_event_source(&req),
247            "ListEventSources" => self.list_event_sources(&req),
248            "PutPartnerEvents" => self.put_partner_events(&req),
249            "TestEventPattern" => self.test_event_pattern(&req),
250            "UpdateEventBus" => self.update_event_bus(&req),
251            "CreateEndpoint" => self.create_endpoint(&req),
252            "DeleteEndpoint" => self.delete_endpoint(&req),
253            "DescribeEndpoint" => self.describe_endpoint(&req),
254            "ListEndpoints" => self.list_endpoints(&req),
255            "UpdateEndpoint" => self.update_endpoint(&req),
256            "DeauthorizeConnection" => self.deauthorize_connection(&req),
257            _ => Err(AwsServiceError::action_not_implemented(
258                "events",
259                &req.action,
260            )),
261        };
262        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
263            self.save_snapshot().await;
264        }
265        result
266    }
267
268    fn supported_actions(&self) -> &[&str] {
269        &[
270            "CreateEventBus",
271            "DeleteEventBus",
272            "ListEventBuses",
273            "DescribeEventBus",
274            "PutRule",
275            "DeleteRule",
276            "ListRules",
277            "DescribeRule",
278            "EnableRule",
279            "DisableRule",
280            "PutTargets",
281            "RemoveTargets",
282            "ListTargetsByRule",
283            "ListRuleNamesByTarget",
284            "PutEvents",
285            "PutPermission",
286            "RemovePermission",
287            "TagResource",
288            "UntagResource",
289            "ListTagsForResource",
290            "CreateArchive",
291            "DescribeArchive",
292            "ListArchives",
293            "UpdateArchive",
294            "DeleteArchive",
295            "CreateConnection",
296            "DescribeConnection",
297            "ListConnections",
298            "UpdateConnection",
299            "DeleteConnection",
300            "CreateApiDestination",
301            "DescribeApiDestination",
302            "ListApiDestinations",
303            "UpdateApiDestination",
304            "DeleteApiDestination",
305            "StartReplay",
306            "DescribeReplay",
307            "ListReplays",
308            "CancelReplay",
309            "CreatePartnerEventSource",
310            "DeletePartnerEventSource",
311            "DescribePartnerEventSource",
312            "ListPartnerEventSources",
313            "ListPartnerEventSourceAccounts",
314            "ActivateEventSource",
315            "DeactivateEventSource",
316            "DescribeEventSource",
317            "ListEventSources",
318            "PutPartnerEvents",
319            "TestEventPattern",
320            "UpdateEventBus",
321            "CreateEndpoint",
322            "DeleteEndpoint",
323            "DescribeEndpoint",
324            "ListEndpoints",
325            "UpdateEndpoint",
326            "DeauthorizeConnection",
327        ]
328    }
329}
330
331fn parse_tags(body: &Value) -> HashMap<String, String> {
332    let mut tags = HashMap::new();
333    if let Some(arr) = body["Tags"].as_array() {
334        for tag in arr {
335            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
336                tags.insert(key.to_string(), val.to_string());
337            }
338        }
339    }
340    tags
341}
342
343fn parse_target(target: &Value) -> EventTarget {
344    EventTarget {
345        id: target["Id"].as_str().unwrap_or("").to_string(),
346        arn: target["Arn"].as_str().unwrap_or("").to_string(),
347        input: target["Input"].as_str().map(|s| s.to_string()),
348        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
349        input_transformer: target.get("InputTransformer").cloned(),
350        sqs_parameters: target.get("SqsParameters").cloned(),
351    }
352}
353
354fn target_to_json(t: &EventTarget) -> Value {
355    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
356    if let Some(ref input) = t.input {
357        obj["Input"] = json!(input);
358    }
359    if let Some(ref input_path) = t.input_path {
360        obj["InputPath"] = json!(input_path);
361    }
362    if let Some(ref it) = t.input_transformer {
363        obj["InputTransformer"] = it.clone();
364    }
365    if let Some(ref sp) = t.sqs_parameters {
366        obj["SqsParameters"] = sp.clone();
367    }
368    obj
369}
370
371// ─── Event Bus Operations ───────────────────────────────────────────
372impl EventBridgeService {
373    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
374        let body = req.json_body();
375        validate_required("Name", &body["Name"])?;
376        let name = body["Name"]
377            .as_str()
378            .ok_or_else(|| missing("Name"))?
379            .to_string();
380        validate_string_length("name", &name, 1, 256)?;
381        validate_optional_string_length(
382            "eventSourceName",
383            body["EventSourceName"].as_str(),
384            1,
385            256,
386        )?;
387        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
388        validate_optional_string_length(
389            "kmsKeyIdentifier",
390            body["KmsKeyIdentifier"].as_str(),
391            0,
392            2048,
393        )?;
394
395        // Validate name doesn't contain '/' (unless partner bus)
396        if name.contains('/') && !name.starts_with("aws.partner/") {
397            return Err(AwsServiceError::aws_error(
398                StatusCode::BAD_REQUEST,
399                "ValidationException",
400                "Event bus name must not contain '/'.",
401            ));
402        }
403
404        // Partner event bus validation
405        if name.starts_with("aws.partner/") {
406            let event_source = body["EventSourceName"].as_str().unwrap_or("");
407            let accounts_r = self.state.read();
408            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
409            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
410            let has_source = state_r.partner_event_sources.contains_key(event_source);
411            drop(accounts_r);
412            if !has_source {
413                return Err(AwsServiceError::aws_error(
414                    StatusCode::BAD_REQUEST,
415                    "ResourceNotFoundException",
416                    format!("Event source {event_source} does not exist."),
417                ));
418            }
419        }
420
421        let mut accounts = self.state.write();
422        let state = accounts.get_or_create(&req.account_id);
423
424        if state.buses.contains_key(&name) {
425            return Err(AwsServiceError::aws_error(
426                StatusCode::BAD_REQUEST,
427                "ResourceAlreadyExistsException",
428                format!("Event bus {name} already exists."),
429            ));
430        }
431
432        let arn = format!(
433            "arn:aws:events:{}:{}:event-bus/{}",
434            req.region, state.account_id, name
435        );
436        let now = Utc::now();
437        let description = body["Description"].as_str().map(|s| s.to_string());
438        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
439        let dead_letter_config = body.get("DeadLetterConfig").cloned();
440
441        let tags = parse_tags(&body);
442
443        let bus = EventBus {
444            name: name.clone(),
445            arn: arn.clone(),
446            tags,
447            policy: None,
448            description,
449            kms_key_identifier,
450            dead_letter_config,
451            creation_time: now,
452            last_modified_time: now,
453        };
454        state.buses.insert(name, bus);
455
456        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
457    }
458
459    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
460        let body = req.json_body();
461        validate_required("Name", &body["Name"])?;
462        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
463        validate_string_length("name", name, 1, 256)?;
464
465        if name == "default" {
466            return Err(AwsServiceError::aws_error(
467                StatusCode::BAD_REQUEST,
468                "ValidationException",
469                format!("Cannot delete event bus {name}."),
470            ));
471        }
472
473        let mut accounts = self.state.write();
474        let state = accounts.get_or_create(&req.account_id);
475        state.buses.remove(name);
476        state.rules.retain(|k, _| k.0 != name);
477
478        Ok(AwsResponse::ok_json(json!({})))
479    }
480
481    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482        let body = req.json_body();
483        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
484        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
485        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
486        let name_prefix = body["NamePrefix"].as_str();
487        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
488        if let Some(t) = body["NextToken"].as_str() {
489            t.parse::<usize>().map_err(|_| {
490                AwsServiceError::aws_error(
491                    StatusCode::BAD_REQUEST,
492                    "InvalidNextTokenException",
493                    format!("Invalid NextToken value: '{t}'"),
494                )
495            })?;
496        }
497
498        let accounts = self.state.read();
499        let empty = EventBridgeState::new(&req.account_id, &req.region);
500        let state = accounts.get(&req.account_id).unwrap_or(&empty);
501        let filtered: Vec<&_> = state
502            .buses
503            .values()
504            .filter(|b| match name_prefix {
505                Some(prefix) => b.name.starts_with(prefix),
506                None => true,
507            })
508            .collect();
509
510        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
511        let buses: Vec<Value> = page
512            .iter()
513            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
514            .collect();
515        let mut resp = json!({ "EventBuses": buses });
516        if let Some(token) = next_token {
517            resp["NextToken"] = json!(token);
518        }
519
520        Ok(AwsResponse::ok_json(resp))
521    }
522
523    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
524        let body = req.json_body();
525        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
526        let name = body["Name"].as_str().unwrap_or("default");
527
528        let accounts = self.state.read();
529        let empty = EventBridgeState::new(&req.account_id, &req.region);
530        let state = accounts.get(&req.account_id).unwrap_or(&empty);
531        let bus = state.buses.get(name).ok_or_else(|| {
532            AwsServiceError::aws_error(
533                StatusCode::BAD_REQUEST,
534                "ResourceNotFoundException",
535                format!("Event bus {name} does not exist."),
536            )
537        })?;
538
539        let mut resp = json!({
540            "Name": bus.name,
541            "Arn": bus.arn,
542            "CreationTime": bus.creation_time.timestamp() as f64,
543            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
544        });
545
546        if let Some(ref policy) = bus.policy {
547            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
548        }
549        if let Some(ref desc) = bus.description {
550            resp["Description"] = json!(desc);
551        }
552        if let Some(ref kms) = bus.kms_key_identifier {
553            resp["KmsKeyIdentifier"] = json!(kms);
554        }
555        if let Some(ref dlc) = bus.dead_letter_config {
556            resp["DeadLetterConfig"] = dlc.clone();
557        }
558
559        Ok(AwsResponse::ok_json(resp))
560    }
561
562    // ─── Permission Operations ──────────────────────────────────────────
563
564    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
565        let body = req.json_body();
566        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
567        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
568        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
569        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
570        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
571
572        let mut accounts = self.state.write();
573        let state = accounts.get_or_create(&req.account_id);
574
575        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
576            AwsServiceError::aws_error(
577                StatusCode::BAD_REQUEST,
578                "ResourceNotFoundException",
579                format!("Event bus {event_bus_name} does not exist."),
580            )
581        })?;
582
583        // Check if Policy is provided (new-style)
584        if let Some(policy_str) = body["Policy"].as_str() {
585            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
586                bus.policy = Some(policy);
587                return Ok(AwsResponse::ok_json(json!({})));
588            }
589        }
590
591        // Old-style: Action, Principal, StatementId
592        let action = body["Action"].as_str().unwrap_or("");
593        let principal = body["Principal"].as_str().unwrap_or("");
594        let statement_id = body["StatementId"].as_str().unwrap_or("");
595
596        // Validate action
597        if action != "events:PutEvents" {
598            return Err(AwsServiceError::aws_error(
599                StatusCode::BAD_REQUEST,
600                "ValidationException",
601                "Provided value in parameter 'action' is not supported.",
602            ));
603        }
604
605        let statement = json!({
606            "Sid": statement_id,
607            "Effect": "Allow",
608            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
609            "Action": action,
610            "Resource": bus.arn,
611        });
612
613        let policy = bus.policy.get_or_insert_with(|| {
614            json!({
615                "Version": "2012-10-17",
616                "Statement": [],
617            })
618        });
619
620        if let Some(stmts) = policy["Statement"].as_array_mut() {
621            stmts.push(statement);
622        }
623
624        Ok(AwsResponse::ok_json(json!({})))
625    }
626
627    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
628        let body = req.json_body();
629        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
630        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
631        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
632        let statement_id = body["StatementId"].as_str().unwrap_or("");
633        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
634
635        let mut accounts = self.state.write();
636        let state = accounts.get_or_create(&req.account_id);
637
638        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
639            AwsServiceError::aws_error(
640                StatusCode::BAD_REQUEST,
641                "ResourceNotFoundException",
642                format!("Event bus {event_bus_name} does not exist."),
643            )
644        })?;
645
646        if remove_all {
647            bus.policy = None;
648            return Ok(AwsResponse::ok_json(json!({})));
649        }
650
651        let policy = bus.policy.as_mut().ok_or_else(|| {
652            AwsServiceError::aws_error(
653                StatusCode::BAD_REQUEST,
654                "ResourceNotFoundException",
655                "EventBus does not have a policy.",
656            )
657        })?;
658
659        if let Some(stmts) = policy["Statement"].as_array_mut() {
660            let before = stmts.len();
661            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
662            if stmts.len() == before {
663                return Err(AwsServiceError::aws_error(
664                    StatusCode::BAD_REQUEST,
665                    "ResourceNotFoundException",
666                    "Statement with the provided id does not exist.",
667                ));
668            }
669            if stmts.is_empty() {
670                bus.policy = None;
671            }
672        }
673
674        Ok(AwsResponse::ok_json(json!({})))
675    }
676
677    // ─── Rule Operations ────────────────────────────────────────────────
678
679    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
680        let body = req.json_body();
681        validate_required("Name", &body["Name"])?;
682        let name = body["Name"]
683            .as_str()
684            .ok_or_else(|| missing("Name"))?
685            .to_string();
686        validate_string_length("name", &name, 1, 64)?;
687        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
688        validate_optional_string_length(
689            "scheduleExpression",
690            body["ScheduleExpression"].as_str(),
691            0,
692            256,
693        )?;
694        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
695        validate_optional_enum(
696            "state",
697            body["State"].as_str(),
698            &[
699                "ENABLED",
700                "DISABLED",
701                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
702            ],
703        )?;
704        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
705        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
706
707        let raw_bus = body["EventBusName"]
708            .as_str()
709            .unwrap_or("default")
710            .to_string();
711
712        let mut accounts = self.state.write();
713        let state = accounts.get_or_create(&req.account_id);
714        let event_bus_name = state.resolve_bus_name(&raw_bus);
715
716        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
717            if s.is_empty() {
718                None
719            } else {
720                Some(s.to_string())
721            }
722        });
723        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
724            if s.is_empty() {
725                None
726            } else {
727                Some(s.to_string())
728            }
729        });
730        let description = body["Description"].as_str().map(|s| s.to_string());
731        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
732        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
733
734        // Validate: schedule expressions only on default bus
735        if schedule_expression.is_some() && event_bus_name != "default" {
736            return Err(AwsServiceError::aws_error(
737                StatusCode::BAD_REQUEST,
738                "ValidationException",
739                "ScheduleExpression is supported only on the default event bus.",
740            ));
741        }
742
743        if !state.buses.contains_key(&event_bus_name) {
744            return Err(AwsServiceError::aws_error(
745                StatusCode::BAD_REQUEST,
746                "ResourceNotFoundException",
747                format!("Event bus {event_bus_name} does not exist."),
748            ));
749        }
750
751        let arn = if event_bus_name == "default" {
752            format!(
753                "arn:aws:events:{}:{}:rule/{}",
754                req.region, state.account_id, name
755            )
756        } else {
757            format!(
758                "arn:aws:events:{}:{}:rule/{}/{}",
759                req.region, state.account_id, event_bus_name, name
760            )
761        };
762
763        let key = (event_bus_name.clone(), name.clone());
764        let targets = state
765            .rules
766            .get(&key)
767            .map(|r| r.targets.clone())
768            .unwrap_or_default();
769
770        let tags = parse_tags(&body);
771
772        let rule = EventRule {
773            name: name.clone(),
774            arn: arn.clone(),
775            event_bus_name,
776            event_pattern,
777            schedule_expression,
778            state: rule_state,
779            description,
780            role_arn,
781            managed_by: None,
782            created_by: None,
783            targets,
784            tags,
785            last_fired: None,
786        };
787
788        state.rules.insert(key, rule);
789        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
790    }
791
792    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
793        let body = req.json_body();
794        validate_required("Name", &body["Name"])?;
795        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
796        validate_string_length("name", name, 1, 64)?;
797        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
798        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
799
800        let mut accounts = self.state.write();
801        let state = accounts.get_or_create(&req.account_id);
802        let bus_name = state.resolve_bus_name(event_bus_name);
803        let key = (bus_name, name.to_string());
804
805        // Check if rule has targets
806        if let Some(rule) = state.rules.get(&key) {
807            if !rule.targets.is_empty() {
808                return Err(AwsServiceError::aws_error(
809                    StatusCode::BAD_REQUEST,
810                    "ValidationException",
811                    "Rule can't be deleted since it has targets.",
812                ));
813            }
814        }
815
816        state.rules.remove(&key);
817        Ok(AwsResponse::ok_json(json!({})))
818    }
819
820    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
821        let body = req.json_body();
822        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
823        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
824        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
825        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
826        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
827        let name_prefix = body["NamePrefix"].as_str();
828        let limit = body["Limit"].as_u64().map(|n| n as usize);
829        let next_token = body["NextToken"].as_str();
830
831        let accounts = self.state.read();
832        let empty = EventBridgeState::new(&req.account_id, &req.region);
833        let state = accounts.get(&req.account_id).unwrap_or(&empty);
834        let bus_name = state.resolve_bus_name(event_bus_name);
835
836        let mut rules: Vec<&EventRule> = state
837            .rules
838            .values()
839            .filter(|r| r.event_bus_name == bus_name)
840            .filter(|r| match name_prefix {
841                Some(prefix) => r.name.starts_with(prefix),
842                None => true,
843            })
844            .collect();
845        rules.sort_by(|a, b| a.name.cmp(&b.name));
846
847        // Pagination
848        let start = next_token
849            .and_then(|t| t.parse::<usize>().ok())
850            .unwrap_or(0)
851            .min(rules.len());
852        let rules_slice = &rules[start..];
853
854        let (page, new_next_token) = if let Some(lim) = limit {
855            if rules_slice.len() > lim {
856                (&rules_slice[..lim], Some((start + lim).to_string()))
857            } else {
858                (rules_slice, None)
859            }
860        } else {
861            (rules_slice, None)
862        };
863
864        let rules_json: Vec<Value> = page
865            .iter()
866            .map(|r| {
867                let mut obj = json!({
868                    "Name": r.name,
869                    "Arn": r.arn,
870                    "EventBusName": r.event_bus_name,
871                    "State": r.state,
872                });
873                if let Some(ref desc) = r.description {
874                    obj["Description"] = json!(desc);
875                }
876                if let Some(ref ep) = r.event_pattern {
877                    obj["EventPattern"] = json!(ep);
878                }
879                if let Some(ref se) = r.schedule_expression {
880                    obj["ScheduleExpression"] = json!(se);
881                }
882                if let Some(ref mb) = r.managed_by {
883                    obj["ManagedBy"] = json!(mb);
884                }
885                obj
886            })
887            .collect();
888
889        let mut resp = json!({ "Rules": rules_json });
890        if let Some(token) = new_next_token {
891            resp["NextToken"] = json!(token);
892        }
893
894        Ok(AwsResponse::ok_json(resp))
895    }
896
897    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898        let body = req.json_body();
899        validate_required("Name", &body["Name"])?;
900        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901        validate_string_length("name", name, 1, 64)?;
902        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905        let accounts = self.state.read();
906        let empty = EventBridgeState::new(&req.account_id, &req.region);
907        let state = accounts.get(&req.account_id).unwrap_or(&empty);
908        let bus_name = state.resolve_bus_name(event_bus_name);
909        let key = (bus_name.clone(), name.to_string());
910
911        let rule = state.rules.get(&key).ok_or_else(|| {
912            AwsServiceError::aws_error(
913                StatusCode::BAD_REQUEST,
914                "ResourceNotFoundException",
915                format!("Rule {name} does not exist."),
916            )
917        })?;
918
919        let mut resp = json!({
920            "Name": rule.name,
921            "Arn": rule.arn,
922            "EventBusName": rule.event_bus_name,
923            "State": rule.state,
924        });
925
926        if let Some(ref desc) = rule.description {
927            resp["Description"] = json!(desc);
928        }
929        if let Some(ref ep) = rule.event_pattern {
930            resp["EventPattern"] = json!(ep);
931        }
932        if let Some(ref se) = rule.schedule_expression {
933            resp["ScheduleExpression"] = json!(se);
934        }
935        if let Some(ref role) = rule.role_arn {
936            resp["RoleArn"] = json!(role);
937        }
938        if let Some(ref mb) = rule.managed_by {
939            resp["ManagedBy"] = json!(mb);
940        }
941        if let Some(ref cb) = rule.created_by {
942            resp["CreatedBy"] = json!(cb);
943        }
944        // If non-default bus, set CreatedBy to account_id
945        if rule.event_bus_name != "default" && rule.created_by.is_none() {
946            resp["CreatedBy"] = json!(state.account_id);
947        }
948
949        Ok(AwsResponse::ok_json(resp))
950    }
951
952    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
953        let body = req.json_body();
954        validate_required("Name", &body["Name"])?;
955        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
956        validate_string_length("name", name, 1, 64)?;
957        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
958        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
959
960        let mut accounts = self.state.write();
961        let state = accounts.get_or_create(&req.account_id);
962        let bus_name = state.resolve_bus_name(event_bus_name);
963        let key = (bus_name, name.to_string());
964
965        let rule = state.rules.get_mut(&key).ok_or_else(|| {
966            AwsServiceError::aws_error(
967                StatusCode::BAD_REQUEST,
968                "ResourceNotFoundException",
969                format!("Rule {name} does not exist."),
970            )
971        })?;
972
973        rule.state = "ENABLED".to_string();
974        Ok(AwsResponse::ok_json(json!({})))
975    }
976
977    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
978        let body = req.json_body();
979        validate_required("Name", &body["Name"])?;
980        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
981        validate_string_length("name", name, 1, 64)?;
982        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
983        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984
985        let mut accounts = self.state.write();
986        let state = accounts.get_or_create(&req.account_id);
987        let bus_name = state.resolve_bus_name(event_bus_name);
988        let key = (bus_name, name.to_string());
989
990        let rule = state.rules.get_mut(&key).ok_or_else(|| {
991            AwsServiceError::aws_error(
992                StatusCode::BAD_REQUEST,
993                "ResourceNotFoundException",
994                format!("Rule {name} does not exist."),
995            )
996        })?;
997
998        rule.state = "DISABLED".to_string();
999        Ok(AwsResponse::ok_json(json!({})))
1000    }
1001
1002    // ─── Target Operations ──────────────────────────────────────────────
1003
1004    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1005        let body = req.json_body();
1006        validate_required("Rule", &body["Rule"])?;
1007        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1008        validate_string_length("rule", rule_name, 1, 64)?;
1009        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1010        validate_required("Targets", &body["Targets"])?;
1011        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1012        let targets = body["Targets"]
1013            .as_array()
1014            .ok_or_else(|| missing("Targets"))?;
1015
1016        // Validate targets - check for FIFO SQS without SqsParameters
1017        for target in targets {
1018            let target_id = target["Id"].as_str().unwrap_or("");
1019            let target_arn = target["Arn"].as_str().unwrap_or("");
1020
1021            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1022                return Err(AwsServiceError::aws_error(
1023                    StatusCode::BAD_REQUEST,
1024                    "ValidationException",
1025                    format!(
1026                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
1027                    ),
1028                ));
1029            }
1030
1031            // Validate ARN format
1032            if !target_arn.starts_with("arn:") {
1033                return Err(AwsServiceError::aws_error(
1034                    StatusCode::BAD_REQUEST,
1035                    "ValidationException",
1036                    format!(
1037                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1038                    ),
1039                ));
1040            }
1041        }
1042
1043        let mut accounts = self.state.write();
1044        let state = accounts.get_or_create(&req.account_id);
1045        let bus_name = state.resolve_bus_name(event_bus_name);
1046        let key = (bus_name.clone(), rule_name.to_string());
1047
1048        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049            AwsServiceError::aws_error(
1050                StatusCode::BAD_REQUEST,
1051                "ResourceNotFoundException",
1052                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053            )
1054        })?;
1055
1056        for target in targets {
1057            let et = parse_target(target);
1058            // Remove existing target with same ID
1059            rule.targets.retain(|t| t.id != et.id);
1060            rule.targets.push(et);
1061        }
1062
1063        Ok(AwsResponse::ok_json(json!({
1064            "FailedEntryCount": 0,
1065            "FailedEntries": [],
1066        })))
1067    }
1068
1069    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1070        let body = req.json_body();
1071        validate_required("Rule", &body["Rule"])?;
1072        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1073        validate_string_length("rule", rule_name, 1, 64)?;
1074        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1075        validate_required("Ids", &body["Ids"])?;
1076        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1077        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1078
1079        let target_ids: Vec<String> = ids
1080            .iter()
1081            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1082            .collect();
1083
1084        let mut accounts = self.state.write();
1085        let state = accounts.get_or_create(&req.account_id);
1086        let bus_name = state.resolve_bus_name(event_bus_name);
1087        let key = (bus_name.clone(), rule_name.to_string());
1088
1089        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1090            AwsServiceError::aws_error(
1091                StatusCode::BAD_REQUEST,
1092                "ResourceNotFoundException",
1093                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1094            )
1095        })?;
1096
1097        rule.targets.retain(|t| !target_ids.contains(&t.id));
1098
1099        Ok(AwsResponse::ok_json(json!({
1100            "FailedEntryCount": 0,
1101            "FailedEntries": [],
1102        })))
1103    }
1104
1105    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1106        let body = req.json_body();
1107        validate_required("Rule", &body["Rule"])?;
1108        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1109        validate_string_length("rule", rule_name, 1, 64)?;
1110        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1111        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1112        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1113        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1114        let limit = body["Limit"].as_u64().map(|n| n as usize);
1115        let next_token = body["NextToken"].as_str();
1116
1117        let accounts = self.state.read();
1118        let empty = EventBridgeState::new(&req.account_id, &req.region);
1119        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1120        let bus_name = state.resolve_bus_name(event_bus_name);
1121        let key = (bus_name, rule_name.to_string());
1122
1123        let rule = state.rules.get(&key).ok_or_else(|| {
1124            AwsServiceError::aws_error(
1125                StatusCode::BAD_REQUEST,
1126                "ResourceNotFoundException",
1127                format!("Rule {rule_name} does not exist."),
1128            )
1129        })?;
1130
1131        let all_targets = &rule.targets;
1132        let start = next_token
1133            .and_then(|t| t.parse::<usize>().ok())
1134            .unwrap_or(0)
1135            .min(all_targets.len());
1136        let slice = &all_targets[start..];
1137
1138        let (page, new_next_token) = if let Some(lim) = limit {
1139            if slice.len() > lim {
1140                (&slice[..lim], Some((start + lim).to_string()))
1141            } else {
1142                (slice, None)
1143            }
1144        } else {
1145            (slice, None)
1146        };
1147
1148        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1149
1150        let mut resp = json!({ "Targets": targets });
1151        if let Some(token) = new_next_token {
1152            resp["NextToken"] = json!(token);
1153        }
1154
1155        Ok(AwsResponse::ok_json(resp))
1156    }
1157
1158    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159        let body = req.json_body();
1160        validate_required("TargetArn", &body["TargetArn"])?;
1161        let target_arn = body["TargetArn"]
1162            .as_str()
1163            .ok_or_else(|| missing("TargetArn"))?;
1164        validate_string_length("targetArn", target_arn, 1, 1600)?;
1165        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1166        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1167        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1168        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1169        let limit = body["Limit"].as_u64().map(|n| n as usize);
1170        let next_token = body["NextToken"].as_str();
1171
1172        let accounts = self.state.read();
1173        let empty = EventBridgeState::new(&req.account_id, &req.region);
1174        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1175        let bus_name = state.resolve_bus_name(event_bus_name);
1176
1177        // Deduplicate rule names
1178        let mut rule_names: Vec<String> = Vec::new();
1179        for rule in state.rules.values() {
1180            if rule.event_bus_name == bus_name
1181                && rule.targets.iter().any(|t| t.arn == target_arn)
1182                && !rule_names.contains(&rule.name)
1183            {
1184                rule_names.push(rule.name.clone());
1185            }
1186        }
1187        rule_names.sort();
1188
1189        let start = next_token
1190            .and_then(|t| t.parse::<usize>().ok())
1191            .unwrap_or(0)
1192            .min(rule_names.len());
1193        let slice = &rule_names[start..];
1194
1195        let (page, new_next_token) = if let Some(lim) = limit {
1196            if slice.len() > lim {
1197                (&slice[..lim], Some((start + lim).to_string()))
1198            } else {
1199                (slice, None)
1200            }
1201        } else {
1202            (slice, None)
1203        };
1204
1205        let mut resp = json!({ "RuleNames": page });
1206        if let Some(token) = new_next_token {
1207            resp["NextToken"] = json!(token);
1208        }
1209
1210        Ok(AwsResponse::ok_json(resp))
1211    }
1212
    // ─── Partner Event Sources ────────────────────────────────────────
1214
1215    fn create_partner_event_source(
1216        &self,
1217        req: &AwsRequest,
1218    ) -> Result<AwsResponse, AwsServiceError> {
1219        let body = req.json_body();
1220        validate_required("Name", &body["Name"])?;
1221        let name = body["Name"]
1222            .as_str()
1223            .ok_or_else(|| missing("Name"))?
1224            .to_string();
1225        validate_string_length("name", &name, 1, 256)?;
1226        validate_required("Account", &body["Account"])?;
1227        let account = body["Account"]
1228            .as_str()
1229            .ok_or_else(|| missing("Account"))?
1230            .to_string();
1231        validate_string_length("account", &account, 12, 12)?;
1232
1233        let mut accounts = self.state.write();
1234        let state = accounts.get_or_create(&req.account_id);
1235        if state.partner_event_sources.contains_key(&name) {
1236            return Err(AwsServiceError::aws_error(
1237                StatusCode::CONFLICT,
1238                "ResourceAlreadyExistsException",
1239                format!("Partner event source {name} already exists."),
1240            ));
1241        }
1242        let arn = format!(
1243            "arn:aws:events:{}::event-source/aws.partner/{}",
1244            state.region, name
1245        );
1246        let now = Utc::now();
1247        let ps = PartnerEventSource {
1248            name: name.clone(),
1249            arn: arn.clone(),
1250            account,
1251            creation_time: now,
1252            expiration_time: None,
1253            state: "ACTIVE".to_string(),
1254        };
1255        state.partner_event_sources.insert(name.clone(), ps);
1256
1257        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1258    }
1259
1260    fn delete_partner_event_source(
1261        &self,
1262        req: &AwsRequest,
1263    ) -> Result<AwsResponse, AwsServiceError> {
1264        let body = req.json_body();
1265        validate_required("Name", &body["Name"])?;
1266        let name = body["Name"]
1267            .as_str()
1268            .ok_or_else(|| missing("Name"))?
1269            .to_string();
1270        validate_required("Account", &body["Account"])?;
1271        let account = body["Account"]
1272            .as_str()
1273            .ok_or_else(|| missing("Account"))?
1274            .to_string();
1275
1276        let mut accounts = self.state.write();
1277        let state = accounts.get_or_create(&req.account_id);
1278        match state.partner_event_sources.get(&name) {
1279            Some(ps) if ps.account == account => {
1280                state.partner_event_sources.remove(&name);
1281            }
1282            Some(_) => {
1283                return Err(AwsServiceError::aws_error(
1284                    StatusCode::NOT_FOUND,
1285                    "ResourceNotFoundException",
1286                    format!("Partner event source {name} does not exist for account {account}."),
1287                ));
1288            }
1289            None => {
1290                return Err(AwsServiceError::aws_error(
1291                    StatusCode::NOT_FOUND,
1292                    "ResourceNotFoundException",
1293                    format!("Partner event source {name} does not exist."),
1294                ));
1295            }
1296        }
1297
1298        Ok(AwsResponse::ok_json(json!({})))
1299    }
1300
1301    fn describe_partner_event_source(
1302        &self,
1303        req: &AwsRequest,
1304    ) -> Result<AwsResponse, AwsServiceError> {
1305        let body = req.json_body();
1306        validate_required("Name", &body["Name"])?;
1307        let name = body["Name"]
1308            .as_str()
1309            .ok_or_else(|| missing("Name"))?
1310            .to_string();
1311        validate_string_length("name", &name, 1, 256)?;
1312
1313        let accounts = self.state.read();
1314        let empty = EventBridgeState::new(&req.account_id, &req.region);
1315        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1316        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1317            AwsServiceError::aws_error(
1318                StatusCode::NOT_FOUND,
1319                "ResourceNotFoundException",
1320                format!("Partner event source {name} does not exist."),
1321            )
1322        })?;
1323
1324        Ok(AwsResponse::ok_json(json!({
1325            "Arn": ps.arn,
1326            "Name": ps.name,
1327        })))
1328    }
1329
1330    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331        let body = req.json_body();
1332        validate_required("namePrefix", &body["NamePrefix"])?;
1333        let name_prefix = body["NamePrefix"]
1334            .as_str()
1335            .ok_or_else(|| missing("NamePrefix"))?;
1336        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1337        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1338        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1339        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1340
1341        let accounts = self.state.read();
1342        let empty = EventBridgeState::new(&req.account_id, &req.region);
1343        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1344        let all: Vec<Value> = state
1345            .partner_event_sources
1346            .values()
1347            .filter(|ps| ps.name.starts_with(name_prefix))
1348            .map(|ps| {
1349                json!({
1350                    "Arn": ps.arn,
1351                    "Name": ps.name,
1352                })
1353            })
1354            .collect();
1355
1356        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1357        let mut resp = json!({ "PartnerEventSources": sources });
1358        if let Some(token) = next_token {
1359            resp["NextToken"] = json!(token);
1360        }
1361
1362        Ok(AwsResponse::ok_json(resp))
1363    }
1364
1365    fn list_partner_event_source_accounts(
1366        &self,
1367        req: &AwsRequest,
1368    ) -> Result<AwsResponse, AwsServiceError> {
1369        let body = req.json_body();
1370        validate_required("EventSourceName", &body["EventSourceName"])?;
1371        let event_source_name = body["EventSourceName"]
1372            .as_str()
1373            .ok_or_else(|| missing("EventSourceName"))?;
1374        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1375        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1376        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1377
1378        let accounts = self.state.read();
1379        let empty = EventBridgeState::new(&req.account_id, &req.region);
1380        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1381        let accounts: Vec<Value> = state
1382            .partner_event_sources
1383            .values()
1384            .filter(|ps| ps.name == event_source_name)
1385            .map(|ps| json!({ "Account": ps.account }))
1386            .collect();
1387
1388        Ok(AwsResponse::ok_json(json!({
1389            "PartnerEventSourceAccounts": accounts
1390        })))
1391    }
1392
1393    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1394        let body = req.json_body();
1395        validate_required("Name", &body["Name"])?;
1396        let name = body["Name"]
1397            .as_str()
1398            .ok_or_else(|| missing("Name"))?
1399            .to_string();
1400
1401        let mut accounts = self.state.write();
1402        let state = accounts.get_or_create(&req.account_id);
1403        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1404            AwsServiceError::aws_error(
1405                StatusCode::NOT_FOUND,
1406                "ResourceNotFoundException",
1407                format!("Event source {name} does not exist."),
1408            )
1409        })?;
1410        ps.state = "ACTIVE".to_string();
1411
1412        Ok(AwsResponse::ok_json(json!({})))
1413    }
1414
1415    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1416        let body = req.json_body();
1417        validate_required("Name", &body["Name"])?;
1418        let name = body["Name"]
1419            .as_str()
1420            .ok_or_else(|| missing("Name"))?
1421            .to_string();
1422
1423        let mut accounts = self.state.write();
1424        let state = accounts.get_or_create(&req.account_id);
1425        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1426            AwsServiceError::aws_error(
1427                StatusCode::NOT_FOUND,
1428                "ResourceNotFoundException",
1429                format!("Event source {name} does not exist."),
1430            )
1431        })?;
1432        ps.state = "INACTIVE".to_string();
1433
1434        Ok(AwsResponse::ok_json(json!({})))
1435    }
1436
1437    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438        let body = req.json_body();
1439        validate_required("Name", &body["Name"])?;
1440        let name = body["Name"]
1441            .as_str()
1442            .ok_or_else(|| missing("Name"))?
1443            .to_string();
1444
1445        let accounts = self.state.read();
1446        let empty = EventBridgeState::new(&req.account_id, &req.region);
1447        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1448        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1449            AwsServiceError::aws_error(
1450                StatusCode::NOT_FOUND,
1451                "ResourceNotFoundException",
1452                format!("Event source {name} does not exist."),
1453            )
1454        })?;
1455
1456        Ok(AwsResponse::ok_json(json!({
1457            "Arn": ps.arn,
1458            "Name": ps.name,
1459            "CreatedBy": ps.account,
1460            "CreationTime": ps.creation_time.timestamp() as f64,
1461            "State": ps.state,
1462        })))
1463    }
1464
1465    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1466        let body = req.json_body();
1467        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1468        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1469        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1470        let name_prefix = body["NamePrefix"].as_str();
1471        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1472
1473        let accounts = self.state.read();
1474        let empty = EventBridgeState::new(&req.account_id, &req.region);
1475        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1476        let all: Vec<Value> = state
1477            .partner_event_sources
1478            .values()
1479            .filter(|ps| match name_prefix {
1480                Some(prefix) => ps.name.starts_with(prefix),
1481                None => true,
1482            })
1483            .map(|ps| {
1484                json!({
1485                    "Arn": ps.arn,
1486                    "Name": ps.name,
1487                    "CreatedBy": ps.account,
1488                    "CreationTime": ps.creation_time.timestamp() as f64,
1489                    "State": ps.state,
1490                })
1491            })
1492            .collect();
1493
1494        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1495        let mut resp = json!({ "EventSources": sources });
1496        if let Some(token) = next_token {
1497            resp["NextToken"] = json!(token);
1498        }
1499
1500        Ok(AwsResponse::ok_json(resp))
1501    }
1502
1503    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504        let body = req.json_body();
1505        validate_required("Entries", &body["Entries"])?;
1506        let entries = body["Entries"]
1507            .as_array()
1508            .ok_or_else(|| missing("Entries"))?;
1509
1510        let mut result_entries = Vec::new();
1511        for _entry in entries {
1512            let event_id = uuid::Uuid::new_v4().to_string();
1513            result_entries.push(json!({ "EventId": event_id }));
1514        }
1515
1516        Ok(AwsResponse::ok_json(json!({
1517            "FailedEntryCount": 0,
1518            "Entries": result_entries,
1519        })))
1520    }
1521
1522    // ─── TestEventPattern ────────────────────────────────────────────────
1523
1524    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1525        let body = req.json_body();
1526        validate_required("EventPattern", &body["EventPattern"])?;
1527        validate_required("Event", &body["Event"])?;
1528        let event_pattern = body["EventPattern"]
1529            .as_str()
1530            .ok_or_else(|| missing("EventPattern"))?;
1531        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1532
1533        // Parse the event JSON
1534        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1535            AwsServiceError::aws_error(
1536                StatusCode::BAD_REQUEST,
1537                "InvalidEventPatternException",
1538                "Event is not valid JSON.",
1539            )
1540        })?;
1541
1542        // Parse the pattern JSON
1543        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1544            AwsServiceError::aws_error(
1545                StatusCode::BAD_REQUEST,
1546                "InvalidEventPatternException",
1547                "Event pattern is not valid JSON.",
1548            )
1549        })?;
1550
1551        let source = event["source"].as_str().unwrap_or("");
1552        let detail_type = event["detail-type"].as_str().unwrap_or("");
1553        let detail = event
1554            .get("detail")
1555            .map(|v| serde_json::to_string(v).unwrap_or_default())
1556            .unwrap_or_else(|| "{}".to_string());
1557        let account = event["account"].as_str().unwrap_or("");
1558        let region = event["region"].as_str().unwrap_or("");
1559        let resources: Vec<String> = event["resources"]
1560            .as_array()
1561            .map(|arr| {
1562                arr.iter()
1563                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564                    .collect()
1565            })
1566            .unwrap_or_default();
1567
1568        let result = matches_pattern(
1569            Some(event_pattern),
1570            source,
1571            detail_type,
1572            &detail,
1573            account,
1574            region,
1575            &resources,
1576        );
1577
1578        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1579    }
1580
1581    // ─── UpdateEventBus ─────────────────────────────────────────────────
1582
1583    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1584        let body = req.json_body();
1585        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1586        validate_optional_string_length(
1587            "kmsKeyIdentifier",
1588            body["KmsKeyIdentifier"].as_str(),
1589            0,
1590            2048,
1591        )?;
1592        let name = body["Name"].as_str().unwrap_or("default");
1593
1594        let mut accounts = self.state.write();
1595        let state = accounts.get_or_create(&req.account_id);
1596        let bus = state.buses.get_mut(name).ok_or_else(|| {
1597            AwsServiceError::aws_error(
1598                StatusCode::BAD_REQUEST,
1599                "ResourceNotFoundException",
1600                format!("Event bus {name} does not exist."),
1601            )
1602        })?;
1603
1604        if let Some(desc) = body["Description"].as_str() {
1605            bus.description = Some(desc.to_string());
1606        }
1607        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1608            bus.kms_key_identifier = Some(kms.to_string());
1609        }
1610        if let Some(dlc) = body.get("DeadLetterConfig") {
1611            bus.dead_letter_config = Some(dlc.clone());
1612        }
1613        bus.last_modified_time = Utc::now();
1614
1615        let arn = bus.arn.clone();
1616        let bus_name = bus.name.clone();
1617
1618        Ok(AwsResponse::ok_json(json!({
1619            "Arn": arn,
1620            "Name": bus_name,
1621        })))
1622    }
1623
1624    // ─── Endpoint Operations ────────────────────────────────────────────
1625
1626    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1627        let body = req.json_body();
1628        validate_required("Name", &body["Name"])?;
1629        let name = body["Name"]
1630            .as_str()
1631            .ok_or_else(|| missing("Name"))?
1632            .to_string();
1633        validate_string_length("name", &name, 1, 64)?;
1634        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1635        validate_required("EventBuses", &body["EventBuses"])?;
1636
1637        let description = body["Description"].as_str().map(|s| s.to_string());
1638        let routing_config = body["RoutingConfig"].clone();
1639        let replication_config = body.get("ReplicationConfig").cloned();
1640        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1641        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1642
1643        let mut accounts = self.state.write();
1644        let state = accounts.get_or_create(&req.account_id);
1645        if state.endpoints.contains_key(&name) {
1646            return Err(AwsServiceError::aws_error(
1647                StatusCode::CONFLICT,
1648                "ResourceAlreadyExistsException",
1649                format!("Endpoint {name} already exists."),
1650            ));
1651        }
1652
1653        let endpoint_id = format!("{}.abc123", name);
1654        let arn = format!(
1655            "arn:aws:events:{}:{}:endpoint/{}",
1656            req.region, state.account_id, name
1657        );
1658        let endpoint_url = format!(
1659            "https://{}.endpoint.events.{}.amazonaws.com",
1660            endpoint_id, req.region
1661        );
1662        let now = Utc::now();
1663
1664        let endpoint = Endpoint {
1665            name: name.clone(),
1666            arn: arn.clone(),
1667            endpoint_id: endpoint_id.clone(),
1668            endpoint_url: Some(endpoint_url),
1669            description,
1670            routing_config: routing_config.clone(),
1671            replication_config: replication_config.clone(),
1672            event_buses: event_buses.clone(),
1673            role_arn: role_arn.clone(),
1674            state: "ACTIVE".to_string(),
1675            creation_time: now,
1676            last_modified_time: now,
1677        };
1678        state.endpoints.insert(name.clone(), endpoint);
1679
1680        let mut resp = json!({
1681            "Name": name,
1682            "Arn": arn,
1683            "State": "ACTIVE",
1684            "RoutingConfig": routing_config,
1685            "EventBuses": event_buses,
1686        });
1687        if let Some(ref rc) = replication_config {
1688            resp["ReplicationConfig"] = rc.clone();
1689        }
1690        if let Some(ref ra) = role_arn {
1691            resp["RoleArn"] = json!(ra);
1692        }
1693
1694        Ok(AwsResponse::ok_json(resp))
1695    }
1696
1697    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698        let body = req.json_body();
1699        validate_required("Name", &body["Name"])?;
1700        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1701
1702        let mut accounts = self.state.write();
1703        let state = accounts.get_or_create(&req.account_id);
1704        state.endpoints.remove(name).ok_or_else(|| {
1705            AwsServiceError::aws_error(
1706                StatusCode::BAD_REQUEST,
1707                "ResourceNotFoundException",
1708                format!("Endpoint '{name}' does not exist."),
1709            )
1710        })?;
1711
1712        Ok(AwsResponse::ok_json(json!({})))
1713    }
1714
1715    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1716        let body = req.json_body();
1717        validate_required("Name", &body["Name"])?;
1718        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1719
1720        let accounts = self.state.read();
1721        let empty = EventBridgeState::new(&req.account_id, &req.region);
1722        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1723        let ep = state.endpoints.get(name).ok_or_else(|| {
1724            AwsServiceError::aws_error(
1725                StatusCode::BAD_REQUEST,
1726                "ResourceNotFoundException",
1727                format!("Endpoint '{name}' does not exist."),
1728            )
1729        })?;
1730
1731        let mut resp = json!({
1732            "Name": ep.name,
1733            "Arn": ep.arn,
1734            "EndpointId": ep.endpoint_id,
1735            "State": ep.state,
1736            "RoutingConfig": ep.routing_config,
1737            "EventBuses": ep.event_buses,
1738            "CreationTime": ep.creation_time.timestamp() as f64,
1739            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1740        });
1741        if let Some(ref url) = ep.endpoint_url {
1742            resp["EndpointUrl"] = json!(url);
1743        }
1744        if let Some(ref desc) = ep.description {
1745            resp["Description"] = json!(desc);
1746        }
1747        if let Some(ref rc) = ep.replication_config {
1748            resp["ReplicationConfig"] = rc.clone();
1749        }
1750        if let Some(ref ra) = ep.role_arn {
1751            resp["RoleArn"] = json!(ra);
1752        }
1753
1754        Ok(AwsResponse::ok_json(resp))
1755    }
1756
1757    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1758        let body = req.json_body();
1759        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1760        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1761        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1762        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1763        let name_prefix = body["NamePrefix"].as_str();
1764        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1765
1766        let accounts = self.state.read();
1767        let empty = EventBridgeState::new(&req.account_id, &req.region);
1768        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1769        let all: Vec<Value> = state
1770            .endpoints
1771            .values()
1772            .filter(|ep| match name_prefix {
1773                Some(prefix) => ep.name.starts_with(prefix),
1774                None => true,
1775            })
1776            .map(|ep| {
1777                let mut obj = json!({
1778                    "Name": ep.name,
1779                    "Arn": ep.arn,
1780                    "EndpointId": ep.endpoint_id,
1781                    "State": ep.state,
1782                    "RoutingConfig": ep.routing_config,
1783                    "EventBuses": ep.event_buses,
1784                    "CreationTime": ep.creation_time.timestamp() as f64,
1785                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1786                });
1787                if let Some(ref url) = ep.endpoint_url {
1788                    obj["EndpointUrl"] = json!(url);
1789                }
1790                obj
1791            })
1792            .collect();
1793
1794        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1795        let mut resp = json!({ "Endpoints": endpoints });
1796        if let Some(token) = next_token {
1797            resp["NextToken"] = json!(token);
1798        }
1799
1800        Ok(AwsResponse::ok_json(resp))
1801    }
1802
1803    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1804        let body = req.json_body();
1805        validate_required("Name", &body["Name"])?;
1806        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1807
1808        let mut accounts = self.state.write();
1809        let state = accounts.get_or_create(&req.account_id);
1810        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1811            AwsServiceError::aws_error(
1812                StatusCode::BAD_REQUEST,
1813                "ResourceNotFoundException",
1814                format!("Endpoint '{name}' does not exist."),
1815            )
1816        })?;
1817
1818        if let Some(desc) = body["Description"].as_str() {
1819            ep.description = Some(desc.to_string());
1820        }
1821        if !body["RoutingConfig"].is_null() {
1822            ep.routing_config = body["RoutingConfig"].clone();
1823        }
1824        if let Some(rc) = body.get("ReplicationConfig") {
1825            ep.replication_config = Some(rc.clone());
1826        }
1827        if let Some(buses) = body["EventBuses"].as_array() {
1828            ep.event_buses = buses.clone();
1829        }
1830        if let Some(ra) = body["RoleArn"].as_str() {
1831            ep.role_arn = Some(ra.to_string());
1832        }
1833        ep.last_modified_time = Utc::now();
1834
1835        let resp = json!({
1836            "Name": ep.name,
1837            "Arn": ep.arn,
1838            "EndpointId": ep.endpoint_id,
1839            "State": ep.state,
1840            "RoutingConfig": ep.routing_config,
1841            "EventBuses": ep.event_buses,
1842        });
1843
1844        Ok(AwsResponse::ok_json(resp))
1845    }
1846
1847    // ─── DeauthorizeConnection ──────────────────────────────────────────
1848
1849    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850        let body = req.json_body();
1851        validate_required("Name", &body["Name"])?;
1852        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1853        validate_string_length("name", name, 1, 64)?;
1854
1855        let mut accounts = self.state.write();
1856        let state = accounts.get_or_create(&req.account_id);
1857        let conn = state.connections.get_mut(name).ok_or_else(|| {
1858            AwsServiceError::aws_error(
1859                StatusCode::BAD_REQUEST,
1860                "ResourceNotFoundException",
1861                format!("Connection '{name}' does not exist."),
1862            )
1863        })?;
1864
1865        conn.connection_state = "DEAUTHORIZING".to_string();
1866        conn.last_modified_time = Utc::now();
1867
1868        let resp = json!({
1869            "ConnectionArn": conn.arn,
1870            "ConnectionState": conn.connection_state,
1871            "CreationTime": conn.creation_time.timestamp() as f64,
1872            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1873            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1874        });
1875
1876        Ok(AwsResponse::ok_json(resp))
1877    }
1878
1879    // ─── PutEvents ──────────────────────────────────────────────────────
1880
1881    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882        let body = req.json_body();
1883        validate_required("Entries", &body["Entries"])?;
1884        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1885        let entries = body["Entries"]
1886            .as_array()
1887            .ok_or_else(|| missing("Entries"))?;
1888
1889        // Validate entries count
1890        if entries.is_empty() {
1891            return Err(AwsServiceError::aws_error(
1892                StatusCode::BAD_REQUEST,
1893                "ValidationException",
1894                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1895            ));
1896        }
1897        if entries.len() > 10 {
1898            return Err(AwsServiceError::aws_error(
1899                StatusCode::BAD_REQUEST,
1900                "ValidationException",
1901                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1902            ));
1903        }
1904
1905        let mut accounts = self.state.write();
1906        let state = accounts.get_or_create(&req.account_id);
1907        let mut result_entries = Vec::new();
1908        let mut events_to_deliver = Vec::new();
1909        let mut failed_count = 0;
1910
1911        for entry in entries {
1912            let source = entry["Source"].as_str().unwrap_or("").to_string();
1913            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1914            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1915
1916            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1917                failed_count += 1;
1918                result_entries.push(error);
1919                continue;
1920            }
1921
1922            let event_id = uuid::Uuid::new_v4().to_string();
1923            let raw_bus = entry["EventBusName"]
1924                .as_str()
1925                .unwrap_or("default")
1926                .to_string();
1927            let event_bus_name = state.resolve_bus_name(&raw_bus);
1928            let time = parse_put_events_time(&entry["Time"]);
1929            let resources: Vec<String> = entry["Resources"]
1930                .as_array()
1931                .map(|arr| {
1932                    arr.iter()
1933                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1934                        .collect()
1935                })
1936                .unwrap_or_default();
1937
1938            let event = PutEvent {
1939                event_id: event_id.clone(),
1940                source: source.clone(),
1941                detail_type: detail_type.clone(),
1942                detail: detail.clone(),
1943                event_bus_name: event_bus_name.clone(),
1944                time,
1945                resources: resources.clone(),
1946            };
1947
1948            archive_matching_event(
1949                state,
1950                &event,
1951                &event_bus_name,
1952                &source,
1953                &detail_type,
1954                &detail,
1955                &req.account_id,
1956                &req.region,
1957                &resources,
1958            );
1959
1960            state.events.push(event);
1961
1962            // Find matching rules and their targets
1963            let matching_targets: Vec<EventTarget> = state
1964                .rules
1965                .values()
1966                .filter(|r| {
1967                    r.event_bus_name == event_bus_name
1968                        && r.state == "ENABLED"
1969                        && matches_pattern(
1970                            r.event_pattern.as_deref(),
1971                            &source,
1972                            &detail_type,
1973                            &detail,
1974                            &req.account_id,
1975                            &req.region,
1976                            &resources,
1977                        )
1978                })
1979                .flat_map(|r| r.targets.clone())
1980                .collect();
1981
1982            if !matching_targets.is_empty() {
1983                events_to_deliver.push((
1984                    event_id.clone(),
1985                    source,
1986                    detail_type,
1987                    detail,
1988                    time,
1989                    resources,
1990                    matching_targets,
1991                ));
1992            }
1993
1994            result_entries.push(json!({ "EventId": event_id }));
1995        }
1996
1997        // Drop the lock before delivering
1998        drop(accounts);
1999
2000        // Deliver to targets
2001        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
2002            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
2003            let event_json = json!({
2004                "version": "0",
2005                "id": event_id,
2006                "source": source,
2007                "account": req.account_id,
2008                "detail-type": detail_type,
2009                "detail": detail_value,
2010                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
2011                "region": req.region,
2012                "resources": resources,
2013            });
2014            let event_str = event_json.to_string();
2015
2016            for target in targets {
2017                let arn = &target.arn;
2018                // Compute the message body, applying InputTransformer if present
2019                let body_str = if let Some(ref transformer) = target.input_transformer {
2020                    apply_input_transformer(transformer, &event_json)
2021                } else if let Some(ref input) = target.input {
2022                    input.clone()
2023                } else if let Some(ref input_path) = target.input_path {
2024                    resolve_json_path(&event_json, input_path)
2025                        .map(|v| v.to_string())
2026                        .unwrap_or_else(|| event_str.clone())
2027                } else {
2028                    event_str.clone()
2029                };
2030
2031                if arn.contains(":sqs:") {
2032                    // Extract FIFO parameters (MessageGroupId)
2033                    let group_id = target
2034                        .sqs_parameters
2035                        .as_ref()
2036                        .and_then(|p| p["MessageGroupId"].as_str())
2037                        .map(|s| s.to_string());
2038                    if group_id.is_some() {
2039                        // FIFO queue: send with group ID but no dedup ID.
2040                        // Queues with content-based dedup will auto-generate one;
2041                        // queues without it will reject the message.
2042                        self.delivery.send_to_sqs_with_attrs(
2043                            arn,
2044                            &body_str,
2045                            &HashMap::new(),
2046                            group_id.as_deref(),
2047                            None,
2048                        );
2049                    } else {
2050                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
2051                    }
2052                } else if arn.contains(":sns:") {
2053                    self.delivery
2054                        .publish_to_sns(arn, &body_str, Some(&detail_type));
2055                } else if arn.contains(":lambda:") {
2056                    tracing::info!(
2057                        function_arn = %arn,
2058                        payload = %body_str,
2059                        "EventBridge delivering to Lambda function"
2060                    );
2061                    let now = Utc::now();
2062                    let mut accounts = self.state.write();
2063                    let state = accounts.get_or_create(&req.account_id);
2064                    state
2065                        .lambda_invocations
2066                        .push(crate::state::LambdaInvocation {
2067                            function_arn: arn.clone(),
2068                            payload: body_str.clone(),
2069                            timestamp: now,
2070                        });
2071                    drop(accounts);
2072                    // Record in Lambda state for cross-service visibility
2073                    if let Some(ref ls) = self.lambda_state {
2074                        ls.write().default_mut().invocations.push(LambdaInvocation {
2075                            function_arn: arn.clone(),
2076                            payload: body_str.clone(),
2077                            timestamp: now,
2078                            source: "aws:events".to_string(),
2079                        });
2080                    }
2081                    // Actually invoke the Lambda function if a container runtime is available
2082                    invoke_lambda_async(
2083                        &self.container_runtime,
2084                        &self.lambda_state,
2085                        arn,
2086                        &body_str,
2087                    );
2088                } else if arn.contains(":logs:") {
2089                    tracing::info!(
2090                        log_group_arn = %arn,
2091                        payload = %body_str,
2092                        "EventBridge delivering to CloudWatch Logs"
2093                    );
2094                    let now = Utc::now();
2095                    let mut accounts = self.state.write();
2096                    let state = accounts.get_or_create(&req.account_id);
2097                    state.log_deliveries.push(crate::state::LogDelivery {
2098                        log_group_arn: arn.clone(),
2099                        payload: body_str.clone(),
2100                        timestamp: now,
2101                    });
2102                    drop(accounts);
2103                    // Write event to CloudWatch Logs state
2104                    if let Some(ref log_state) = self.logs_state {
2105                        deliver_to_logs(log_state, arn, &body_str, now);
2106                    }
2107                } else if arn.contains(":kinesis:") {
2108                    tracing::info!(
2109                        stream_arn = %arn,
2110                        "EventBridge delivering to Kinesis stream"
2111                    );
2112                    // Use event ID as partition key for even distribution
2113                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
2114                } else if arn.contains(":states:") {
2115                    tracing::info!(
2116                        state_machine_arn = %arn,
2117                        "EventBridge delivering to Step Functions"
2118                    );
2119                    self.delivery.start_stepfunctions_execution(arn, &body_str);
2120                    let mut accounts = self.state.write();
2121                    let state = accounts.get_or_create(&req.account_id);
2122                    state
2123                        .step_function_executions
2124                        .push(crate::state::StepFunctionExecution {
2125                            state_machine_arn: arn.clone(),
2126                            payload: body_str.clone(),
2127                            timestamp: Utc::now(),
2128                        });
2129                } else if arn.contains(":api-destination/") {
2130                    // ApiDestination target: look up destination + connection, then POST
2131                    let accounts = self.state.read();
2132                    let empty = EventBridgeState::new(&req.account_id, &req.region);
2133                    let state = accounts.get(&req.account_id).unwrap_or(&empty);
2134                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2135                    if let Some(dest) = dest {
2136                        let url = dest.invocation_endpoint.clone();
2137                        let method = dest.http_method.clone();
2138                        let conn = state
2139                            .connections
2140                            .values()
2141                            .find(|c| c.arn == dest.connection_arn)
2142                            .cloned();
2143                        drop(accounts);
2144
2145                        let payload = body_str.clone();
2146                        tokio::spawn(async move {
2147                            let client = reqwest::Client::new();
2148                            let mut req_builder = match method.as_str() {
2149                                "GET" => client.get(&url),
2150                                "PUT" => client.put(&url),
2151                                "DELETE" => client.delete(&url),
2152                                "PATCH" => client.patch(&url),
2153                                "HEAD" => client.head(&url),
2154                                _ => client.post(&url),
2155                            };
2156                            req_builder = req_builder.header("Content-Type", "application/json");
2157
2158                            // Apply auth from connection
2159                            if let Some(conn) = conn {
2160                                req_builder = apply_connection_auth(req_builder, &conn);
2161                            }
2162
2163                            let result = req_builder.body(payload).send().await;
2164                            if let Err(e) = result {
2165                                tracing::warn!(
2166                                    endpoint = %url,
2167                                    error = %e,
2168                                    "EventBridge ApiDestination delivery failed"
2169                                );
2170                            }
2171                        });
2172                    }
2173                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2174                    // HTTP target — fire-and-forget POST
2175                    let url = arn.clone();
2176                    let payload = body_str.clone();
2177                    tokio::spawn(async move {
2178                        let client = reqwest::Client::new();
2179                        let result = client
2180                            .post(&url)
2181                            .header("Content-Type", "application/json")
2182                            .body(payload)
2183                            .send()
2184                            .await;
2185                        if let Err(e) = result {
2186                            tracing::warn!(
2187                                endpoint = %url,
2188                                error = %e,
2189                                "EventBridge HTTP target delivery failed"
2190                            );
2191                        }
2192                    });
2193                }
2194            }
2195        }
2196
2197        let resp = json!({
2198            "FailedEntryCount": failed_count,
2199            "Entries": result_entries,
2200        });
2201
2202        Ok(AwsResponse::ok_json(resp))
2203    }
2204
2205    // ─── Tagging ────────────────────────────────────────────────────────
2206
2207    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2208        let body = req.json_body();
2209        validate_required("ResourceARN", &body["ResourceARN"])?;
2210        let arn = body["ResourceARN"]
2211            .as_str()
2212            .ok_or_else(|| missing("ResourceARN"))?;
2213        validate_string_length("resourceARN", arn, 1, 1600)?;
2214        validate_required("Tags", &body["Tags"])?;
2215
2216        let mut accounts = self.state.write();
2217        let state = accounts.get_or_create(&req.account_id);
2218        let tag_map = find_tags_mut(state, arn)?;
2219
2220        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2221            AwsServiceError::aws_error(
2222                StatusCode::BAD_REQUEST,
2223                "ValidationException",
2224                format!("{f} must be a list"),
2225            )
2226        })?;
2227
2228        Ok(AwsResponse::ok_json(json!({})))
2229    }
2230
2231    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2232        let body = req.json_body();
2233        validate_required("ResourceARN", &body["ResourceARN"])?;
2234        let arn = body["ResourceARN"]
2235            .as_str()
2236            .ok_or_else(|| missing("ResourceARN"))?;
2237        validate_string_length("resourceARN", arn, 1, 1600)?;
2238        validate_required("TagKeys", &body["TagKeys"])?;
2239
2240        let mut accounts = self.state.write();
2241        let state = accounts.get_or_create(&req.account_id);
2242        let tag_map = find_tags_mut(state, arn)?;
2243
2244        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2245            AwsServiceError::aws_error(
2246                StatusCode::BAD_REQUEST,
2247                "ValidationException",
2248                format!("{f} must be a list"),
2249            )
2250        })?;
2251
2252        Ok(AwsResponse::ok_json(json!({})))
2253    }
2254
2255    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2256        let body = req.json_body();
2257        validate_required("ResourceARN", &body["ResourceARN"])?;
2258        let arn = body["ResourceARN"]
2259            .as_str()
2260            .ok_or_else(|| missing("ResourceARN"))?;
2261        validate_string_length("resourceARN", arn, 1, 1600)?;
2262
2263        let accounts = self.state.read();
2264        let empty = EventBridgeState::new(&req.account_id, &req.region);
2265        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2266        let tag_map = find_tags(state, arn)?;
2267
2268        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2269
2270        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2271    }
2272
2273    // ─── Archive Operations ─────────────────────────────────────────────
2274
2275    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2276        let body = req.json_body();
2277        validate_required("ArchiveName", &body["ArchiveName"])?;
2278        let name = body["ArchiveName"]
2279            .as_str()
2280            .ok_or_else(|| missing("ArchiveName"))?
2281            .to_string();
2282        validate_string_length("archiveName", &name, 1, 48)?;
2283        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2284        let event_source_arn = body["EventSourceArn"]
2285            .as_str()
2286            .ok_or_else(|| missing("EventSourceArn"))?
2287            .to_string();
2288        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2289        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2290        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2291        if let Some(rd) = body["RetentionDays"].as_i64() {
2292            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2293        }
2294        let description = body["Description"].as_str().map(|s| s.to_string());
2295        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2296        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2297
2298        // Validate event pattern if provided
2299        if let Some(ref pattern) = event_pattern {
2300            validate_event_pattern(pattern)?;
2301        }
2302
2303        let mut accounts = self.state.write();
2304        let state = accounts.get_or_create(&req.account_id);
2305
2306        // Validate event bus exists
2307        let bus_name = state.resolve_bus_name(&event_source_arn);
2308        if !state.buses.contains_key(&bus_name) {
2309            return Err(AwsServiceError::aws_error(
2310                StatusCode::BAD_REQUEST,
2311                "ResourceNotFoundException",
2312                format!("Event bus {bus_name} does not exist."),
2313            ));
2314        }
2315
2316        // Check duplicate
2317        if state.archives.contains_key(&name) {
2318            return Err(AwsServiceError::aws_error(
2319                StatusCode::BAD_REQUEST,
2320                "ResourceAlreadyExistsException",
2321                format!("Archive {name} already exists."),
2322            ));
2323        }
2324
2325        let now = Utc::now();
2326        let arn = format!(
2327            "arn:aws:events:{}:{}:archive/{}",
2328            req.region, state.account_id, name
2329        );
2330
2331        let archive = Archive {
2332            name: name.clone(),
2333            arn: arn.clone(),
2334            event_source_arn: event_source_arn.clone(),
2335            description,
2336            event_pattern: event_pattern.clone(),
2337            retention_days,
2338            state: "ENABLED".to_string(),
2339            creation_time: now,
2340            event_count: 0,
2341            size_bytes: 0,
2342            events: Vec::new(),
2343        };
2344        state.archives.insert(name.clone(), archive);
2345
2346        // Create the archive rule
2347        let rule_name = format!("Events-Archive-{name}");
2348        let rule_arn = format!(
2349            "arn:aws:events:{}:{}:rule/{}",
2350            req.region, state.account_id, rule_name
2351        );
2352        // Merge archive event pattern with replay-name filter
2353        let rule_event_pattern = {
2354            let mut merged = if let Some(ref ep) = event_pattern {
2355                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2356            } else {
2357                json!({})
2358            };
2359            if let Some(obj) = merged.as_object_mut() {
2360                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2361            }
2362            serde_json::to_string(&merged).unwrap_or_default()
2363        };
2364
2365        // Build the archive target with InputTransformer
2366        let archive_target = EventTarget {
2367            id: name.clone(),
2368            arn: format!("arn:aws:events:{}:::", req.region),
2369            input: None,
2370            input_path: None,
2371            input_transformer: Some(json!({
2372                "InputPathsMap": {},
2373                "InputTemplate": format!(
2374                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2375                    arn
2376                )
2377            })),
2378            sqs_parameters: None,
2379        };
2380
2381        let archive_rule = EventRule {
2382            name: rule_name.clone(),
2383            arn: rule_arn,
2384            event_bus_name: bus_name.clone(),
2385            event_pattern: Some(rule_event_pattern),
2386            schedule_expression: None,
2387            state: "ENABLED".to_string(),
2388            description: None,
2389            role_arn: None,
2390            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2391            created_by: Some(state.account_id.clone()),
2392            targets: vec![archive_target],
2393            tags: HashMap::new(),
2394            last_fired: None,
2395        };
2396        let key = (bus_name, rule_name);
2397        state.rules.insert(key, archive_rule);
2398
2399        Ok(AwsResponse::ok_json(json!({
2400            "ArchiveArn": arn,
2401            "CreationTime": now.timestamp() as f64,
2402            "State": "ENABLED",
2403        })))
2404    }
2405
2406    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2407        let body = req.json_body();
2408        validate_required("ArchiveName", &body["ArchiveName"])?;
2409        let name = body["ArchiveName"]
2410            .as_str()
2411            .ok_or_else(|| missing("ArchiveName"))?;
2412        validate_string_length("archiveName", name, 1, 48)?;
2413
2414        let accounts = self.state.read();
2415        let empty = EventBridgeState::new(&req.account_id, &req.region);
2416        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2417        let archive = state.archives.get(name).ok_or_else(|| {
2418            AwsServiceError::aws_error(
2419                StatusCode::BAD_REQUEST,
2420                "ResourceNotFoundException",
2421                format!("Archive {name} does not exist."),
2422            )
2423        })?;
2424
2425        let mut resp = json!({
2426            "ArchiveArn": archive.arn,
2427            "ArchiveName": archive.name,
2428            "CreationTime": archive.creation_time.timestamp() as f64,
2429            "EventCount": archive.event_count,
2430            "EventSourceArn": archive.event_source_arn,
2431            "RetentionDays": archive.retention_days,
2432            "SizeBytes": archive.size_bytes,
2433            "State": archive.state,
2434        });
2435        if let Some(ref desc) = archive.description {
2436            resp["Description"] = json!(desc);
2437        }
2438        if let Some(ref ep) = archive.event_pattern {
2439            resp["EventPattern"] = json!(ep);
2440        }
2441
2442        Ok(AwsResponse::ok_json(resp))
2443    }
2444
2445    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2446        let body = req.json_body();
2447        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2448        validate_optional_string_length(
2449            "eventSourceArn",
2450            body["EventSourceArn"].as_str(),
2451            1,
2452            1600,
2453        )?;
2454        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2455        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2456        let name_prefix = body["NamePrefix"].as_str();
2457        let source_arn = body["EventSourceArn"].as_str();
2458        let archive_state = body["State"].as_str();
2459
2460        // Validate at most one filter
2461        let filter_count = [
2462            name_prefix.is_some(),
2463            source_arn.is_some(),
2464            archive_state.is_some(),
2465        ]
2466        .iter()
2467        .filter(|&&x| x)
2468        .count();
2469        if filter_count > 1 {
2470            return Err(AwsServiceError::aws_error(
2471                StatusCode::BAD_REQUEST,
2472                "ValidationException",
2473                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2474            ));
2475        }
2476
2477        // Validate state
2478        if let Some(s) = archive_state {
2479            let valid = [
2480                "ENABLED",
2481                "DISABLED",
2482                "CREATING",
2483                "UPDATING",
2484                "CREATE_FAILED",
2485                "UPDATE_FAILED",
2486            ];
2487            if !valid.contains(&s) {
2488                return Err(AwsServiceError::aws_error(
2489                    StatusCode::BAD_REQUEST,
2490                    "ValidationException",
2491                    format!(
2492                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2493                        s
2494                    ),
2495                ));
2496            }
2497        }
2498
2499        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2500
2501        let accounts = self.state.read();
2502        let empty = EventBridgeState::new(&req.account_id, &req.region);
2503        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2504        let all: Vec<Value> = state
2505            .archives
2506            .values()
2507            .filter(|a| {
2508                if let Some(prefix) = name_prefix {
2509                    a.name.starts_with(prefix)
2510                } else if let Some(arn) = source_arn {
2511                    a.event_source_arn == arn
2512                } else if let Some(s) = archive_state {
2513                    a.state == s
2514                } else {
2515                    true
2516                }
2517            })
2518            .map(|a| {
2519                json!({
2520                    "ArchiveName": a.name,
2521                    "CreationTime": a.creation_time.timestamp() as f64,
2522                    "EventCount": a.event_count,
2523                    "EventSourceArn": a.event_source_arn,
2524                    "RetentionDays": a.retention_days,
2525                    "SizeBytes": a.size_bytes,
2526                    "State": a.state,
2527                })
2528            })
2529            .collect();
2530
2531        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2532        let mut resp = json!({ "Archives": archives });
2533        if let Some(token) = next_token {
2534            resp["NextToken"] = json!(token);
2535        }
2536
2537        Ok(AwsResponse::ok_json(resp))
2538    }
2539
2540    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2541        let body = req.json_body();
2542        validate_required("ArchiveName", &body["ArchiveName"])?;
2543        let name = body["ArchiveName"]
2544            .as_str()
2545            .ok_or_else(|| missing("ArchiveName"))?;
2546        validate_string_length("archiveName", name, 1, 48)?;
2547        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2548        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2549        if let Some(rd) = body["RetentionDays"].as_i64() {
2550            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2551        }
2552
2553        // Validate event pattern if provided
2554        if let Some(pattern) = body["EventPattern"].as_str() {
2555            validate_event_pattern(pattern)?;
2556        }
2557
2558        let mut accounts = self.state.write();
2559        let state = accounts.get_or_create(&req.account_id);
2560        let archive = state.archives.get_mut(name).ok_or_else(|| {
2561            AwsServiceError::aws_error(
2562                StatusCode::BAD_REQUEST,
2563                "ResourceNotFoundException",
2564                format!("Archive {name} does not exist."),
2565            )
2566        })?;
2567
2568        if let Some(desc) = body["Description"].as_str() {
2569            archive.description = Some(desc.to_string());
2570        }
2571        if let Some(pattern) = body["EventPattern"].as_str() {
2572            archive.event_pattern = Some(pattern.to_string());
2573        }
2574        if let Some(days) = body["RetentionDays"].as_i64() {
2575            archive.retention_days = days;
2576        }
2577
2578        Ok(AwsResponse::ok_json(json!({
2579            "ArchiveArn": archive.arn,
2580            "CreationTime": archive.creation_time.timestamp() as f64,
2581            "State": archive.state,
2582        })))
2583    }
2584
2585    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2586        let body = req.json_body();
2587        validate_required("ArchiveName", &body["ArchiveName"])?;
2588        let name = body["ArchiveName"]
2589            .as_str()
2590            .ok_or_else(|| missing("ArchiveName"))?;
2591        validate_string_length("archiveName", name, 1, 48)?;
2592
2593        let mut accounts = self.state.write();
2594        let state = accounts.get_or_create(&req.account_id);
2595        if !state.archives.contains_key(name) {
2596            return Err(AwsServiceError::aws_error(
2597                StatusCode::BAD_REQUEST,
2598                "ResourceNotFoundException",
2599                format!("Archive {name} does not exist."),
2600            ));
2601        }
2602
2603        state.archives.remove(name);
2604
2605        // Remove the archive rule
2606        let rule_name = format!("Events-Archive-{name}");
2607        state.rules.retain(|k, _| k.1 != rule_name);
2608
2609        Ok(AwsResponse::ok_json(json!({})))
2610    }
2611
2612    // ─── Connection Operations ──────────────────────────────────────────
2613
2614    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615        let body = req.json_body();
2616        validate_required("Name", &body["Name"])?;
2617        let name = body["Name"]
2618            .as_str()
2619            .ok_or_else(|| missing("Name"))?
2620            .to_string();
2621        validate_string_length("name", &name, 1, 64)?;
2622        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2623        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2624        let description = body["Description"].as_str().map(|s| s.to_string());
2625        let auth_type = body["AuthorizationType"]
2626            .as_str()
2627            .ok_or_else(|| missing("AuthorizationType"))?
2628            .to_string();
2629        validate_enum(
2630            "authorizationType",
2631            &auth_type,
2632            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2633        )?;
2634        validate_optional_string_length(
2635            "kmsKeyIdentifier",
2636            body["KmsKeyIdentifier"].as_str(),
2637            0,
2638            2048,
2639        )?;
2640        validate_required("AuthParameters", &body["AuthParameters"])?;
2641        let auth_params = body["AuthParameters"].clone();
2642
2643        let mut accounts = self.state.write();
2644        let state = accounts.get_or_create(&req.account_id);
2645        let now = Utc::now();
2646        let conn_uuid = uuid::Uuid::new_v4();
2647        let arn = format!(
2648            "arn:aws:events:{}:{}:connection/{}/{}",
2649            req.region, state.account_id, name, conn_uuid
2650        );
2651        let secret_arn = format!(
2652            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2653            req.region, state.account_id, name, conn_uuid
2654        );
2655
2656        let conn = Connection {
2657            name: name.clone(),
2658            arn: arn.clone(),
2659            description,
2660            authorization_type: auth_type.clone(),
2661            auth_parameters: auth_params,
2662            connection_state: "AUTHORIZED".to_string(),
2663            secret_arn: secret_arn.clone(),
2664            creation_time: now,
2665            last_modified_time: now,
2666            last_authorized_time: now,
2667        };
2668        state.connections.insert(name, conn);
2669
2670        Ok(AwsResponse::ok_json(json!({
2671            "ConnectionArn": arn,
2672            "ConnectionState": "AUTHORIZED",
2673            "CreationTime": now.timestamp() as f64,
2674            "LastModifiedTime": now.timestamp() as f64,
2675        })))
2676    }
2677
2678    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2679        let body = req.json_body();
2680        validate_required("Name", &body["Name"])?;
2681        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2682        validate_string_length("name", name, 1, 64)?;
2683
2684        let accounts = self.state.read();
2685        let empty = EventBridgeState::new(&req.account_id, &req.region);
2686        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2687        let conn = state.connections.get(name).ok_or_else(|| {
2688            AwsServiceError::aws_error(
2689                StatusCode::BAD_REQUEST,
2690                "ResourceNotFoundException",
2691                format!("Connection '{name}' does not exist."),
2692            )
2693        })?;
2694
2695        // Build auth parameters response - strip secrets
2696        let auth_params_response =
2697            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2698
2699        let mut resp = json!({
2700            "ConnectionArn": conn.arn,
2701            "Name": conn.name,
2702            "AuthorizationType": conn.authorization_type,
2703            "AuthParameters": auth_params_response,
2704            "ConnectionState": conn.connection_state,
2705            "SecretArn": conn.secret_arn,
2706            "CreationTime": conn.creation_time.timestamp() as f64,
2707            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2708            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2709        });
2710        if let Some(ref desc) = conn.description {
2711            resp["Description"] = json!(desc);
2712        }
2713
2714        Ok(AwsResponse::ok_json(resp))
2715    }
2716
2717    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2718        let body = req.json_body();
2719        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2720        validate_optional_enum(
2721            "connectionState",
2722            body["ConnectionState"].as_str(),
2723            &[
2724                "CREATING",
2725                "UPDATING",
2726                "DELETING",
2727                "AUTHORIZED",
2728                "DEAUTHORIZED",
2729                "AUTHORIZING",
2730                "DEAUTHORIZING",
2731                "ACTIVE",
2732                "FAILED_CONNECTIVITY",
2733            ],
2734        )?;
2735        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2736        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2737
2738        let name_prefix = body["NamePrefix"].as_str();
2739        let connection_state = body["ConnectionState"].as_str();
2740        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2741
2742        let accounts = self.state.read();
2743        let empty = EventBridgeState::new(&req.account_id, &req.region);
2744        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2745        let all: Vec<Value> = state
2746            .connections
2747            .values()
2748            .filter(|c| {
2749                if let Some(prefix) = name_prefix {
2750                    if !c.name.starts_with(prefix) {
2751                        return false;
2752                    }
2753                }
2754                if let Some(cs) = connection_state {
2755                    if c.connection_state != cs {
2756                        return false;
2757                    }
2758                }
2759                true
2760            })
2761            .map(|c| {
2762                json!({
2763                    "ConnectionArn": c.arn,
2764                    "Name": c.name,
2765                    "AuthorizationType": c.authorization_type,
2766                    "ConnectionState": c.connection_state,
2767                    "CreationTime": c.creation_time.timestamp() as f64,
2768                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2769                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2770                })
2771            })
2772            .collect();
2773
2774        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2775        let mut resp = json!({ "Connections": conns });
2776        if let Some(token) = next_token {
2777            resp["NextToken"] = json!(token);
2778        }
2779
2780        Ok(AwsResponse::ok_json(resp))
2781    }
2782
2783    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2784        let body = req.json_body();
2785        validate_required("Name", &body["Name"])?;
2786        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2787        validate_string_length("name", name, 1, 64)?;
2788        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2789        validate_optional_enum(
2790            "authorizationType",
2791            body["AuthorizationType"].as_str(),
2792            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2793        )?;
2794
2795        let mut accounts = self.state.write();
2796        let state = accounts.get_or_create(&req.account_id);
2797        let conn = state.connections.get_mut(name).ok_or_else(|| {
2798            AwsServiceError::aws_error(
2799                StatusCode::BAD_REQUEST,
2800                "ResourceNotFoundException",
2801                format!("Connection '{name}' does not exist."),
2802            )
2803        })?;
2804
2805        if let Some(desc) = body["Description"].as_str() {
2806            conn.description = Some(desc.to_string());
2807        }
2808        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2809            conn.authorization_type = auth_type.to_string();
2810        }
2811        if body.get("AuthParameters").is_some() {
2812            conn.auth_parameters = body["AuthParameters"].clone();
2813        }
2814        conn.last_modified_time = Utc::now();
2815
2816        Ok(AwsResponse::ok_json(json!({
2817            "ConnectionArn": conn.arn,
2818            "ConnectionState": conn.connection_state,
2819            "CreationTime": conn.creation_time.timestamp() as f64,
2820            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2821            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2822        })))
2823    }
2824
2825    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2826        let body = req.json_body();
2827        validate_required("Name", &body["Name"])?;
2828        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2829        validate_string_length("name", name, 1, 64)?;
2830
2831        let mut accounts = self.state.write();
2832        let state = accounts.get_or_create(&req.account_id);
2833        let conn = state.connections.remove(name).ok_or_else(|| {
2834            AwsServiceError::aws_error(
2835                StatusCode::BAD_REQUEST,
2836                "ResourceNotFoundException",
2837                format!("Connection '{name}' does not exist."),
2838            )
2839        })?;
2840
2841        Ok(AwsResponse::ok_json(json!({
2842            "ConnectionArn": conn.arn,
2843            "ConnectionState": conn.connection_state,
2844            "CreationTime": conn.creation_time.timestamp() as f64,
2845            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2846            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2847        })))
2848    }
2849
2850    // ─── API Destination Operations ─────────────────────────────────────
2851
2852    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2853        let body = req.json_body();
2854        validate_required("Name", &body["Name"])?;
2855        let name = body["Name"]
2856            .as_str()
2857            .ok_or_else(|| missing("Name"))?
2858            .to_string();
2859        validate_string_length("name", &name, 1, 64)?;
2860        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2862        let description = body["Description"].as_str().map(|s| s.to_string());
2863        let connection_arn = body["ConnectionArn"]
2864            .as_str()
2865            .ok_or_else(|| missing("ConnectionArn"))?
2866            .to_string();
2867        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2868        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2869        let endpoint = body["InvocationEndpoint"]
2870            .as_str()
2871            .ok_or_else(|| missing("InvocationEndpoint"))?
2872            .to_string();
2873        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2874        validate_required("HttpMethod", &body["HttpMethod"])?;
2875        let http_method = body["HttpMethod"]
2876            .as_str()
2877            .ok_or_else(|| missing("HttpMethod"))?
2878            .to_string();
2879        validate_enum(
2880            "httpMethod",
2881            &http_method,
2882            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2883        )?;
2884        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2885        if let Some(r) = rate_limit {
2886            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2887        }
2888
2889        let mut accounts = self.state.write();
2890        let state = accounts.get_or_create(&req.account_id);
2891        let now = Utc::now();
2892        let dest_uuid = uuid::Uuid::new_v4();
2893        let arn = format!(
2894            "arn:aws:events:{}:{}:api-destination/{}/{}",
2895            req.region, state.account_id, name, dest_uuid
2896        );
2897
2898        let dest = ApiDestination {
2899            name: name.clone(),
2900            arn: arn.clone(),
2901            description,
2902            connection_arn,
2903            invocation_endpoint: endpoint,
2904            http_method,
2905            invocation_rate_limit_per_second: rate_limit,
2906            state: "ACTIVE".to_string(),
2907            creation_time: now,
2908            last_modified_time: now,
2909        };
2910        state.api_destinations.insert(name, dest);
2911
2912        Ok(AwsResponse::ok_json(json!({
2913            "ApiDestinationArn": arn,
2914            "ApiDestinationState": "ACTIVE",
2915            "CreationTime": now.timestamp() as f64,
2916            "LastModifiedTime": now.timestamp() as f64,
2917        })))
2918    }
2919
2920    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2921        let body = req.json_body();
2922        validate_required("Name", &body["Name"])?;
2923        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2924        validate_string_length("name", name, 1, 64)?;
2925
2926        let accounts = self.state.read();
2927        let empty = EventBridgeState::new(&req.account_id, &req.region);
2928        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2929        let dest = state.api_destinations.get(name).ok_or_else(|| {
2930            AwsServiceError::aws_error(
2931                StatusCode::BAD_REQUEST,
2932                "ResourceNotFoundException",
2933                format!("An api-destination '{name}' does not exist."),
2934            )
2935        })?;
2936
2937        let mut resp = json!({
2938            "ApiDestinationArn": dest.arn,
2939            "Name": dest.name,
2940            "ConnectionArn": dest.connection_arn,
2941            "InvocationEndpoint": dest.invocation_endpoint,
2942            "HttpMethod": dest.http_method,
2943            "ApiDestinationState": dest.state,
2944            "CreationTime": dest.creation_time.timestamp() as f64,
2945            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2946        });
2947        if let Some(ref desc) = dest.description {
2948            resp["Description"] = json!(desc);
2949        }
2950        if let Some(rate) = dest.invocation_rate_limit_per_second {
2951            resp["InvocationRateLimitPerSecond"] = json!(rate);
2952        }
2953
2954        Ok(AwsResponse::ok_json(resp))
2955    }
2956
2957    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2958        let body = req.json_body();
2959        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2960        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2961        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2962        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2963
2964        let name_prefix = body["NamePrefix"].as_str();
2965        let connection_arn = body["ConnectionArn"].as_str();
2966        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2967
2968        let accounts = self.state.read();
2969        let empty = EventBridgeState::new(&req.account_id, &req.region);
2970        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2971        let all: Vec<Value> = state
2972            .api_destinations
2973            .values()
2974            .filter(|d| {
2975                if let Some(prefix) = name_prefix {
2976                    if !d.name.starts_with(prefix) {
2977                        return false;
2978                    }
2979                }
2980                if let Some(arn) = connection_arn {
2981                    if d.connection_arn != arn {
2982                        return false;
2983                    }
2984                }
2985                true
2986            })
2987            .map(|d| {
2988                let mut obj = json!({
2989                    "ApiDestinationArn": d.arn,
2990                    "Name": d.name,
2991                    "ConnectionArn": d.connection_arn,
2992                    "InvocationEndpoint": d.invocation_endpoint,
2993                    "HttpMethod": d.http_method,
2994                    "ApiDestinationState": d.state,
2995                    "CreationTime": d.creation_time.timestamp() as f64,
2996                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2997                });
2998                if let Some(rate) = d.invocation_rate_limit_per_second {
2999                    obj["InvocationRateLimitPerSecond"] = json!(rate);
3000                }
3001                obj
3002            })
3003            .collect();
3004
3005        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3006        let mut resp = json!({ "ApiDestinations": dests });
3007        if let Some(token) = next_token {
3008            resp["NextToken"] = json!(token);
3009        }
3010
3011        Ok(AwsResponse::ok_json(resp))
3012    }
3013
3014    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3015        let body = req.json_body();
3016        validate_required("Name", &body["Name"])?;
3017        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3018        validate_string_length("name", name, 1, 64)?;
3019        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3020        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
3021        validate_optional_string_length(
3022            "invocationEndpoint",
3023            body["InvocationEndpoint"].as_str(),
3024            1,
3025            2048,
3026        )?;
3027        validate_optional_enum(
3028            "httpMethod",
3029            body["HttpMethod"].as_str(),
3030            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
3031        )?;
3032        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
3033            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
3034        }
3035
3036        let mut accounts = self.state.write();
3037        let state = accounts.get_or_create(&req.account_id);
3038        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
3039            AwsServiceError::aws_error(
3040                StatusCode::BAD_REQUEST,
3041                "ResourceNotFoundException",
3042                format!("An api-destination '{name}' does not exist."),
3043            )
3044        })?;
3045
3046        if let Some(desc) = body["Description"].as_str() {
3047            dest.description = Some(desc.to_string());
3048        }
3049        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
3050            dest.invocation_endpoint = endpoint.to_string();
3051        }
3052        if let Some(method) = body["HttpMethod"].as_str() {
3053            dest.http_method = method.to_string();
3054        }
3055        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
3056            dest.invocation_rate_limit_per_second = Some(rate);
3057        }
3058        if let Some(conn) = body["ConnectionArn"].as_str() {
3059            dest.connection_arn = conn.to_string();
3060        }
3061        dest.last_modified_time = Utc::now();
3062
3063        Ok(AwsResponse::ok_json(json!({
3064            "ApiDestinationArn": dest.arn,
3065            "ApiDestinationState": dest.state,
3066            "CreationTime": dest.creation_time.timestamp() as f64,
3067            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
3068        })))
3069    }
3070
3071    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3072        let body = req.json_body();
3073        validate_required("Name", &body["Name"])?;
3074        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3075        validate_string_length("name", name, 1, 64)?;
3076
3077        let mut accounts = self.state.write();
3078        let state = accounts.get_or_create(&req.account_id);
3079        if !state.api_destinations.contains_key(name) {
3080            return Err(AwsServiceError::aws_error(
3081                StatusCode::BAD_REQUEST,
3082                "ResourceNotFoundException",
3083                format!("An api-destination '{name}' does not exist."),
3084            ));
3085        }
3086        state.api_destinations.remove(name);
3087
3088        Ok(AwsResponse::ok_json(json!({})))
3089    }
3090
3091    // ─── Replay Operations ──────────────────────────────────────────────
3092
3093    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3094        let input = StartReplayInput::from_body(&req.json_body())?;
3095
3096        let mut accounts = self.state.write();
3097        let state = accounts.get_or_create(&req.account_id);
3098
3099        // Validate event bus + archive, in the order the real service validates them.
3100        let bus_name = state.resolve_bus_name(&input.destination_arn);
3101        if !state.buses.contains_key(&bus_name) {
3102            return Err(AwsServiceError::aws_error(
3103                StatusCode::BAD_REQUEST,
3104                "ResourceNotFoundException",
3105                format!("Event bus {bus_name} does not exist."),
3106            ));
3107        }
3108
3109        let archive_name = input
3110            .event_source_arn
3111            .rsplit_once("archive/")
3112            .map(|(_, n)| n.to_string())
3113            .unwrap_or_default();
3114        let archive = state.archives.get(&archive_name).ok_or_else(|| {
3115            AwsServiceError::aws_error(
3116                StatusCode::BAD_REQUEST,
3117                "ValidationException",
3118                format!(
3119                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3120                ),
3121            )
3122        })?;
3123        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3124        if archive_bus != bus_name {
3125            return Err(AwsServiceError::aws_error(
3126                StatusCode::BAD_REQUEST,
3127                "ValidationException",
3128                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3129            ));
3130        }
3131
3132        if input.event_end_time <= input.event_start_time {
3133            return Err(AwsServiceError::aws_error(
3134                StatusCode::BAD_REQUEST,
3135                "ValidationException",
3136                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3137            ));
3138        }
3139
3140        if state.replays.contains_key(&input.name) {
3141            return Err(AwsServiceError::aws_error(
3142                StatusCode::BAD_REQUEST,
3143                "ResourceAlreadyExistsException",
3144                format!("Replay {} already exists.", input.name),
3145            ));
3146        }
3147
3148        let now = Utc::now();
3149        let arn = format!(
3150            "arn:aws:events:{}:{}:replay/{}",
3151            req.region, state.account_id, input.name
3152        );
3153
3154        let events_to_deliver = collect_replay_events_with_targets(
3155            state,
3156            &archive_name,
3157            &bus_name,
3158            input.event_start_time,
3159            input.event_end_time,
3160            &req.account_id,
3161            &req.region,
3162        );
3163
3164        let replay = Replay {
3165            name: input.name.clone(),
3166            arn: arn.clone(),
3167            description: input.description,
3168            event_source_arn: input.event_source_arn,
3169            destination: input.destination,
3170            event_start_time: input.event_start_time,
3171            event_end_time: input.event_end_time,
3172            state: "COMPLETED".to_string(),
3173            replay_start_time: now,
3174            replay_end_time: Some(now),
3175        };
3176        state.replays.insert(input.name, replay);
3177
3178        drop(accounts);
3179
3180        for (event, targets) in events_to_deliver {
3181            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3182            let event_json = json!({
3183                "version": "0",
3184                "id": event.event_id,
3185                "source": event.source,
3186                "account": req.account_id,
3187                "detail-type": event.detail_type,
3188                "detail": detail_value,
3189                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3190                "region": req.region,
3191                "resources": event.resources,
3192                "replay-name": arn,
3193            });
3194            let event_str = event_json.to_string();
3195
3196            for target in targets {
3197                self.deliver_replay_event_to_target(
3198                    &target,
3199                    &event,
3200                    &event_json,
3201                    &event_str,
3202                    &req.account_id,
3203                );
3204            }
3205        }
3206
3207        Ok(AwsResponse::ok_json(json!({
3208            "ReplayArn": arn,
3209            "ReplayStartTime": now.timestamp() as f64,
3210            "State": "STARTING",
3211        })))
3212    }
3213
3214    fn deliver_replay_event_to_target(
3215        &self,
3216        target: &EventTarget,
3217        event: &PutEvent,
3218        event_json: &Value,
3219        event_str: &str,
3220        account_id: &str,
3221    ) {
3222        let target_arn = &target.arn;
3223        let body_str = if let Some(ref transformer) = target.input_transformer {
3224            apply_input_transformer(transformer, event_json)
3225        } else if let Some(ref input) = target.input {
3226            input.clone()
3227        } else if let Some(ref input_path) = target.input_path {
3228            resolve_json_path(event_json, input_path)
3229                .map(|v| v.to_string())
3230                .unwrap_or_else(|| event_str.to_string())
3231        } else {
3232            event_str.to_string()
3233        };
3234
3235        if target_arn.contains(":sqs:") {
3236            let group_id = target
3237                .sqs_parameters
3238                .as_ref()
3239                .and_then(|p| p["MessageGroupId"].as_str())
3240                .map(|s| s.to_string());
3241            if group_id.is_some() {
3242                self.delivery.send_to_sqs_with_attrs(
3243                    target_arn,
3244                    &body_str,
3245                    &HashMap::new(),
3246                    group_id.as_deref(),
3247                    None,
3248                );
3249            } else {
3250                self.delivery
3251                    .send_to_sqs(target_arn, &body_str, &HashMap::new());
3252            }
3253        } else if target_arn.contains(":sns:") {
3254            self.delivery
3255                .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3256        } else if target_arn.contains(":lambda:") {
3257            let mut accounts = self.state.write();
3258            let state = accounts.get_or_create(account_id);
3259            state
3260                .lambda_invocations
3261                .push(crate::state::LambdaInvocation {
3262                    function_arn: target_arn.clone(),
3263                    payload: body_str.clone(),
3264                    timestamp: Utc::now(),
3265                });
3266            drop(accounts);
3267            if let Some(ref ls) = self.lambda_state {
3268                ls.write()
3269                    .get_or_create(account_id)
3270                    .invocations
3271                    .push(LambdaInvocation {
3272                        function_arn: target_arn.clone(),
3273                        payload: body_str.clone(),
3274                        timestamp: Utc::now(),
3275                        source: "aws:events".to_string(),
3276                    });
3277            }
3278            invoke_lambda_async(
3279                &self.container_runtime,
3280                &self.lambda_state,
3281                target_arn,
3282                &body_str,
3283            );
3284        } else if target_arn.contains(":logs:") {
3285            let mut accounts = self.state.write();
3286            let state = accounts.get_or_create(account_id);
3287            state.log_deliveries.push(crate::state::LogDelivery {
3288                log_group_arn: target_arn.clone(),
3289                payload: body_str.clone(),
3290                timestamp: Utc::now(),
3291            });
3292            drop(accounts);
3293            if let Some(ref log_state) = self.logs_state {
3294                deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3295            }
3296        } else if target_arn.contains(":states:") {
3297            self.delivery
3298                .start_stepfunctions_execution(target_arn, &body_str);
3299            let mut accounts = self.state.write();
3300            let state = accounts.get_or_create(account_id);
3301            state
3302                .step_function_executions
3303                .push(crate::state::StepFunctionExecution {
3304                    state_machine_arn: target_arn.clone(),
3305                    payload: body_str.clone(),
3306                    timestamp: Utc::now(),
3307                });
3308        } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3309            let url = target_arn.clone();
3310            let payload = body_str.clone();
3311            tokio::spawn(async move {
3312                let client = reqwest::Client::new();
3313                let _ = client
3314                    .post(&url)
3315                    .header("Content-Type", "application/json")
3316                    .body(payload)
3317                    .send()
3318                    .await;
3319            });
3320        }
3321    }
3322
3323    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3324        let body = req.json_body();
3325        validate_required("ReplayName", &body["ReplayName"])?;
3326        let name = body["ReplayName"]
3327            .as_str()
3328            .ok_or_else(|| missing("ReplayName"))?;
3329        validate_string_length("replayName", name, 1, 64)?;
3330
3331        let accounts = self.state.read();
3332        let empty = EventBridgeState::new(&req.account_id, &req.region);
3333        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3334        let replay = state.replays.get(name).ok_or_else(|| {
3335            AwsServiceError::aws_error(
3336                StatusCode::BAD_REQUEST,
3337                "ResourceNotFoundException",
3338                format!("Replay {name} does not exist."),
3339            )
3340        })?;
3341
3342        let mut resp = json!({
3343            "Destination": replay.destination,
3344            "EventSourceArn": replay.event_source_arn,
3345            "EventStartTime": replay.event_start_time.timestamp() as f64,
3346            "EventEndTime": replay.event_end_time.timestamp() as f64,
3347            "ReplayArn": replay.arn,
3348            "ReplayName": replay.name,
3349            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3350            "State": replay.state,
3351        });
3352        if let Some(ref desc) = replay.description {
3353            resp["Description"] = json!(desc);
3354        }
3355        if let Some(ref end) = replay.replay_end_time {
3356            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3357        }
3358
3359        Ok(AwsResponse::ok_json(resp))
3360    }
3361
3362    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3363        let body = req.json_body();
3364        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3365        validate_optional_string_length(
3366            "eventSourceArn",
3367            body["EventSourceArn"].as_str(),
3368            1,
3369            1600,
3370        )?;
3371        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3372        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3373        let name_prefix = body["NamePrefix"].as_str();
3374        let source_arn = body["EventSourceArn"].as_str();
3375        let replay_state = body["State"].as_str();
3376
3377        // Validate at most one filter
3378        let filter_count = [
3379            name_prefix.is_some(),
3380            source_arn.is_some(),
3381            replay_state.is_some(),
3382        ]
3383        .iter()
3384        .filter(|&&x| x)
3385        .count();
3386        if filter_count > 1 {
3387            return Err(AwsServiceError::aws_error(
3388                StatusCode::BAD_REQUEST,
3389                "ValidationException",
3390                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3391            ));
3392        }
3393
3394        // Validate state
3395        if let Some(s) = replay_state {
3396            let valid = [
3397                "CANCELLED",
3398                "CANCELLING",
3399                "COMPLETED",
3400                "FAILED",
3401                "RUNNING",
3402                "STARTING",
3403            ];
3404            if !valid.contains(&s) {
3405                return Err(AwsServiceError::aws_error(
3406                    StatusCode::BAD_REQUEST,
3407                    "ValidationException",
3408                    format!(
3409                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3410                        s
3411                    ),
3412                ));
3413            }
3414        }
3415
3416        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3417
3418        let accounts = self.state.read();
3419        let empty = EventBridgeState::new(&req.account_id, &req.region);
3420        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3421        let all: Vec<Value> = state
3422            .replays
3423            .values()
3424            .filter(|r| {
3425                if let Some(prefix) = name_prefix {
3426                    r.name.starts_with(prefix)
3427                } else if let Some(arn) = source_arn {
3428                    r.event_source_arn == arn
3429                } else if let Some(s) = replay_state {
3430                    r.state == s
3431                } else {
3432                    true
3433                }
3434            })
3435            .map(|r| {
3436                let mut obj = json!({
3437                    "EventSourceArn": r.event_source_arn,
3438                    "EventStartTime": r.event_start_time.timestamp() as f64,
3439                    "EventEndTime": r.event_end_time.timestamp() as f64,
3440                    "ReplayName": r.name,
3441                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3442                    "State": r.state,
3443                });
3444                if let Some(ref end) = r.replay_end_time {
3445                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3446                }
3447                obj
3448            })
3449            .collect();
3450
3451        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3452        let mut resp = json!({ "Replays": replays });
3453        if let Some(token) = next_token {
3454            resp["NextToken"] = json!(token);
3455        }
3456
3457        Ok(AwsResponse::ok_json(resp))
3458    }
3459
3460    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3461        let body = req.json_body();
3462        validate_required("ReplayName", &body["ReplayName"])?;
3463        let name = body["ReplayName"]
3464            .as_str()
3465            .ok_or_else(|| missing("ReplayName"))?;
3466        validate_string_length("replayName", name, 1, 64)?;
3467
3468        let mut accounts = self.state.write();
3469        let state = accounts.get_or_create(&req.account_id);
3470        let replay = state.replays.get_mut(name).ok_or_else(|| {
3471            AwsServiceError::aws_error(
3472                StatusCode::BAD_REQUEST,
3473                "ResourceNotFoundException",
3474                format!("Replay {name} does not exist."),
3475            )
3476        })?;
3477
3478        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3479        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3480            return Err(AwsServiceError::aws_error(
3481                StatusCode::BAD_REQUEST,
3482                "IllegalStatusException",
3483                format!("Replay {name} is not in a valid state for this operation."),
3484            ));
3485        }
3486
3487        let arn = replay.arn.clone();
3488        replay.state = "CANCELLED".to_string();
3489
3490        Ok(AwsResponse::ok_json(json!({
3491            "ReplayArn": arn,
3492            "State": "CANCELLING",
3493        })))
3494    }
3495}
3496
3497// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3498
3499fn find_tags_mut<'a>(
3500    state: &'a mut crate::state::EventBridgeState,
3501    arn: &str,
3502) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3503    // Check buses
3504    for bus in state.buses.values_mut() {
3505        if bus.arn == arn {
3506            return Ok(&mut bus.tags);
3507        }
3508    }
3509    // Check rules
3510    for rule in state.rules.values_mut() {
3511        if rule.arn == arn {
3512            return Ok(&mut rule.tags);
3513        }
3514    }
3515
3516    // Parse ARN to give better error messages
3517    let error_msg = if arn.contains(":rule/") {
3518        // Extract rule name and bus from ARN
3519        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3520        if let Some(rule_path) = parts.first() {
3521            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3522                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3523            } else {
3524                format!("Rule {} does not exist on EventBus default.", rule_path)
3525            }
3526        } else {
3527            format!("Resource {arn} not found.")
3528        }
3529    } else {
3530        format!("Resource {arn} not found.")
3531    };
3532
3533    Err(AwsServiceError::aws_error(
3534        StatusCode::BAD_REQUEST,
3535        "ResourceNotFoundException",
3536        error_msg,
3537    ))
3538}
3539
3540fn find_tags<'a>(
3541    state: &'a crate::state::EventBridgeState,
3542    arn: &str,
3543) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3544    for bus in state.buses.values() {
3545        if bus.arn == arn {
3546            return Ok(&bus.tags);
3547        }
3548    }
3549    for rule in state.rules.values() {
3550        if rule.arn == arn {
3551            return Ok(&rule.tags);
3552        }
3553    }
3554
3555    let error_msg = if arn.contains(":rule/") {
3556        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3557        if let Some(rule_path) = parts.first() {
3558            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3559                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3560            } else {
3561                format!("Rule {} does not exist on EventBus default.", rule_path)
3562            }
3563        } else {
3564            format!("Resource {arn} not found.")
3565        }
3566    } else {
3567        format!("Resource {arn} not found.")
3568    };
3569
3570    Err(AwsServiceError::aws_error(
3571        StatusCode::BAD_REQUEST,
3572        "ResourceNotFoundException",
3573        error_msg,
3574    ))
3575}
3576
3577// ─── Event Pattern Validation ────────────────────────────────────────
3578
3579fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3580    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3581        AwsServiceError::aws_error(
3582            StatusCode::BAD_REQUEST,
3583            "InvalidEventPatternException",
3584            "Event pattern is not valid. Reason: Invalid JSON",
3585        )
3586    })?;
3587
3588    validate_pattern_values(&parsed, "")?;
3589    Ok(())
3590}
3591
3592fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3593    match value {
3594        Value::Object(obj) => {
3595            for (key, val) in obj {
3596                let new_path = if path.is_empty() {
3597                    key.clone()
3598                } else {
3599                    format!("{path}.{key}")
3600                };
3601                match val {
3602                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3603                    Value::Array(_) => {} // Arrays are fine at leaf level
3604                    _ => {
3605                        return Err(AwsServiceError::aws_error(
3606                            StatusCode::BAD_REQUEST,
3607                            "InvalidEventPatternException",
3608                            format!(
3609                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3610                                key
3611                            ),
3612                        ));
3613                    }
3614                }
3615            }
3616            Ok(())
3617        }
3618        _ => Ok(()),
3619    }
3620}
3621
3622// ─── Connection Auth Params Response Builder ────────────────────────
3623
3624fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3625    match auth_type {
3626        "API_KEY" => {
3627            let mut resp = json!({});
3628            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3629                resp["ApiKeyAuthParameters"] = json!({
3630                    "ApiKeyName": api_key["ApiKeyName"],
3631                });
3632            }
3633            resp
3634        }
3635        "BASIC" => {
3636            let mut resp = json!({});
3637            if let Some(basic) = params.get("BasicAuthParameters") {
3638                resp["BasicAuthParameters"] = json!({
3639                    "Username": basic["Username"],
3640                });
3641            }
3642            resp
3643        }
3644        "OAUTH_CLIENT_CREDENTIALS" => {
3645            let mut resp = json!({});
3646            if let Some(oauth) = params.get("OAuthParameters") {
3647                resp["OAuthParameters"] = json!({
3648                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3649                    "HttpMethod": oauth["HttpMethod"],
3650                    "ClientParameters": {
3651                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3652                    },
3653                });
3654            }
3655            resp
3656        }
3657        _ => params.clone(),
3658    }
3659}
3660
3661// ─── Event Pattern Matching ─────────────────────────────────────────
3662
3663/// Match an event against an EventBridge event pattern.
3664pub fn matches_pattern(
3665    pattern_json: Option<&str>,
3666    source: &str,
3667    detail_type: &str,
3668    detail: &str,
3669    account: &str,
3670    region: &str,
3671    resources: &[String],
3672) -> bool {
3673    let pattern_json = match pattern_json {
3674        Some(p) => p,
3675        None => return true,
3676    };
3677
3678    let pattern: Value = match serde_json::from_str(pattern_json) {
3679        Ok(v) => v,
3680        Err(_) => return false,
3681    };
3682
3683    let pattern_obj = match pattern.as_object() {
3684        Some(o) => o,
3685        None => return false,
3686    };
3687
3688    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3689    let event = json!({
3690        "source": source,
3691        "detail-type": detail_type,
3692        "detail": detail_value,
3693        "account": account,
3694        "region": region,
3695        "resources": resources,
3696    });
3697
3698    for (key, pattern_value) in pattern_obj {
3699        let event_value = &event[key];
3700        if !matches_value(pattern_value, event_value) {
3701            return false;
3702        }
3703    }
3704
3705    true
3706}
3707
3708fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3709    match pattern {
3710        Value::Object(obj) => {
3711            for (key, sub_pattern) in obj {
3712                let sub_value = &event_value[key];
3713                if !matches_value(sub_pattern, sub_value) {
3714                    return false;
3715                }
3716            }
3717            true
3718        }
3719        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3720        _ => false,
3721    }
3722}
3723
3724fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3725    match pattern_elem {
3726        Value::Object(obj) => {
3727            if let Some(prefix_val) = obj.get("prefix") {
3728                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3729                    return actual.starts_with(prefix);
3730                }
3731                return false;
3732            }
3733            if let Some(exists_val) = obj.get("exists") {
3734                let should_exist = exists_val.as_bool().unwrap_or(true);
3735                let does_exist = !event_value.is_null();
3736                return should_exist == does_exist;
3737            }
3738            if let Some(anything_but_val) = obj.get("anything-but") {
3739                return match anything_but_val {
3740                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3741                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3742                    Value::Number(_) => event_value != anything_but_val,
3743                    _ => true,
3744                };
3745            }
3746            if let Some(numeric_val) = obj.get("numeric") {
3747                return matches_numeric(numeric_val, event_value);
3748            }
3749            false
3750        }
3751        _ => values_equal(pattern_elem, event_value),
3752    }
3753}
3754
3755/// For each archive on `event_bus_name` whose event pattern matches the
3756/// event, append a clone of it to the archive's stored events and bump
3757/// the archive's counters.
3758#[allow(clippy::too_many_arguments)]
3759fn archive_matching_event(
3760    state: &mut crate::state::EventBridgeState,
3761    event: &PutEvent,
3762    event_bus_name: &str,
3763    source: &str,
3764    detail_type: &str,
3765    detail: &str,
3766    account_id: &str,
3767    region: &str,
3768    resources: &[String],
3769) {
3770    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3771    for akey in archive_keys {
3772        let (archive_bus, archive_pattern, archive_enabled) = {
3773            let a = &state.archives[&akey];
3774            (
3775                state.resolve_bus_name(&a.event_source_arn),
3776                a.event_pattern.clone(),
3777                a.state == "ENABLED",
3778            )
3779        };
3780        if archive_bus != event_bus_name || !archive_enabled {
3781            continue;
3782        }
3783        let pattern_matches = matches_pattern(
3784            archive_pattern.as_deref(),
3785            source,
3786            detail_type,
3787            detail,
3788            account_id,
3789            region,
3790            resources,
3791        );
3792        if !pattern_matches {
3793            continue;
3794        }
3795        if let Some(archive) = state.archives.get_mut(&akey) {
3796            archive.event_count += 1;
3797            archive.size_bytes += detail.len() as i64;
3798            archive.events.push(event.clone());
3799        }
3800    }
3801}
3802
3803/// Parsed + validated inputs for `StartReplay`.
3804struct StartReplayInput {
3805    name: String,
3806    description: Option<String>,
3807    event_source_arn: String,
3808    destination: Value,
3809    destination_arn: String,
3810    event_start_time: DateTime<Utc>,
3811    event_end_time: DateTime<Utc>,
3812}
3813
3814impl StartReplayInput {
3815    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3816        validate_required("ReplayName", &body["ReplayName"])?;
3817        let name = body["ReplayName"]
3818            .as_str()
3819            .ok_or_else(|| missing("ReplayName"))?
3820            .to_string();
3821        validate_string_length("replayName", &name, 1, 64)?;
3822        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3823        validate_required("EventSourceArn", &body["EventSourceArn"])?;
3824        let description = body["Description"].as_str().map(|s| s.to_string());
3825        let event_source_arn = body["EventSourceArn"]
3826            .as_str()
3827            .ok_or_else(|| missing("EventSourceArn"))?
3828            .to_string();
3829        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3830        validate_required("EventStartTime", &body["EventStartTime"])?;
3831        validate_required("EventEndTime", &body["EventEndTime"])?;
3832        validate_required("Destination", &body["Destination"])?;
3833        let destination = body["Destination"].clone();
3834
3835        let event_start_time = body["EventStartTime"]
3836            .as_f64()
3837            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3838            .unwrap_or_else(Utc::now);
3839        let event_end_time = body["EventEndTime"]
3840            .as_f64()
3841            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3842            .unwrap_or_else(Utc::now);
3843
3844        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3845        if !destination_arn.contains(":event-bus/") {
3846            return Err(AwsServiceError::aws_error(
3847                StatusCode::BAD_REQUEST,
3848                "ValidationException",
3849                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3850            ));
3851        }
3852
3853        Ok(Self {
3854            name,
3855            description,
3856            event_source_arn,
3857            destination,
3858            destination_arn,
3859            event_start_time,
3860            event_end_time,
3861        })
3862    }
3863}
3864
3865/// Walk the named archive, filter events into the replay window, then
3866/// fan out each event against rules on `bus_name` to collect its
3867/// matching targets. Returns only events that matched at least one
3868/// target.
3869#[allow(clippy::too_many_arguments)]
3870fn collect_replay_events_with_targets(
3871    state: &crate::state::EventBridgeState,
3872    archive_name: &str,
3873    bus_name: &str,
3874    event_start_time: DateTime<Utc>,
3875    event_end_time: DateTime<Utc>,
3876    account_id: &str,
3877    region: &str,
3878) -> Vec<(PutEvent, Vec<EventTarget>)> {
3879    let Some(archive) = state.archives.get(archive_name) else {
3880        return Vec::new();
3881    };
3882
3883    let replay_events: Vec<PutEvent> = archive
3884        .events
3885        .iter()
3886        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3887        .cloned()
3888        .collect();
3889
3890    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3891    for event in replay_events {
3892        let matching_targets: Vec<EventTarget> = state
3893            .rules
3894            .values()
3895            .filter(|r| {
3896                r.event_bus_name == bus_name
3897                    && r.state == "ENABLED"
3898                    && matches_pattern(
3899                        r.event_pattern.as_deref(),
3900                        &event.source,
3901                        &event.detail_type,
3902                        &event.detail,
3903                        account_id,
3904                        region,
3905                        &event.resources,
3906                    )
3907            })
3908            .flat_map(|r| r.targets.clone())
3909            .collect();
3910
3911        if !matching_targets.is_empty() {
3912            events_to_deliver.push((event, matching_targets));
3913        }
3914    }
3915    events_to_deliver
3916}
3917
3918fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3919    let arr = match numeric_arr.as_array() {
3920        Some(a) => a,
3921        None => return false,
3922    };
3923    let actual = match event_value.as_f64() {
3924        Some(n) => n,
3925        None => return false,
3926    };
3927    let mut i = 0;
3928    while i + 1 < arr.len() {
3929        let op = match arr[i].as_str() {
3930            Some(s) => s,
3931            None => return false,
3932        };
3933        let threshold = match arr[i + 1].as_f64() {
3934            Some(n) => n,
3935            None => return false,
3936        };
3937        let ok = match op {
3938            ">" => actual > threshold,
3939            ">=" => actual >= threshold,
3940            "<" => actual < threshold,
3941            "<=" => actual <= threshold,
3942            "=" => (actual - threshold).abs() < f64::EPSILON,
3943            _ => return false,
3944        };
3945        if !ok {
3946            return false;
3947        }
3948        i += 2;
3949    }
3950    true
3951}
3952
3953fn values_equal(a: &Value, b: &Value) -> bool {
3954    a == b
3955}
3956
3957/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3958fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3959    let path = path.strip_prefix('$').unwrap_or(path);
3960    let mut current = event;
3961    for segment in path.split('.') {
3962        if segment.is_empty() {
3963            continue;
3964        }
3965        current = current.get(segment)?;
3966    }
3967    Some(current.clone())
3968}
3969
3970/// Apply an EventBridge InputTransformer to an event.
3971fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3972    let input_paths_map = transformer
3973        .get("InputPathsMap")
3974        .and_then(|v| v.as_object())
3975        .cloned()
3976        .unwrap_or_default();
3977    let template = transformer
3978        .get("InputTemplate")
3979        .and_then(|v| v.as_str())
3980        .unwrap_or("")
3981        .to_string();
3982
3983    // Resolve all input paths
3984    let mut resolved: HashMap<String, Value> = HashMap::new();
3985    for (var_name, path_val) in &input_paths_map {
3986        if let Some(path_str) = path_val.as_str() {
3987            if let Some(val) = resolve_json_path(event, path_str) {
3988                resolved.insert(var_name.clone(), val);
3989            }
3990        }
3991    }
3992
3993    // Replace <varName> placeholders in template
3994    let mut result = template;
3995    for (var_name, val) in &resolved {
3996        let placeholder = format!("<{var_name}>");
3997        let replacement = match val {
3998            Value::String(s) => s.clone(),
3999            other => other.to_string(),
4000        };
4001        result = result.replace(&placeholder, &replacement);
4002    }
4003
4004    result
4005}
4006
4007fn missing(name: &str) -> AwsServiceError {
4008    AwsServiceError::aws_error(
4009        StatusCode::BAD_REQUEST,
4010        "ValidationException",
4011        format!("The request must contain the parameter {name}"),
4012    )
4013}
4014
/// Extract a Lambda function name from its ARN.
///
/// The name is the seventh `:`-separated field, provided the sixth field is
/// `function`. This covers both unqualified
/// (`arn:aws:lambda:region:account:function:NAME`) and qualified
/// (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs. Any other
/// input is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':').skip(5);
    match (fields.next(), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
4027
4028/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
4029/// This is fire-and-forget: EventBridge delivery is asynchronous.
4030pub fn invoke_lambda_async(
4031    container_runtime: &Option<Arc<ContainerRuntime>>,
4032    lambda_state: &Option<SharedLambdaState>,
4033    function_arn: &str,
4034    payload: &str,
4035) {
4036    let runtime = match container_runtime {
4037        Some(rt) => rt.clone(),
4038        None => return,
4039    };
4040    let lambda_state = match lambda_state {
4041        Some(ls) => ls.clone(),
4042        None => return,
4043    };
4044    let func_name = function_name_from_arn(function_arn).to_string();
4045    let payload = payload.as_bytes().to_vec();
4046
4047    tokio::spawn(async move {
4048        let func = {
4049            let accounts = lambda_state.read();
4050            let state = accounts.default_ref();
4051            state.functions.get(&func_name).cloned()
4052        };
4053        let func = match func {
4054            Some(f) => f,
4055            None => {
4056                tracing::warn!(
4057                    function = %func_name,
4058                    "EventBridge Lambda target not found, skipping invocation"
4059                );
4060                return;
4061            }
4062        };
4063        match runtime.invoke(&func, &payload).await {
4064            Ok(_) => {
4065                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
4066            }
4067            Err(e) => {
4068                tracing::warn!(
4069                    function = %func_name,
4070                    error = %e,
4071                    "EventBridge Lambda invocation failed"
4072                );
4073            }
4074        }
4075    });
4076}
4077
4078/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
4079/// to the appropriate log group and stream.
4080pub fn deliver_to_logs(
4081    logs_state: &SharedLogsState,
4082    log_group_arn: &str,
4083    payload: &str,
4084    timestamp: chrono::DateTime<chrono::Utc>,
4085) {
4086    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
4087    // or just the name if it's not an ARN
4088    let group_name = if log_group_arn.contains(":log-group:") {
4089        log_group_arn
4090            .split(":log-group:")
4091            .nth(1)
4092            .unwrap_or(log_group_arn)
4093            .trim_end_matches(":*")
4094    } else {
4095        log_group_arn
4096    };
4097
4098    let stream_name = "events".to_string();
4099    let ts_millis = timestamp.timestamp_millis();
4100
4101    let mut accounts = logs_state.write();
4102    let state = accounts.default_mut();
4103    let region = state.region.clone();
4104    let account_id = state.account_id.clone();
4105
4106    // Auto-create log group and stream if they don't exist
4107    let group = state
4108        .log_groups
4109        .entry(group_name.to_string())
4110        .or_insert_with(|| fakecloud_logs::state::LogGroup {
4111            name: group_name.to_string(),
4112            arn: Arn::new(
4113                "logs",
4114                &region,
4115                &account_id,
4116                &format!("log-group:{group_name}"),
4117            )
4118            .to_string(),
4119            creation_time: ts_millis,
4120            retention_in_days: None,
4121            kms_key_id: None,
4122            tags: HashMap::new(),
4123            log_streams: HashMap::new(),
4124            stored_bytes: 0,
4125            subscription_filters: Vec::new(),
4126            data_protection_policy: None,
4127            index_policies: Vec::new(),
4128            transformer: None,
4129            deletion_protection: false,
4130            log_group_class: Some("STANDARD".to_string()),
4131        });
4132
4133    let stream = group
4134        .log_streams
4135        .entry(stream_name.clone())
4136        .or_insert_with(|| fakecloud_logs::state::LogStream {
4137            name: stream_name,
4138            arn: format!("{}:log-stream:events", group.arn),
4139            creation_time: ts_millis,
4140            first_event_timestamp: None,
4141            last_event_timestamp: None,
4142            last_ingestion_time: None,
4143            upload_sequence_token: "1".to_string(),
4144            events: Vec::new(),
4145        });
4146
4147    stream.events.push(fakecloud_logs::state::LogEvent {
4148        timestamp: ts_millis,
4149        message: payload.to_string(),
4150        ingestion_time: ts_millis,
4151    });
4152    stream.last_event_timestamp = Some(ts_millis);
4153    stream.last_ingestion_time = Some(ts_millis);
4154    if stream.first_event_timestamp.is_none() {
4155        stream.first_event_timestamp = Some(ts_millis);
4156    }
4157}
4158
4159/// Apply connection auth parameters to an outgoing HTTP request.
4160fn apply_connection_auth(
4161    mut builder: reqwest::RequestBuilder,
4162    conn: &Connection,
4163) -> reqwest::RequestBuilder {
4164    match conn.authorization_type.as_str() {
4165        "API_KEY" => {
4166            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
4167                if let (Some(name), Some(value)) = (
4168                    params["ApiKeyName"].as_str(),
4169                    params["ApiKeyValue"].as_str(),
4170                ) {
4171                    builder = builder.header(name, value);
4172                }
4173            }
4174        }
4175        "BASIC" => {
4176            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
4177                if let (Some(user), Some(pass)) =
4178                    (params["Username"].as_str(), params["Password"].as_str())
4179                {
4180                    builder = builder.basic_auth(user, Some(pass));
4181                }
4182            }
4183        }
4184        "OAUTH_CLIENT_CREDENTIALS" => {
4185            // For OAuth, in a real implementation we'd exchange credentials for a token.
4186            // Here we pass client credentials as basic auth as a reasonable approximation.
4187            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4188                if let (Some(client_id), Some(client_secret)) = (
4189                    params["ClientParameters"]["ClientID"].as_str(),
4190                    params["ClientParameters"]["ClientSecret"].as_str(),
4191                ) {
4192                    builder = builder.basic_auth(client_id, Some(client_secret));
4193                }
4194            }
4195        }
4196        _ => {}
4197    }
4198    builder
4199}
4200
4201#[cfg(test)]
4202mod tests {
4203    use super::*;
4204
4205    /// Test helper that calls matches_pattern with default account/region/resources
4206    fn test_matches(
4207        pattern_json: Option<&str>,
4208        source: &str,
4209        detail_type: &str,
4210        detail: &str,
4211    ) -> bool {
4212        matches_pattern(
4213            pattern_json,
4214            source,
4215            detail_type,
4216            detail,
4217            "123456789012",
4218            "us-east-1",
4219            &[],
4220        )
4221    }
4222
4223    #[test]
4224    fn pattern_matches_source() {
4225        assert!(test_matches(
4226            Some(r#"{"source": ["my.app"]}"#),
4227            "my.app",
4228            "OrderPlaced",
4229            "{}"
4230        ));
4231        assert!(!test_matches(
4232            Some(r#"{"source": ["other.app"]}"#),
4233            "my.app",
4234            "OrderPlaced",
4235            "{}"
4236        ));
4237    }
4238
4239    #[test]
4240    fn pattern_matches_detail_type() {
4241        assert!(test_matches(
4242            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4243            "my.app",
4244            "OrderPlaced",
4245            "{}"
4246        ));
4247        assert!(!test_matches(
4248            Some(r#"{"detail-type": ["OrderShipped"]}"#),
4249            "my.app",
4250            "OrderPlaced",
4251            "{}"
4252        ));
4253    }
4254
4255    #[test]
4256    fn pattern_matches_detail_field() {
4257        assert!(test_matches(
4258            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4259            "my.app",
4260            "StatusChange",
4261            r#"{"status": "ACTIVE"}"#
4262        ));
4263        assert!(!test_matches(
4264            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4265            "my.app",
4266            "StatusChange",
4267            r#"{"status": "INACTIVE"}"#
4268        ));
4269    }
4270
4271    #[test]
4272    fn no_pattern_matches_everything() {
4273        assert!(test_matches(None, "any", "any", "{}"));
4274    }
4275
4276    #[test]
4277    fn combined_pattern() {
4278        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4279        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4280        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4281        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4282    }
4283
4284    #[test]
4285    fn nested_detail_pattern() {
4286        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4287        assert!(test_matches(
4288            Some(pattern),
4289            "my.app",
4290            "OrderEvent",
4291            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4292        ));
4293        assert!(!test_matches(
4294            Some(pattern),
4295            "my.app",
4296            "OrderEvent",
4297            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4298        ));
4299        assert!(!test_matches(
4300            Some(pattern),
4301            "my.app",
4302            "OrderEvent",
4303            r#"{"order": {"id": "123"}}"#
4304        ));
4305    }
4306
4307    #[test]
4308    fn deeply_nested_detail_pattern() {
4309        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4310        assert!(test_matches(
4311            Some(pattern),
4312            "src",
4313            "type",
4314            r#"{"a": {"b": {"c": "deep"}}}"#
4315        ));
4316        assert!(!test_matches(
4317            Some(pattern),
4318            "src",
4319            "type",
4320            r#"{"a": {"b": {"c": "shallow"}}}"#
4321        ));
4322    }
4323
4324    #[test]
4325    fn prefix_matcher() {
4326        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4327        assert!(test_matches(
4328            Some(pattern),
4329            "com.myapp.orders",
4330            "OrderPlaced",
4331            "{}"
4332        ));
4333        assert!(test_matches(
4334            Some(pattern),
4335            "com.myapp",
4336            "OrderPlaced",
4337            "{}"
4338        ));
4339        assert!(!test_matches(
4340            Some(pattern),
4341            "com.other",
4342            "OrderPlaced",
4343            "{}"
4344        ));
4345    }
4346
4347    #[test]
4348    fn prefix_matcher_in_detail() {
4349        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4350        assert!(test_matches(
4351            Some(pattern),
4352            "src",
4353            "type",
4354            r#"{"region": "us-east-1"}"#
4355        ));
4356        assert!(!test_matches(
4357            Some(pattern),
4358            "src",
4359            "type",
4360            r#"{"region": "eu-west-1"}"#
4361        ));
4362    }
4363
4364    #[test]
4365    fn exists_matcher() {
4366        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4367        assert!(test_matches(
4368            Some(pattern),
4369            "src",
4370            "type",
4371            r#"{"error": "something broke"}"#
4372        ));
4373        assert!(!test_matches(
4374            Some(pattern),
4375            "src",
4376            "type",
4377            r#"{"status": "ok"}"#
4378        ));
4379
4380        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4381        assert!(test_matches(
4382            Some(pattern),
4383            "src",
4384            "type",
4385            r#"{"status": "ok"}"#
4386        ));
4387        assert!(!test_matches(
4388            Some(pattern),
4389            "src",
4390            "type",
4391            r#"{"error": "something broke"}"#
4392        ));
4393    }
4394
4395    #[test]
4396    fn anything_but_matcher() {
4397        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4398        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4399        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4400
4401        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4402        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4403        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4404        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4405    }
4406
4407    #[test]
4408    fn anything_but_in_detail() {
4409        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4410        assert!(test_matches(
4411            Some(pattern),
4412            "src",
4413            "type",
4414            r#"{"env": "staging"}"#
4415        ));
4416        assert!(!test_matches(
4417            Some(pattern),
4418            "src",
4419            "type",
4420            r#"{"env": "prod"}"#
4421        ));
4422    }
4423
4424    #[test]
4425    fn numeric_greater_than() {
4426        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4427        assert!(test_matches(
4428            Some(pattern),
4429            "src",
4430            "type",
4431            r#"{"count": 150}"#
4432        ));
4433        assert!(!test_matches(
4434            Some(pattern),
4435            "src",
4436            "type",
4437            r#"{"count": 100}"#
4438        ));
4439        assert!(!test_matches(
4440            Some(pattern),
4441            "src",
4442            "type",
4443            r#"{"count": 50}"#
4444        ));
4445    }
4446
4447    #[test]
4448    fn numeric_less_than() {
4449        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4450        assert!(test_matches(
4451            Some(pattern),
4452            "src",
4453            "type",
4454            r#"{"count": 5}"#
4455        ));
4456        assert!(!test_matches(
4457            Some(pattern),
4458            "src",
4459            "type",
4460            r#"{"count": 10}"#
4461        ));
4462        assert!(!test_matches(
4463            Some(pattern),
4464            "src",
4465            "type",
4466            r#"{"count": 15}"#
4467        ));
4468    }
4469
4470    #[test]
4471    fn numeric_range() {
4472        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4473        assert!(test_matches(
4474            Some(pattern),
4475            "src",
4476            "type",
4477            r#"{"count": 50}"#
4478        ));
4479        assert!(test_matches(
4480            Some(pattern),
4481            "src",
4482            "type",
4483            r#"{"count": 100}"#
4484        ));
4485        assert!(!test_matches(
4486            Some(pattern),
4487            "src",
4488            "type",
4489            r#"{"count": 200}"#
4490        ));
4491        assert!(!test_matches(
4492            Some(pattern),
4493            "src",
4494            "type",
4495            r#"{"count": 49}"#
4496        ));
4497    }
4498
4499    #[test]
4500    fn mixed_matchers_and_literals() {
4501        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4502        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4503        assert!(test_matches(
4504            Some(pattern),
4505            "com.myapp.orders",
4506            "Event",
4507            "{}"
4508        ));
4509        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4510    }
4511
4512    // ---- list_connections / list_api_destinations filtering & pagination ----
4513
4514    use fakecloud_core::delivery::DeliveryBus;
4515    use parking_lot::RwLock;
4516
4517    fn make_service() -> EventBridgeService {
4518        let state = Arc::new(RwLock::new(
4519            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
4520        ));
4521        let delivery = Arc::new(DeliveryBus::new());
4522        EventBridgeService::new(state, delivery)
4523    }
4524
4525    fn make_request(action: &str, body: Value) -> AwsRequest {
4526        AwsRequest {
4527            service: "events".to_string(),
4528            action: action.to_string(),
4529            region: "us-east-1".to_string(),
4530            account_id: "123456789012".to_string(),
4531            request_id: "test-id".to_string(),
4532            headers: http::HeaderMap::new(),
4533            query_params: HashMap::new(),
4534            body: serde_json::to_vec(&body).unwrap().into(),
4535            path_segments: vec![],
4536            raw_path: "/".to_string(),
4537            raw_query: String::new(),
4538            method: http::Method::POST,
4539            is_query_protocol: false,
4540            access_key_id: None,
4541            principal: None,
4542        }
4543    }
4544
4545    fn create_connection(svc: &EventBridgeService, name: &str) {
4546        let req = make_request(
4547            "CreateConnection",
4548            json!({
4549                "Name": name,
4550                "AuthorizationType": "API_KEY",
4551                "AuthParameters": {
4552                    "ApiKeyAuthParameters": {
4553                        "ApiKeyName": "x-api-key",
4554                        "ApiKeyValue": "secret"
4555                    }
4556                }
4557            }),
4558        );
4559        svc.create_connection(&req).unwrap();
4560    }
4561
4562    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4563        let conn_arn_field = {
4564            let _mas = svc.state.read();
4565            let state = _mas.default_ref();
4566            state.connections.get(conn_name).unwrap().arn.clone()
4567        };
4568        let req = make_request(
4569            "CreateApiDestination",
4570            json!({
4571                "Name": name,
4572                "ConnectionArn": conn_arn_field,
4573                "InvocationEndpoint": "https://example.com",
4574                "HttpMethod": "POST"
4575            }),
4576        );
4577        svc.create_api_destination(&req).unwrap();
4578    }
4579
4580    // -- ListConnections tests --
4581
4582    #[test]
4583    fn list_connections_returns_all_by_default() {
4584        let svc = make_service();
4585        create_connection(&svc, "conn-alpha");
4586        create_connection(&svc, "conn-beta");
4587        create_connection(&svc, "conn-gamma");
4588
4589        let req = make_request("ListConnections", json!({}));
4590        let resp = svc.list_connections(&req).unwrap();
4591        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4592        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4593        assert!(body["NextToken"].is_null());
4594    }
4595
4596    #[test]
4597    fn list_connections_name_prefix_filter() {
4598        let svc = make_service();
4599        create_connection(&svc, "prod-conn-1");
4600        create_connection(&svc, "prod-conn-2");
4601        create_connection(&svc, "dev-conn-1");
4602
4603        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4604        let resp = svc.list_connections(&req).unwrap();
4605        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4606        let names: Vec<&str> = body["Connections"]
4607            .as_array()
4608            .unwrap()
4609            .iter()
4610            .map(|c| c["Name"].as_str().unwrap())
4611            .collect();
4612        assert_eq!(names.len(), 2);
4613        assert!(names.iter().all(|n| n.starts_with("prod-")));
4614    }
4615
4616    #[test]
4617    fn list_connections_state_filter() {
4618        let svc = make_service();
4619        create_connection(&svc, "conn-a");
4620        create_connection(&svc, "conn-b");
4621
4622        // All connections start as AUTHORIZED; change one
4623        {
4624            let mut _mas = svc.state.write();
4625            let state = _mas.default_mut();
4626            state
4627                .connections
4628                .get_mut("conn-b")
4629                .unwrap()
4630                .connection_state = "DEAUTHORIZED".to_string();
4631        }
4632
4633        let req = make_request(
4634            "ListConnections",
4635            json!({ "ConnectionState": "AUTHORIZED" }),
4636        );
4637        let resp = svc.list_connections(&req).unwrap();
4638        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4639        let conns = body["Connections"].as_array().unwrap();
4640        assert_eq!(conns.len(), 1);
4641        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4642    }
4643
4644    #[test]
4645    fn list_connections_pagination() {
4646        let svc = make_service();
4647        for i in 0..5 {
4648            create_connection(&svc, &format!("conn-{i:02}"));
4649        }
4650
4651        // First page: limit 2
4652        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4653        let resp = svc.list_connections(&req).unwrap();
4654        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4655        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4656        let token = body["NextToken"].as_str().unwrap();
4657        assert_eq!(token, "2");
4658
4659        // Second page
4660        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4661        let resp = svc.list_connections(&req).unwrap();
4662        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4663        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4664        let token = body["NextToken"].as_str().unwrap();
4665        assert_eq!(token, "4");
4666
4667        // Third page (only 1 remaining)
4668        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4669        let resp = svc.list_connections(&req).unwrap();
4670        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4671        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4672        assert!(body["NextToken"].is_null());
4673    }
4674
4675    #[test]
4676    fn list_connections_pagination_with_filter() {
4677        let svc = make_service();
4678        for i in 0..4 {
4679            create_connection(&svc, &format!("prod-{i:02}"));
4680        }
4681        create_connection(&svc, "dev-00");
4682
4683        let req = make_request(
4684            "ListConnections",
4685            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4686        );
4687        let resp = svc.list_connections(&req).unwrap();
4688        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4689        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4690        assert!(body["NextToken"].as_str().is_some());
4691    }
4692
4693    // -- ListApiDestinations tests --
4694
4695    #[test]
4696    fn list_api_destinations_returns_all_by_default() {
4697        let svc = make_service();
4698        create_connection(&svc, "my-conn");
4699        create_api_destination(&svc, "dest-alpha", "my-conn");
4700        create_api_destination(&svc, "dest-beta", "my-conn");
4701
4702        let req = make_request("ListApiDestinations", json!({}));
4703        let resp = svc.list_api_destinations(&req).unwrap();
4704        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4705        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4706        assert!(body["NextToken"].is_null());
4707    }
4708
4709    #[test]
4710    fn list_api_destinations_name_prefix_filter() {
4711        let svc = make_service();
4712        create_connection(&svc, "my-conn");
4713        create_api_destination(&svc, "prod-dest-1", "my-conn");
4714        create_api_destination(&svc, "prod-dest-2", "my-conn");
4715        create_api_destination(&svc, "dev-dest-1", "my-conn");
4716
4717        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4718        let resp = svc.list_api_destinations(&req).unwrap();
4719        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4720        let names: Vec<&str> = body["ApiDestinations"]
4721            .as_array()
4722            .unwrap()
4723            .iter()
4724            .map(|d| d["Name"].as_str().unwrap())
4725            .collect();
4726        assert_eq!(names.len(), 2);
4727        assert!(names.iter().all(|n| n.starts_with("prod-")));
4728    }
4729
4730    #[test]
4731    fn list_api_destinations_connection_arn_filter() {
4732        let svc = make_service();
4733        create_connection(&svc, "conn-a");
4734        create_connection(&svc, "conn-b");
4735        create_api_destination(&svc, "dest-1", "conn-a");
4736        create_api_destination(&svc, "dest-2", "conn-b");
4737        create_api_destination(&svc, "dest-3", "conn-a");
4738
4739        let conn_a_arn = {
4740            let _mas = svc.state.read();
4741            let state = _mas.default_ref();
4742            state.connections.get("conn-a").unwrap().arn.clone()
4743        };
4744
4745        let req = make_request(
4746            "ListApiDestinations",
4747            json!({ "ConnectionArn": conn_a_arn }),
4748        );
4749        let resp = svc.list_api_destinations(&req).unwrap();
4750        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4751        let names: Vec<&str> = body["ApiDestinations"]
4752            .as_array()
4753            .unwrap()
4754            .iter()
4755            .map(|d| d["Name"].as_str().unwrap())
4756            .collect();
4757        assert_eq!(names.len(), 2);
4758        assert!(names.contains(&"dest-1"));
4759        assert!(names.contains(&"dest-3"));
4760    }
4761
4762    #[test]
4763    fn list_api_destinations_pagination() {
4764        let svc = make_service();
4765        create_connection(&svc, "my-conn");
4766        for i in 0..5 {
4767            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4768        }
4769
4770        // First page
4771        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4772        let resp = svc.list_api_destinations(&req).unwrap();
4773        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4774        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4775        let token = body["NextToken"].as_str().unwrap();
4776        assert_eq!(token, "2");
4777
4778        // Second page
4779        let req = make_request(
4780            "ListApiDestinations",
4781            json!({ "Limit": 2, "NextToken": token }),
4782        );
4783        let resp = svc.list_api_destinations(&req).unwrap();
4784        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4785        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4786        let token = body["NextToken"].as_str().unwrap();
4787        assert_eq!(token, "4");
4788
4789        // Last page
4790        let req = make_request(
4791            "ListApiDestinations",
4792            json!({ "Limit": 2, "NextToken": token }),
4793        );
4794        let resp = svc.list_api_destinations(&req).unwrap();
4795        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4796        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4797        assert!(body["NextToken"].is_null());
4798    }
4799
4800    // -- ListEventBuses pagination tests --
4801
4802    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4803        let req = make_request("CreateEventBus", json!({ "Name": name }));
4804        svc.create_event_bus(&req).unwrap();
4805    }
4806
4807    #[test]
4808    fn list_event_buses_pagination() {
4809        let svc = make_service();
4810        // "default" bus already exists, create 4 more
4811        for i in 0..4 {
4812            create_event_bus(&svc, &format!("bus-{i:02}"));
4813        }
4814
4815        // First page: limit 2
4816        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4817        let resp = svc.list_event_buses(&req).unwrap();
4818        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4819        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4820        let token = body["NextToken"].as_str().unwrap();
4821        assert_eq!(token, "2");
4822
4823        // Second page
4824        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4825        let resp = svc.list_event_buses(&req).unwrap();
4826        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4827        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4828        let token = body["NextToken"].as_str().unwrap();
4829        assert_eq!(token, "4");
4830
4831        // Third page (only 1 remaining)
4832        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4833        let resp = svc.list_event_buses(&req).unwrap();
4834        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4835        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4836        assert!(body["NextToken"].is_null());
4837    }
4838
4839    #[test]
4840    fn list_event_buses_no_pagination_returns_all() {
4841        let svc = make_service();
4842        create_event_bus(&svc, "bus-alpha");
4843        create_event_bus(&svc, "bus-beta");
4844
4845        let req = make_request("ListEventBuses", json!({}));
4846        let resp = svc.list_event_buses(&req).unwrap();
4847        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4848        // default + 2 custom = 3
4849        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4850        assert!(body["NextToken"].is_null());
4851    }
4852
4853    // -- PutEvents EndpointId tests --
4854
4855    #[test]
4856    fn put_events_never_includes_endpoint_id_in_response() {
4857        let svc = make_service();
4858        // Even when EndpointId is provided in the request, it must not appear in the response
4859        let req = make_request(
4860            "PutEvents",
4861            json!({
4862                "EndpointId": "my-endpoint.abc123",
4863                "Entries": [{
4864                    "Source": "my.source",
4865                    "DetailType": "MyType",
4866                    "Detail": "{}",
4867                    "EventBusName": "default"
4868                }]
4869            }),
4870        );
4871        let resp = svc.put_events(&req).unwrap();
4872        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4873        assert!(
4874            !body.as_object().unwrap().contains_key("EndpointId"),
4875            "EndpointId should never be in the PutEvents response"
4876        );
4877        assert_eq!(body["FailedEntryCount"], 0);
4878    }
4879
4880    // -- ListArchives pagination tests --
4881
4882    fn create_archive(svc: &EventBridgeService, name: &str) {
4883        let req = make_request(
4884            "CreateArchive",
4885            json!({
4886                "ArchiveName": name,
4887                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4888            }),
4889        );
4890        svc.create_archive(&req).unwrap();
4891    }
4892
4893    #[test]
4894    fn list_archives_pagination() {
4895        let svc = make_service();
4896        for i in 0..5 {
4897            create_archive(&svc, &format!("archive-{i:02}"));
4898        }
4899
4900        // First page: limit 2
4901        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4902        let resp = svc.list_archives(&req).unwrap();
4903        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4904        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4905        let token = body["NextToken"].as_str().unwrap();
4906        assert_eq!(token, "2");
4907
4908        // Second page
4909        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4910        let resp = svc.list_archives(&req).unwrap();
4911        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4912        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4913        let token = body["NextToken"].as_str().unwrap();
4914        assert_eq!(token, "4");
4915
4916        // Third page (only 1 remaining)
4917        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4918        let resp = svc.list_archives(&req).unwrap();
4919        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4920        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4921        assert!(body["NextToken"].is_null());
4922    }
4923
4924    // -- ListReplays pagination tests --
4925
4926    fn create_replay(svc: &EventBridgeService, name: &str) {
4927        // Need an archive first for the replay's event source
4928        let archive_arn = {
4929            let guard = svc.state.read();
4930            let st = guard.default_ref();
4931            if st.archives.contains_key("replay-archive") {
4932                st.archives["replay-archive"].arn.clone()
4933            } else {
4934                drop(guard);
4935                create_archive(svc, "replay-archive");
4936                svc.state.read().default_ref().archives["replay-archive"]
4937                    .arn
4938                    .clone()
4939            }
4940        };
4941        let req = make_request(
4942            "StartReplay",
4943            json!({
4944                "ReplayName": name,
4945                "EventSourceArn": archive_arn,
4946                "EventStartTime": 1000000.0,
4947                "EventEndTime": 2000000.0,
4948                "Destination": {
4949                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4950                }
4951            }),
4952        );
4953        svc.start_replay(&req).unwrap();
4954    }
4955
4956    #[test]
4957    fn list_replays_pagination() {
4958        let svc = make_service();
4959        for i in 0..5 {
4960            create_replay(&svc, &format!("replay-{i:02}"));
4961        }
4962
4963        // First page: limit 2
4964        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4965        let resp = svc.list_replays(&req).unwrap();
4966        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4967        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4968        let token = body["NextToken"].as_str().unwrap();
4969        assert_eq!(token, "2");
4970
4971        // Second page
4972        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4973        let resp = svc.list_replays(&req).unwrap();
4974        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4975        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4976        let token = body["NextToken"].as_str().unwrap();
4977        assert_eq!(token, "4");
4978
4979        // Third page (only 1 remaining)
4980        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4981        let resp = svc.list_replays(&req).unwrap();
4982        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4983        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4984        assert!(body["NextToken"].is_null());
4985    }
4986
4987    #[test]
4988    fn list_event_buses_invalid_next_token_returns_error() {
4989        let svc = make_service();
4990
4991        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4992        let result = svc.list_event_buses(&req);
4993        assert!(
4994            result.is_err(),
4995            "non-numeric NextToken should return an error"
4996        );
4997    }
4998
4999    // ---- TestEventPattern tests ----
5000
5001    #[test]
5002    fn test_event_pattern_match() {
5003        let svc = make_service();
5004        let req = make_request(
5005            "TestEventPattern",
5006            json!({
5007                "EventPattern": r#"{"source": ["my.app"]}"#,
5008                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5009            }),
5010        );
5011        let resp = svc.test_event_pattern(&req).unwrap();
5012        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5013        assert_eq!(body["Result"], true);
5014    }
5015
5016    #[test]
5017    fn test_event_pattern_no_match() {
5018        let svc = make_service();
5019        let req = make_request(
5020            "TestEventPattern",
5021            json!({
5022                "EventPattern": r#"{"source": ["other.app"]}"#,
5023                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5024            }),
5025        );
5026        let resp = svc.test_event_pattern(&req).unwrap();
5027        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5028        assert_eq!(body["Result"], false);
5029    }
5030
5031    #[test]
5032    fn test_event_pattern_detail_match() {
5033        let svc = make_service();
5034        let req = make_request(
5035            "TestEventPattern",
5036            json!({
5037                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
5038                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
5039            }),
5040        );
5041        let resp = svc.test_event_pattern(&req).unwrap();
5042        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5043        assert_eq!(body["Result"], true);
5044    }
5045
5046    // ---- UpdateEventBus tests ----
5047
5048    #[test]
5049    fn update_event_bus_description() {
5050        let svc = make_service();
5051        create_event_bus(&svc, "my-bus");
5052
5053        let req = make_request(
5054            "UpdateEventBus",
5055            json!({ "Name": "my-bus", "Description": "Updated desc" }),
5056        );
5057        let resp = svc.update_event_bus(&req).unwrap();
5058        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5059        assert_eq!(body["Name"], "my-bus");
5060
5061        // Verify via describe
5062        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
5063        let resp = svc.describe_event_bus(&req).unwrap();
5064        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5065        assert_eq!(body["Description"], "Updated desc");
5066    }
5067
5068    #[test]
5069    fn update_event_bus_not_found() {
5070        let svc = make_service();
5071        let req = make_request(
5072            "UpdateEventBus",
5073            json!({ "Name": "ghost-bus", "Description": "nope" }),
5074        );
5075        assert!(svc.update_event_bus(&req).is_err());
5076    }
5077
5078    // ---- Endpoint CRUD tests ----
5079
5080    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
5081        let req = make_request(
5082            "CreateEndpoint",
5083            json!({
5084                "Name": name,
5085                "RoutingConfig": {
5086                    "FailoverConfig": {
5087                        "Primary": { "HealthCheck": "" },
5088                        "Secondary": { "Route": "us-west-2" }
5089                    }
5090                },
5091                "EventBuses": [
5092                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
5093                ]
5094            }),
5095        );
5096        svc.create_endpoint(&req).unwrap();
5097    }
5098
5099    #[test]
5100    fn endpoint_create_describe_delete() {
5101        let svc = make_service();
5102        create_endpoint_helper(&svc, "my-endpoint");
5103
5104        // Describe
5105        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5106        let resp = svc.describe_endpoint(&req).unwrap();
5107        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5108        assert_eq!(body["Name"], "my-endpoint");
5109        assert_eq!(body["State"], "ACTIVE");
5110        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
5111
5112        // Delete
5113        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
5114        svc.delete_endpoint(&req).unwrap();
5115
5116        // Verify gone
5117        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5118        assert!(svc.describe_endpoint(&req).is_err());
5119    }
5120
5121    #[test]
5122    fn endpoint_list_and_update() {
5123        let svc = make_service();
5124        create_endpoint_helper(&svc, "ep-alpha");
5125        create_endpoint_helper(&svc, "ep-beta");
5126
5127        // List all
5128        let req = make_request("ListEndpoints", json!({}));
5129        let resp = svc.list_endpoints(&req).unwrap();
5130        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5131        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
5132
5133        // Update
5134        let req = make_request(
5135            "UpdateEndpoint",
5136            json!({ "Name": "ep-alpha", "Description": "updated" }),
5137        );
5138        let resp = svc.update_endpoint(&req).unwrap();
5139        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5140        assert_eq!(body["Name"], "ep-alpha");
5141
5142        // Verify description
5143        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
5144        let resp = svc.describe_endpoint(&req).unwrap();
5145        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5146        assert_eq!(body["Description"], "updated");
5147    }
5148
5149    #[test]
5150    fn endpoint_duplicate_fails() {
5151        let svc = make_service();
5152        create_endpoint_helper(&svc, "dup-ep");
5153        let req = make_request(
5154            "CreateEndpoint",
5155            json!({
5156                "Name": "dup-ep",
5157                "RoutingConfig": {},
5158                "EventBuses": []
5159            }),
5160        );
5161        assert!(svc.create_endpoint(&req).is_err());
5162    }
5163
5164    // ---- DeauthorizeConnection tests ----
5165
5166    #[test]
5167    fn deauthorize_connection_sets_state() {
5168        let svc = make_service();
5169        create_connection(&svc, "deauth-conn");
5170
5171        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
5172        let resp = svc.deauthorize_connection(&req).unwrap();
5173        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5174        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5175        assert!(body["ConnectionArn"]
5176            .as_str()
5177            .unwrap()
5178            .contains("deauth-conn"));
5179
5180        // Verify via describe
5181        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
5182        let resp = svc.describe_connection(&req).unwrap();
5183        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5184        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5185    }
5186
5187    #[test]
5188    fn deauthorize_connection_not_found() {
5189        let svc = make_service();
5190        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5191        assert!(svc.deauthorize_connection(&req).is_err());
5192    }
5193
5194    // ---- Partner event source tests ----
5195
5196    #[test]
5197    fn partner_event_source_crud() {
5198        let svc = make_service();
5199
5200        // Create
5201        let req = make_request(
5202            "CreatePartnerEventSource",
5203            json!({ "Name": "partner/test", "Account": "123456789012" }),
5204        );
5205        svc.create_partner_event_source(&req).unwrap();
5206
5207        // Describe
5208        let req = make_request(
5209            "DescribePartnerEventSource",
5210            json!({ "Name": "partner/test" }),
5211        );
5212        let resp = svc.describe_partner_event_source(&req).unwrap();
5213        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5214        assert_eq!(body["Name"], "partner/test");
5215
5216        // List
5217        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5218        let resp = svc.list_partner_event_sources(&req).unwrap();
5219        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5220        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5221
5222        // ListPartnerEventSourceAccounts
5223        let req = make_request(
5224            "ListPartnerEventSourceAccounts",
5225            json!({ "EventSourceName": "partner/test" }),
5226        );
5227        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5228        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5229        assert_eq!(
5230            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5231            1
5232        );
5233
5234        // DescribeEventSource
5235        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5236        let resp = svc.describe_event_source(&req).unwrap();
5237        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5238        assert_eq!(body["Name"], "partner/test");
5239        assert_eq!(body["State"], "ACTIVE");
5240
5241        // ListEventSources
5242        let req = make_request("ListEventSources", json!({}));
5243        let resp = svc.list_event_sources(&req).unwrap();
5244        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5245        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5246
5247        // Delete
5248        let req = make_request(
5249            "DeletePartnerEventSource",
5250            json!({ "Name": "partner/test", "Account": "123456789012" }),
5251        );
5252        svc.delete_partner_event_source(&req).unwrap();
5253
5254        // Verify gone
5255        let req = make_request(
5256            "DescribePartnerEventSource",
5257            json!({ "Name": "partner/test" }),
5258        );
5259        assert!(svc.describe_partner_event_source(&req).is_err());
5260    }
5261
5262    #[test]
5263    fn activate_deactivate_event_source() {
5264        let svc = make_service();
5265
5266        // Create a partner event source first
5267        let req = make_request(
5268            "CreatePartnerEventSource",
5269            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5270        );
5271        svc.create_partner_event_source(&req).unwrap();
5272
5273        // Deactivate it
5274        let req = make_request(
5275            "DeactivateEventSource",
5276            json!({ "Name": "aws.partner/test" }),
5277        );
5278        svc.deactivate_event_source(&req).unwrap();
5279        {
5280            let _mas = svc.state.read();
5281            let state = _mas.default_ref();
5282            assert_eq!(
5283                state.partner_event_sources["aws.partner/test"].state,
5284                "INACTIVE"
5285            );
5286        }
5287
5288        // Activate it
5289        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5290        svc.activate_event_source(&req).unwrap();
5291        {
5292            let _mas = svc.state.read();
5293            let state = _mas.default_ref();
5294            assert_eq!(
5295                state.partner_event_sources["aws.partner/test"].state,
5296                "ACTIVE"
5297            );
5298        }
5299
5300        // Not-found returns error
5301        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5302        assert!(svc.activate_event_source(&req).is_err());
5303
5304        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5305        assert!(svc.deactivate_event_source(&req).is_err());
5306    }
5307
5308    #[test]
5309    fn delete_partner_event_source_verifies_account() {
5310        let svc = make_service();
5311
5312        // Create a partner event source
5313        let req = make_request(
5314            "CreatePartnerEventSource",
5315            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5316        );
5317        svc.create_partner_event_source(&req).unwrap();
5318
5319        // Deleting with wrong account fails
5320        let req = make_request(
5321            "DeletePartnerEventSource",
5322            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5323        );
5324        assert!(svc.delete_partner_event_source(&req).is_err());
5325        // Source still exists
5326        assert!(svc
5327            .state
5328            .read()
5329            .default_ref()
5330            .partner_event_sources
5331            .contains_key("aws.partner/test"));
5332
5333        // Deleting with correct account succeeds
5334        let req = make_request(
5335            "DeletePartnerEventSource",
5336            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5337        );
5338        svc.delete_partner_event_source(&req).unwrap();
5339        assert!(!svc
5340            .state
5341            .read()
5342            .default_ref()
5343            .partner_event_sources
5344            .contains_key("aws.partner/test"));
5345
5346        // Deleting non-existent source returns error
5347        let req = make_request(
5348            "DeletePartnerEventSource",
5349            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5350        );
5351        assert!(svc.delete_partner_event_source(&req).is_err());
5352    }
5353
5354    #[test]
5355    fn put_partner_events() {
5356        let svc = make_service();
5357        let req = make_request(
5358            "PutPartnerEvents",
5359            json!({
5360                "Entries": [
5361                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5362                ]
5363            }),
5364        );
5365        let resp = svc.put_partner_events(&req).unwrap();
5366        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5367        assert_eq!(body["FailedEntryCount"], 0);
5368        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5369        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5370    }
5371
5372    // ---- Archive + Replay delivery tests ----
5373
5374    /// Helper: create a service with a mock SQS delivery that records messages.
5375    #[allow(clippy::type_complexity)]
5376    fn make_service_with_sqs_recorder() -> (
5377        EventBridgeService,
5378        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5379    ) {
5380        use fakecloud_core::delivery::SqsDelivery;
5381
5382        struct RecordingSqsDelivery {
5383            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5384        }
5385
5386        impl SqsDelivery for RecordingSqsDelivery {
5387            fn deliver_to_queue(
5388                &self,
5389                queue_arn: &str,
5390                message_body: &str,
5391                _attributes: &HashMap<String, String>,
5392            ) {
5393                self.messages
5394                    .lock()
5395                    .push((queue_arn.to_string(), message_body.to_string()));
5396            }
5397        }
5398
5399        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5400            Arc::new(parking_lot::Mutex::new(Vec::new()));
5401        let state = Arc::new(RwLock::new(
5402            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5403        ));
5404        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5405            messages: messages.clone(),
5406        })));
5407        let svc = EventBridgeService::new(state, delivery);
5408        (svc, messages)
5409    }
5410
5411    #[test]
5412    fn start_replay_delivers_archived_events_to_sqs_target() {
5413        let (svc, messages) = make_service_with_sqs_recorder();
5414        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5415
5416        // Create a rule with an SQS target
5417        let req = make_request(
5418            "PutRule",
5419            json!({
5420                "Name": "replay-test-rule",
5421                "EventPattern": r#"{"source": ["my.app"]}"#,
5422                "State": "ENABLED"
5423            }),
5424        );
5425        svc.put_rule(&req).unwrap();
5426
5427        let req = make_request(
5428            "PutTargets",
5429            json!({
5430                "Rule": "replay-test-rule",
5431                "Targets": [{
5432                    "Id": "sqs-target",
5433                    "Arn": queue_arn
5434                }]
5435            }),
5436        );
5437        svc.put_targets(&req).unwrap();
5438
5439        // Create an archive on the default bus
5440        let req = make_request(
5441            "CreateArchive",
5442            json!({
5443                "ArchiveName": "test-archive",
5444                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5445            }),
5446        );
5447        svc.create_archive(&req).unwrap();
5448
5449        // PutEvents: these should get archived and delivered
5450        let req = make_request(
5451            "PutEvents",
5452            json!({
5453                "Entries": [
5454                    {
5455                        "Source": "my.app",
5456                        "DetailType": "OrderCreated",
5457                        "Detail": "{\"orderId\": \"1\"}",
5458                        "EventBusName": "default"
5459                    },
5460                    {
5461                        "Source": "my.app",
5462                        "DetailType": "OrderShipped",
5463                        "Detail": "{\"orderId\": \"2\"}",
5464                        "EventBusName": "default"
5465                    }
5466                ]
5467            }),
5468        );
5469        let resp = svc.put_events(&req).unwrap();
5470        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5471        assert_eq!(body["FailedEntryCount"], 0);
5472
5473        // Verify archive has 2 events
5474        {
5475            let _mas = svc.state.read();
5476            let state = _mas.default_ref();
5477            let archive = state.archives.get("test-archive").unwrap();
5478            assert_eq!(archive.events.len(), 2);
5479            assert_eq!(archive.event_count, 2);
5480        }
5481
5482        // Clear recorded messages from PutEvents delivery
5483        messages.lock().clear();
5484
5485        // StartReplay: should re-deliver the archived events
5486        let archive_arn = {
5487            let _mas = svc.state.read();
5488            let state = _mas.default_ref();
5489            state.archives.get("test-archive").unwrap().arn.clone()
5490        };
5491
5492        // Use a wide time range to capture all events
5493        let start_ts = 0.0_f64;
5494        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5495
5496        let req = make_request(
5497            "StartReplay",
5498            json!({
5499                "ReplayName": "my-replay",
5500                "EventSourceArn": archive_arn,
5501                "Destination": {
5502                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5503                },
5504                "EventStartTime": start_ts,
5505                "EventEndTime": end_ts
5506            }),
5507        );
5508        let resp = svc.start_replay(&req).unwrap();
5509        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5510        assert_eq!(body["State"], "STARTING");
5511
5512        // Verify the replay delivered events to SQS
5513        let delivered = messages.lock();
5514        assert_eq!(
5515            delivered.len(),
5516            2,
5517            "expected 2 replayed events delivered to SQS"
5518        );
5519        for (arn, msg) in delivered.iter() {
5520            assert_eq!(arn, queue_arn);
5521            let event: Value = serde_json::from_str(msg).unwrap();
5522            assert_eq!(event["source"], "my.app");
5523            // Replayed events should include replay-name
5524            assert!(event["replay-name"].as_str().is_some());
5525        }
5526
5527        // Verify replay is marked as COMPLETED
5528        let _mas = svc.state.read();
5529        let state = _mas.default_ref();
5530        let replay = state.replays.get("my-replay").unwrap();
5531        assert_eq!(replay.state, "COMPLETED");
5532    }
5533
5534    #[test]
5535    fn apply_connection_auth_api_key() {
5536        let conn = Connection {
5537            name: "test-conn".to_string(),
5538            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5539            description: None,
5540            authorization_type: "API_KEY".to_string(),
5541            auth_parameters: json!({
5542                "ApiKeyAuthParameters": {
5543                    "ApiKeyName": "x-api-key",
5544                    "ApiKeyValue": "my-secret"
5545                }
5546            }),
5547            connection_state: "AUTHORIZED".to_string(),
5548            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5549            creation_time: Utc::now(),
5550            last_modified_time: Utc::now(),
5551            last_authorized_time: Utc::now(),
5552        };
5553
5554        let client = reqwest::Client::new();
5555        let builder = client
5556            .post("http://localhost:12345/test")
5557            .header("Content-Type", "application/json");
5558        let builder = apply_connection_auth(builder, &conn);
5559
5560        // Build and verify the header was applied
5561        let request = builder.body("{}").build().unwrap();
5562        assert_eq!(
5563            request
5564                .headers()
5565                .get("x-api-key")
5566                .unwrap()
5567                .to_str()
5568                .unwrap(),
5569            "my-secret"
5570        );
5571    }
5572
5573    #[test]
5574    fn apply_connection_auth_basic() {
5575        let conn = Connection {
5576            name: "basic-conn".to_string(),
5577            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5578            description: None,
5579            authorization_type: "BASIC".to_string(),
5580            auth_parameters: json!({
5581                "BasicAuthParameters": {
5582                    "Username": "user",
5583                    "Password": "pass"
5584                }
5585            }),
5586            connection_state: "AUTHORIZED".to_string(),
5587            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5588            creation_time: Utc::now(),
5589            last_modified_time: Utc::now(),
5590            last_authorized_time: Utc::now(),
5591        };
5592
5593        let client = reqwest::Client::new();
5594        let builder = client.post("http://localhost:12345/test");
5595        let builder = apply_connection_auth(builder, &conn);
5596
5597        let request = builder.body("{}").build().unwrap();
5598        let auth_header = request
5599            .headers()
5600            .get("authorization")
5601            .unwrap()
5602            .to_str()
5603            .unwrap();
5604        assert!(
5605            auth_header.starts_with("Basic "),
5606            "Expected Basic auth header, got: {auth_header}"
5607        );
5608    }
5609
5610    #[tokio::test]
5611    async fn put_events_with_api_destination_target_resolves_destination() {
5612        // This test verifies that the PutEvents code path correctly identifies
5613        // api-destination ARN targets and resolves the destination metadata.
5614        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5615        let state = Arc::new(RwLock::new(
5616            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5617        ));
5618        let delivery = Arc::new(DeliveryBus::new());
5619        let svc = EventBridgeService::new(state, delivery);
5620
5621        // Create connection and api destination
5622        create_connection(&svc, "my-conn");
5623        let conn_arn = {
5624            let _mas = svc.state.read();
5625            let state = _mas.default_ref();
5626            state.connections.get("my-conn").unwrap().arn.clone()
5627        };
5628        let req = make_request(
5629            "CreateApiDestination",
5630            json!({
5631                "Name": "my-dest",
5632                "ConnectionArn": conn_arn,
5633                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5634                "HttpMethod": "POST"
5635            }),
5636        );
5637        svc.create_api_destination(&req).unwrap();
5638
5639        let dest_arn = {
5640            let _mas = svc.state.read();
5641            let state = _mas.default_ref();
5642            state.api_destinations.get("my-dest").unwrap().arn.clone()
5643        };
5644
5645        // Create a rule that targets the api-destination
5646        let req = make_request(
5647            "PutRule",
5648            json!({
5649                "Name": "api-dest-rule",
5650                "EventPattern": r#"{"source":["test.app"]}"#,
5651                "State": "ENABLED"
5652            }),
5653        );
5654        svc.put_rule(&req).unwrap();
5655
5656        let req = make_request(
5657            "PutTargets",
5658            json!({
5659                "Rule": "api-dest-rule",
5660                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5661            }),
5662        );
5663        svc.put_targets(&req).unwrap();
5664
5665        // PutEvents - should match the rule and attempt delivery to ApiDestination
5666        let req = make_request(
5667            "PutEvents",
5668            json!({
5669                "Entries": [{
5670                    "Source": "test.app",
5671                    "DetailType": "TestEvent",
5672                    "Detail": r#"{"key":"value"}"#
5673                }]
5674            }),
5675        );
5676        let resp = svc.put_events(&req).unwrap();
5677        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5678        assert_eq!(body["FailedEntryCount"], 0);
5679        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5680        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5681    }
5682
5683    #[test]
5684    fn test_function_name_from_arn() {
5685        // Unqualified ARN
5686        assert_eq!(
5687            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5688            "my-func"
5689        );
5690        // Qualified ARN with alias
5691        assert_eq!(
5692            super::function_name_from_arn(
5693                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5694            ),
5695            "my-func"
5696        );
5697        // Qualified ARN with version
5698        assert_eq!(
5699            super::function_name_from_arn(
5700                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5701            ),
5702            "my-func"
5703        );
5704        // Plain function name (not an ARN)
5705        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5706    }
5707
5708    // ── Rules / targets / tags handler tests ────────────────────────
5709
5710    fn put_rule_simple(svc: &EventBridgeService, name: &str) {
5711        let req = make_request(
5712            "PutRule",
5713            json!({ "Name": name, "EventPattern": r#"{"source":["a"]}"# }),
5714        );
5715        svc.put_rule(&req).unwrap();
5716    }
5717
5718    #[test]
5719    fn put_rule_persists_event_pattern_and_state() {
5720        let svc = make_service();
5721        put_rule_simple(&svc, "r1");
5722        let _mas = svc.state.read();
5723        let state = _mas.default_ref();
5724        let rule = state
5725            .rules
5726            .get(&("default".to_string(), "r1".to_string()))
5727            .unwrap();
5728        assert_eq!(rule.state, "ENABLED");
5729        assert!(rule.event_pattern.is_some());
5730        assert!(rule.arn.contains("rule/r1"));
5731    }
5732
5733    #[test]
5734    fn put_rule_rejects_schedule_on_non_default_bus() {
5735        let svc = make_service();
5736        // Create a custom bus first.
5737        let bus_req = make_request("CreateEventBus", json!({ "Name": "custom" }));
5738        svc.create_event_bus(&bus_req).unwrap();
5739
5740        let req = make_request(
5741            "PutRule",
5742            json!({
5743                "Name": "r1",
5744                "EventBusName": "custom",
5745                "ScheduleExpression": "rate(5 minutes)"
5746            }),
5747        );
5748        let err = svc.put_rule(&req).err().expect("expected error");
5749        assert_eq!(err.code(), "ValidationException");
5750    }
5751
5752    #[test]
5753    fn put_rule_rejects_unknown_event_bus() {
5754        let svc = make_service();
5755        let req = make_request(
5756            "PutRule",
5757            json!({ "Name": "r1", "EventBusName": "ghost", "EventPattern": r#"{"source":["a"]}"# }),
5758        );
5759        let err = svc.put_rule(&req).err().expect("expected error");
5760        assert_eq!(err.code(), "ResourceNotFoundException");
5761    }
5762
5763    #[test]
5764    fn put_rule_overlay_preserves_existing_targets() {
5765        let svc = make_service();
5766        put_rule_simple(&svc, "r1");
5767        // Inject a target directly.
5768        {
5769            let mut _mas = svc.state.write();
5770            let state = _mas.default_mut();
5771            let rule = state
5772                .rules
5773                .get_mut(&("default".to_string(), "r1".to_string()))
5774                .unwrap();
5775            rule.targets.push(crate::state::EventTarget {
5776                id: "t1".to_string(),
5777                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
5778                input: None,
5779                input_path: None,
5780                input_transformer: None,
5781                sqs_parameters: None,
5782            });
5783        }
5784
5785        // Re-PutRule with new description; targets should survive.
5786        let req = make_request(
5787            "PutRule",
5788            json!({ "Name": "r1", "Description": "updated", "EventPattern": r#"{"source":["a"]}"# }),
5789        );
5790        svc.put_rule(&req).unwrap();
5791        let _mas = svc.state.read();
5792        let state = _mas.default_ref();
5793        let rule = state
5794            .rules
5795            .get(&("default".to_string(), "r1".to_string()))
5796            .unwrap();
5797        assert_eq!(rule.description.as_deref(), Some("updated"));
5798        assert_eq!(rule.targets.len(), 1);
5799    }
5800
5801    #[test]
5802    fn delete_rule_with_targets_errors() {
5803        let svc = make_service();
5804        put_rule_simple(&svc, "r1");
5805        let put_targets_req = make_request(
5806            "PutTargets",
5807            json!({
5808                "Rule": "r1",
5809                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5810            }),
5811        );
5812        svc.put_targets(&put_targets_req).unwrap();
5813
5814        let req = make_request("DeleteRule", json!({ "Name": "r1" }));
5815        let err = svc.delete_rule(&req).err().expect("expected error");
5816        assert_eq!(err.code(), "ValidationException");
5817    }
5818
5819    #[test]
5820    fn delete_rule_after_remove_targets_succeeds() {
5821        let svc = make_service();
5822        put_rule_simple(&svc, "r1");
5823        let put_t = make_request(
5824            "PutTargets",
5825            json!({
5826                "Rule": "r1",
5827                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5828            }),
5829        );
5830        svc.put_targets(&put_t).unwrap();
5831        let rm_t = make_request("RemoveTargets", json!({ "Rule": "r1", "Ids": ["t1"] }));
5832        svc.remove_targets(&rm_t).unwrap();
5833        let del = make_request("DeleteRule", json!({ "Name": "r1" }));
5834        svc.delete_rule(&del).unwrap();
5835        assert!(!svc
5836            .state
5837            .read()
5838            .default_ref()
5839            .rules
5840            .contains_key(&("default".to_string(), "r1".to_string())));
5841    }
5842
5843    #[test]
5844    fn enable_disable_rule_toggles_state() {
5845        let svc = make_service();
5846        put_rule_simple(&svc, "r1");
5847        let dis = make_request("DisableRule", json!({ "Name": "r1" }));
5848        svc.disable_rule(&dis).unwrap();
5849        assert_eq!(
5850            svc.state
5851                .read()
5852                .default_ref()
5853                .rules
5854                .get(&("default".to_string(), "r1".to_string()))
5855                .unwrap()
5856                .state,
5857            "DISABLED"
5858        );
5859        let en = make_request("EnableRule", json!({ "Name": "r1" }));
5860        svc.enable_rule(&en).unwrap();
5861        assert_eq!(
5862            svc.state
5863                .read()
5864                .default_ref()
5865                .rules
5866                .get(&("default".to_string(), "r1".to_string()))
5867                .unwrap()
5868                .state,
5869            "ENABLED"
5870        );
5871    }
5872
5873    #[test]
5874    fn enable_rule_unknown_errors() {
5875        let svc = make_service();
5876        let req = make_request("EnableRule", json!({ "Name": "ghost" }));
5877        let err = svc.enable_rule(&req).err().expect("expected error");
5878        assert_eq!(err.code(), "ResourceNotFoundException");
5879    }
5880
5881    #[test]
5882    fn list_rules_with_name_prefix_filter() {
5883        let svc = make_service();
5884        put_rule_simple(&svc, "prod-orders");
5885        put_rule_simple(&svc, "prod-shipping");
5886        put_rule_simple(&svc, "dev-orders");
5887
5888        let req = make_request("ListRules", json!({ "NamePrefix": "prod-" }));
5889        let resp = svc.list_rules(&req).unwrap();
5890        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5891        let names: Vec<&str> = body["Rules"]
5892            .as_array()
5893            .unwrap()
5894            .iter()
5895            .map(|r| r["Name"].as_str().unwrap())
5896            .collect();
5897        assert_eq!(names.len(), 2);
5898        assert!(names.iter().all(|n| n.starts_with("prod-")));
5899    }
5900
5901    #[test]
5902    fn list_rules_pagination_emits_next_token() {
5903        let svc = make_service();
5904        for i in 0..5 {
5905            put_rule_simple(&svc, &format!("r{i}"));
5906        }
5907        let req = make_request("ListRules", json!({ "Limit": 2 }));
5908        let resp = svc.list_rules(&req).unwrap();
5909        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5910        assert_eq!(body["Rules"].as_array().unwrap().len(), 2);
5911        assert!(body["NextToken"].is_string());
5912    }
5913
5914    #[test]
5915    fn describe_rule_returns_persisted_fields() {
5916        let svc = make_service();
5917        let put = make_request(
5918            "PutRule",
5919            json!({
5920                "Name": "r1",
5921                "EventPattern": r#"{"source":["a"]}"#,
5922                "Description": "hi",
5923                "State": "DISABLED"
5924            }),
5925        );
5926        svc.put_rule(&put).unwrap();
5927        let desc = make_request("DescribeRule", json!({ "Name": "r1" }));
5928        let resp = svc.describe_rule(&desc).unwrap();
5929        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5930        assert_eq!(body["Name"], json!("r1"));
5931        assert_eq!(body["State"], json!("DISABLED"));
5932        assert_eq!(body["Description"], json!("hi"));
5933    }
5934
5935    #[test]
5936    fn describe_rule_unknown_errors() {
5937        let svc = make_service();
5938        let req = make_request("DescribeRule", json!({ "Name": "ghost" }));
5939        let err = svc.describe_rule(&req).err().expect("expected error");
5940        assert_eq!(err.code(), "ResourceNotFoundException");
5941    }
5942
5943    #[test]
5944    fn put_targets_rejects_fifo_without_sqs_parameters() {
5945        let svc = make_service();
5946        put_rule_simple(&svc, "r1");
5947        let req = make_request(
5948            "PutTargets",
5949            json!({
5950                "Rule": "r1",
5951                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q.fifo" }]
5952            }),
5953        );
5954        let err = svc.put_targets(&req).err().expect("expected error");
5955        assert_eq!(err.code(), "ValidationException");
5956    }
5957
5958    #[test]
5959    fn put_targets_rejects_invalid_arn() {
5960        let svc = make_service();
5961        put_rule_simple(&svc, "r1");
5962        let req = make_request(
5963            "PutTargets",
5964            json!({
5965                "Rule": "r1",
5966                "Targets": [{ "Id": "t1", "Arn": "not-an-arn" }]
5967            }),
5968        );
5969        let err = svc.put_targets(&req).err().expect("expected error");
5970        assert_eq!(err.code(), "ValidationException");
5971    }
5972
5973    #[test]
5974    fn put_targets_unknown_rule_errors() {
5975        let svc = make_service();
5976        let req = make_request(
5977            "PutTargets",
5978            json!({
5979                "Rule": "ghost",
5980                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5981            }),
5982        );
5983        let err = svc.put_targets(&req).err().expect("expected error");
5984        assert_eq!(err.code(), "ResourceNotFoundException");
5985    }
5986
5987    #[test]
5988    fn put_targets_replaces_existing_with_same_id() {
5989        let svc = make_service();
5990        put_rule_simple(&svc, "r1");
5991        let first = make_request(
5992            "PutTargets",
5993            json!({
5994                "Rule": "r1",
5995                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1" }]
5996            }),
5997        );
5998        svc.put_targets(&first).unwrap();
5999        let second = make_request(
6000            "PutTargets",
6001            json!({
6002                "Rule": "r1",
6003                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q2" }]
6004            }),
6005        );
6006        svc.put_targets(&second).unwrap();
6007
6008        let _mas = svc.state.read();
6009        let state = _mas.default_ref();
6010        let rule = state
6011            .rules
6012            .get(&("default".to_string(), "r1".to_string()))
6013            .unwrap();
6014        assert_eq!(rule.targets.len(), 1);
6015        assert!(rule.targets[0].arn.ends_with("q2"));
6016    }
6017
6018    #[test]
6019    fn list_targets_by_rule_returns_pagination_token() {
6020        let svc = make_service();
6021        put_rule_simple(&svc, "r1");
6022        for i in 0..4 {
6023            let req = make_request(
6024                "PutTargets",
6025                json!({
6026                    "Rule": "r1",
6027                    "Targets": [{
6028                        "Id": format!("t{i}"),
6029                        "Arn": format!("arn:aws:sqs:us-east-1:123456789012:q{i}")
6030                    }]
6031                }),
6032            );
6033            svc.put_targets(&req).unwrap();
6034        }
6035        let req = make_request("ListTargetsByRule", json!({ "Rule": "r1", "Limit": 2 }));
6036        let resp = svc.list_targets_by_rule(&req).unwrap();
6037        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6038        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6039        assert!(body["NextToken"].is_string());
6040    }
6041
6042    #[test]
6043    fn list_rule_names_by_target_groups_by_arn() {
6044        let svc = make_service();
6045        put_rule_simple(&svc, "r1");
6046        put_rule_simple(&svc, "r2");
6047        for rule in ["r1", "r2"] {
6048            let req = make_request(
6049                "PutTargets",
6050                json!({
6051                    "Rule": rule,
6052                    "Targets": [{
6053                        "Id": "t1",
6054                        "Arn": "arn:aws:sqs:us-east-1:123456789012:shared"
6055                    }]
6056                }),
6057            );
6058            svc.put_targets(&req).unwrap();
6059        }
6060        let req = make_request(
6061            "ListRuleNamesByTarget",
6062            json!({ "TargetArn": "arn:aws:sqs:us-east-1:123456789012:shared" }),
6063        );
6064        let resp = svc.list_rule_names_by_target(&req).unwrap();
6065        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6066        let names: Vec<&str> = body["RuleNames"]
6067            .as_array()
6068            .unwrap()
6069            .iter()
6070            .map(|v| v.as_str().unwrap())
6071            .collect();
6072        assert_eq!(names, vec!["r1", "r2"]);
6073    }
6074
6075    // ── Tag operations ───────────────────────────────────────────────
6076
6077    #[test]
6078    fn tag_then_list_tags_for_rule() {
6079        let svc = make_service();
6080        put_rule_simple(&svc, "r1");
6081        let arn = svc
6082            .state
6083            .read()
6084            .default_ref()
6085            .rules
6086            .get(&("default".to_string(), "r1".to_string()))
6087            .unwrap()
6088            .arn
6089            .clone();
6090
6091        let tag_req = make_request(
6092            "TagResource",
6093            json!({
6094                "ResourceARN": arn,
6095                "Tags": [{ "Key": "env", "Value": "prod" }]
6096            }),
6097        );
6098        svc.tag_resource(&tag_req).unwrap();
6099
6100        let list_req = make_request("ListTagsForResource", json!({ "ResourceARN": arn }));
6101        let resp = svc.list_tags_for_resource(&list_req).unwrap();
6102        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6103        let tags = body["Tags"].as_array().unwrap();
6104        assert_eq!(tags.len(), 1);
6105        assert_eq!(tags[0]["Key"], json!("env"));
6106        assert_eq!(tags[0]["Value"], json!("prod"));
6107    }
6108
6109    #[test]
6110    fn untag_resource_removes_listed_keys() {
6111        let svc = make_service();
6112        put_rule_simple(&svc, "r1");
6113        let arn = svc
6114            .state
6115            .read()
6116            .default_ref()
6117            .rules
6118            .get(&("default".to_string(), "r1".to_string()))
6119            .unwrap()
6120            .arn
6121            .clone();
6122        let tag_req = make_request(
6123            "TagResource",
6124            json!({
6125                "ResourceARN": &arn,
6126                "Tags": [{ "Key": "env", "Value": "prod" }, { "Key": "team", "Value": "core" }]
6127            }),
6128        );
6129        svc.tag_resource(&tag_req).unwrap();
6130
6131        let untag = make_request(
6132            "UntagResource",
6133            json!({ "ResourceARN": &arn, "TagKeys": ["env"] }),
6134        );
6135        svc.untag_resource(&untag).unwrap();
6136
6137        let _mas = svc.state.read();
6138        let state = _mas.default_ref();
6139        let rule = state
6140            .rules
6141            .get(&("default".to_string(), "r1".to_string()))
6142            .unwrap();
6143        assert!(!rule.tags.contains_key("env"));
6144        assert_eq!(rule.tags.get("team").map(String::as_str), Some("core"));
6145    }
6146
6147    // ── TestEventPattern ─────────────────────────────────────────────
6148
6149    #[test]
6150    fn test_event_pattern_returns_result_field() {
6151        let svc = make_service();
6152        let req = make_request(
6153            "TestEventPattern",
6154            json!({
6155                "EventPattern": r#"{"source":["my.app"]}"#,
6156                "Event": r#"{"source":"my.app","detail-type":"x","detail":{}}"#
6157            }),
6158        );
6159        let resp = svc.test_event_pattern(&req).unwrap();
6160        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6161        assert_eq!(body["Result"], json!(true));
6162    }
6163
6164    // ── Event bus describe / delete ──────────────────────────────────
6165
6166    #[test]
6167    fn describe_event_bus_default_returns_arn() {
6168        let svc = make_service();
6169        let req = make_request("DescribeEventBus", json!({}));
6170        let resp = svc.describe_event_bus(&req).unwrap();
6171        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6172        assert_eq!(body["Name"], json!("default"));
6173        assert!(body["Arn"].as_str().unwrap().contains("event-bus/default"));
6174    }
6175
6176    #[test]
6177    fn delete_event_bus_default_fails() {
6178        let svc = make_service();
6179        let req = make_request("DeleteEventBus", json!({ "Name": "default" }));
6180        let err = svc.delete_event_bus(&req).err().expect("expected error");
6181        assert_eq!(err.code(), "ValidationException");
6182    }
6183
6184    // ── Error branch tests ──
6185
6186    #[test]
6187    fn describe_rule_not_found() {
6188        let svc = make_service();
6189        let req = make_request("DescribeRule", json!({"Name": "nonexistent"}));
6190        let err = svc.describe_rule(&req).err().expect("expected error");
6191        assert_eq!(err.code(), "ResourceNotFoundException");
6192    }
6193
6194    #[test]
6195    fn delete_rule_nonexistent_is_noop() {
6196        let svc = make_service();
6197        let req = make_request("DeleteRule", json!({"Name": "nope"}));
6198        // EventBridge returns success for deleting nonexistent rules
6199        svc.delete_rule(&req).unwrap();
6200    }
6201
6202    #[test]
6203    fn put_targets_rule_not_found() {
6204        let svc = make_service();
6205        let req = make_request(
6206            "PutTargets",
6207            json!({"Rule": "ghost", "Targets": [{"Id": "t1", "Arn": "arn:a"}]}),
6208        );
6209        let err = svc.put_targets(&req).err().expect("expected error");
6210        assert_eq!(err.code(), "ResourceNotFoundException");
6211    }
6212
6213    #[test]
6214    fn remove_targets_rule_not_found() {
6215        let svc = make_service();
6216        let req = make_request("RemoveTargets", json!({"Rule": "ghost", "Ids": ["t1"]}));
6217        let err = svc.remove_targets(&req).err().expect("expected error");
6218        assert_eq!(err.code(), "ResourceNotFoundException");
6219    }
6220
6221    #[test]
6222    fn list_targets_by_rule_not_found() {
6223        let svc = make_service();
6224        let req = make_request("ListTargetsByRule", json!({"Rule": "ghost"}));
6225        let err = svc
6226            .list_targets_by_rule(&req)
6227            .err()
6228            .expect("expected error");
6229        assert_eq!(err.code(), "ResourceNotFoundException");
6230    }
6231
6232    #[test]
6233    fn enable_rule_not_found() {
6234        let svc = make_service();
6235        let req = make_request("EnableRule", json!({"Name": "ghost"}));
6236        let err = svc.enable_rule(&req).err().expect("expected error");
6237        assert_eq!(err.code(), "ResourceNotFoundException");
6238    }
6239
6240    #[test]
6241    fn disable_rule_not_found() {
6242        let svc = make_service();
6243        let req = make_request("DisableRule", json!({"Name": "ghost"}));
6244        let err = svc.disable_rule(&req).err().expect("expected error");
6245        assert_eq!(err.code(), "ResourceNotFoundException");
6246    }
6247
6248    #[test]
6249    fn describe_event_bus_not_found() {
6250        let svc = make_service();
6251        let req = make_request("DescribeEventBus", json!({"Name": "nonexistent-bus"}));
6252        let err = svc.describe_event_bus(&req).err().expect("expected error");
6253        assert_eq!(err.code(), "ResourceNotFoundException");
6254    }
6255
6256    #[test]
6257    fn tag_resource_not_found() {
6258        let svc = make_service();
6259        let req = make_request(
6260            "TagResource",
6261            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "Tags": [{"Key": "k", "Value": "v"}]}),
6262        );
6263        let err = svc.tag_resource(&req).err().expect("expected error");
6264        assert_eq!(err.code(), "ResourceNotFoundException");
6265    }
6266
6267    #[test]
6268    fn untag_resource_not_found() {
6269        let svc = make_service();
6270        let req = make_request(
6271            "UntagResource",
6272            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "TagKeys": ["k"]}),
6273        );
6274        let err = svc.untag_resource(&req).err().expect("expected error");
6275        assert_eq!(err.code(), "ResourceNotFoundException");
6276    }
6277
6278    #[test]
6279    fn describe_archive_not_found() {
6280        let svc = make_service();
6281        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
6282        let err = svc.describe_archive(&req).err().expect("expected error");
6283        assert_eq!(err.code(), "ResourceNotFoundException");
6284    }
6285
6286    #[test]
6287    fn delete_archive_not_found() {
6288        let svc = make_service();
6289        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6290        let err = svc.delete_archive(&req).err().expect("expected error");
6291        assert_eq!(err.code(), "ResourceNotFoundException");
6292    }
6293
6294    #[test]
6295    fn describe_connection_not_found() {
6296        let svc = make_service();
6297        let req = make_request("DescribeConnection", json!({"Name": "ghost"}));
6298        let err = svc.describe_connection(&req).err().expect("expected error");
6299        assert_eq!(err.code(), "ResourceNotFoundException");
6300    }
6301
6302    #[test]
6303    fn describe_api_destination_not_found() {
6304        let svc = make_service();
6305        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
6306        let err = svc
6307            .describe_api_destination(&req)
6308            .err()
6309            .expect("expected error");
6310        assert_eq!(err.code(), "ResourceNotFoundException");
6311    }
6312
6313    #[test]
6314    fn describe_replay_not_found() {
6315        let svc = make_service();
6316        let req = make_request("DescribeReplay", json!({"ReplayName": "ghost"}));
6317        let err = svc.describe_replay(&req).err().expect("expected error");
6318        assert_eq!(err.code(), "ResourceNotFoundException");
6319    }
6320
6321    #[test]
6322    fn create_event_bus_duplicate() {
6323        let svc = make_service();
6324        let req = make_request("CreateEventBus", json!({"Name": "dup-bus"}));
6325        svc.create_event_bus(&req).unwrap();
6326        let err = svc.create_event_bus(&req).err().expect("expected error");
6327        assert_eq!(err.code(), "ResourceAlreadyExistsException");
6328    }
6329
6330    // ── Rule lifecycle ──
6331
6332    #[test]
6333    fn rule_put_describe_enable_disable_delete() {
6334        let svc = make_service();
6335        svc.put_rule(&make_request(
6336            "PutRule",
6337            json!({"Name": "my-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}", "State": "ENABLED"}),
6338        ))
6339        .unwrap();
6340
6341        let resp = svc
6342            .describe_rule(&make_request("DescribeRule", json!({"Name": "my-rule"})))
6343            .unwrap();
6344        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6345        assert_eq!(body["State"], "ENABLED");
6346
6347        svc.disable_rule(&make_request("DisableRule", json!({"Name": "my-rule"})))
6348            .unwrap();
6349        svc.enable_rule(&make_request("EnableRule", json!({"Name": "my-rule"})))
6350            .unwrap();
6351        svc.delete_rule(&make_request("DeleteRule", json!({"Name": "my-rule"})))
6352            .unwrap();
6353    }
6354
6355    #[test]
6356    fn list_rules_returns_created() {
6357        let svc = make_service();
6358        for name in &["r1", "r2", "r3"] {
6359            svc.put_rule(&make_request(
6360                "PutRule",
6361                json!({"Name": name, "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6362            ))
6363            .unwrap();
6364        }
6365        let resp = svc
6366            .list_rules(&make_request("ListRules", json!({})))
6367            .unwrap();
6368        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6369        assert_eq!(body["Rules"].as_array().unwrap().len(), 3);
6370    }
6371
6372    // ── Targets ──
6373
6374    #[test]
6375    fn put_list_remove_targets() {
6376        let svc = make_service();
6377        svc.put_rule(&make_request(
6378            "PutRule",
6379            json!({"Name": "tr", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6380        ))
6381        .unwrap();
6382
6383        svc.put_targets(&make_request(
6384            "PutTargets",
6385            json!({
6386                "Rule": "tr",
6387                "Targets": [
6388                    {"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"},
6389                    {"Id": "t2", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:fn1"},
6390                ]
6391            }),
6392        ))
6393        .unwrap();
6394
6395        let resp = svc
6396            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6397            .unwrap();
6398        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6399        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6400
6401        svc.remove_targets(&make_request(
6402            "RemoveTargets",
6403            json!({"Rule": "tr", "Ids": ["t1"]}),
6404        ))
6405        .unwrap();
6406
6407        let resp = svc
6408            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6409            .unwrap();
6410        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6411        assert_eq!(body["Targets"].as_array().unwrap().len(), 1);
6412    }
6413
6414    // ── PutEvents ──
6415
6416    #[test]
6417    fn put_events_basic() {
6418        let svc = make_service();
6419        let resp = svc
6420            .put_events(&make_request(
6421                "PutEvents",
6422                json!({
6423                    "Entries": [
6424                        {"Source": "aws.s3", "DetailType": "Object Created", "Detail": "{}"},
6425                    ]
6426                }),
6427            ))
6428            .unwrap();
6429        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6430        assert_eq!(body["FailedEntryCount"], 0);
6431    }
6432
6433    // ── Archives ──
6434
6435    #[test]
6436    fn archive_create_describe_list_delete() {
6437        let svc = make_service();
6438
6439        svc.create_archive(&make_request(
6440            "CreateArchive",
6441            json!({
6442                "ArchiveName": "my-archive",
6443                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default",
6444            }),
6445        ))
6446        .unwrap();
6447
6448        let resp = svc
6449            .describe_archive(&make_request(
6450                "DescribeArchive",
6451                json!({"ArchiveName": "my-archive"}),
6452            ))
6453            .unwrap();
6454        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6455        assert_eq!(body["ArchiveName"], "my-archive");
6456
6457        let resp = svc
6458            .list_archives(&make_request("ListArchives", json!({})))
6459            .unwrap();
6460        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6461        assert!(!body["Archives"].as_array().unwrap().is_empty());
6462
6463        svc.delete_archive(&make_request(
6464            "DeleteArchive",
6465            json!({"ArchiveName": "my-archive"}),
6466        ))
6467        .unwrap();
6468    }
6469
6470    // ── Connections ──
6471
6472    #[test]
6473    fn connection_create_list_describe_deauthorize() {
6474        let svc = make_service();
6475
6476        svc.create_connection(&make_request(
6477            "CreateConnection",
6478            json!({
6479                "Name": "my-conn",
6480                "AuthorizationType": "API_KEY",
6481                "AuthParameters": {
6482                    "ApiKeyAuthParameters": {"ApiKeyName": "x-key", "ApiKeyValue": "secret"}
6483                }
6484            }),
6485        ))
6486        .unwrap();
6487
6488        let resp = svc
6489            .list_connections(&make_request("ListConnections", json!({})))
6490            .unwrap();
6491        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6492        assert!(!body["Connections"].as_array().unwrap().is_empty());
6493
6494        svc.describe_connection(&make_request(
6495            "DescribeConnection",
6496            json!({"Name": "my-conn"}),
6497        ))
6498        .unwrap();
6499        svc.deauthorize_connection(&make_request(
6500            "DeauthorizeConnection",
6501            json!({"Name": "my-conn"}),
6502        ))
6503        .unwrap();
6504    }
6505
6506    // ── Event bus list ──
6507
6508    #[test]
6509    fn list_event_buses_returns_default_and_custom() {
6510        let svc = make_service();
6511        svc.create_event_bus(&make_request(
6512            "CreateEventBus",
6513            json!({"Name": "custom-bus"}),
6514        ))
6515        .unwrap();
6516
6517        let resp = svc
6518            .list_event_buses(&make_request("ListEventBuses", json!({})))
6519            .unwrap();
6520        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6521        let names: Vec<&str> = body["EventBuses"]
6522            .as_array()
6523            .unwrap()
6524            .iter()
6525            .map(|v| v["Name"].as_str().unwrap())
6526            .collect();
6527        assert!(names.contains(&"default"));
6528        assert!(names.contains(&"custom-bus"));
6529    }
6530
6531    // ── Tags ──
6532
6533    #[test]
6534    fn tag_list_untag_rule_resource() {
6535        let svc = make_service();
6536        svc.put_rule(&make_request(
6537            "PutRule",
6538            json!({"Name": "tagged-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6539        ))
6540        .unwrap();
6541
6542        let arn = "arn:aws:events:us-east-1:123456789012:rule/tagged-rule";
6543
6544        svc.tag_resource(&make_request(
6545            "TagResource",
6546            json!({"ResourceARN": arn, "Tags": [{"Key": "env", "Value": "prod"}]}),
6547        ))
6548        .unwrap();
6549
6550        let resp = svc
6551            .list_tags_for_resource(&make_request(
6552                "ListTagsForResource",
6553                json!({"ResourceARN": arn}),
6554            ))
6555            .unwrap();
6556        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6557        assert_eq!(body["Tags"].as_array().unwrap().len(), 1);
6558
6559        svc.untag_resource(&make_request(
6560            "UntagResource",
6561            json!({"ResourceARN": arn, "TagKeys": ["env"]}),
6562        ))
6563        .unwrap();
6564    }
6565
6566    // ── put_permission / remove_permission ──
6567
6568    #[test]
6569    fn put_permission_with_policy_json() {
6570        let svc = make_service();
6571        let policy = r#"{"Version":"2012-10-17","Statement":[]}"#;
6572        let req = make_request("PutPermission", json!({"Policy": policy}));
6573        svc.put_permission(&req).unwrap();
6574    }
6575
6576    #[test]
6577    fn put_permission_invalid_action_errors() {
6578        let svc = make_service();
6579        let req = make_request(
6580            "PutPermission",
6581            json!({
6582                "Action": "events:NotARealAction",
6583                "Principal": "123456789012",
6584                "StatementId": "s1"
6585            }),
6586        );
6587        assert!(svc.put_permission(&req).is_err());
6588    }
6589
6590    #[test]
6591    fn put_permission_unknown_bus_errors() {
6592        let svc = make_service();
6593        let req = make_request(
6594            "PutPermission",
6595            json!({
6596                "EventBusName": "missing",
6597                "Action": "events:PutEvents",
6598                "Principal": "123456789012",
6599                "StatementId": "s1"
6600            }),
6601        );
6602        assert!(svc.put_permission(&req).is_err());
6603    }
6604
6605    #[test]
6606    fn put_permission_add_and_remove_statement() {
6607        let svc = make_service();
6608        let req = make_request(
6609            "PutPermission",
6610            json!({
6611                "Action": "events:PutEvents",
6612                "Principal": "123456789012",
6613                "StatementId": "s1"
6614            }),
6615        );
6616        svc.put_permission(&req).unwrap();
6617
6618        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6619        svc.remove_permission(&req).unwrap();
6620    }
6621
6622    #[test]
6623    fn remove_permission_remove_all_flag() {
6624        let svc = make_service();
6625        let req = make_request(
6626            "PutPermission",
6627            json!({
6628                "Action": "events:PutEvents",
6629                "Principal": "123456789012",
6630                "StatementId": "s1"
6631            }),
6632        );
6633        svc.put_permission(&req).unwrap();
6634
6635        let req = make_request("RemovePermission", json!({"RemoveAllPermissions": true}));
6636        svc.remove_permission(&req).unwrap();
6637    }
6638
6639    #[test]
6640    fn remove_permission_unknown_bus_errors() {
6641        let svc = make_service();
6642        let req = make_request(
6643            "RemovePermission",
6644            json!({"EventBusName": "missing", "StatementId": "s1"}),
6645        );
6646        assert!(svc.remove_permission(&req).is_err());
6647    }
6648
6649    #[test]
6650    fn remove_permission_no_policy_errors() {
6651        let svc = make_service();
6652        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6653        assert!(svc.remove_permission(&req).is_err());
6654    }
6655
6656    #[test]
6657    fn remove_permission_unknown_statement_errors() {
6658        let svc = make_service();
6659        svc.put_permission(&make_request(
6660            "PutPermission",
6661            json!({
6662                "Action": "events:PutEvents",
6663                "Principal": "123456789012",
6664                "StatementId": "s1"
6665            }),
6666        ))
6667        .unwrap();
6668
6669        let req = make_request("RemovePermission", json!({"StatementId": "ghost"}));
6670        assert!(svc.remove_permission(&req).is_err());
6671    }
6672
    // ── put_rule validation errors (missing name, over-long name, invalid state) ──
6674
6675    #[test]
6676    fn put_rule_missing_name_errors() {
6677        let svc = make_service();
6678        let req = make_request("PutRule", json!({}));
6679        assert!(svc.put_rule(&req).is_err());
6680    }
6681
6682    #[test]
6683    fn put_rule_name_too_long_errors() {
6684        let svc = make_service();
6685        let name = "x".repeat(65);
6686        let req = make_request("PutRule", json!({"Name": name}));
6687        assert!(svc.put_rule(&req).is_err());
6688    }
6689
6690    #[test]
6691    fn put_rule_invalid_state_errors() {
6692        let svc = make_service();
6693        let req = make_request("PutRule", json!({"Name": "r1", "State": "BOGUS"}));
6694        assert!(svc.put_rule(&req).is_err());
6695    }
6696
6697    // ── create_connection variants ──
6698
6699    #[test]
6700    fn create_connection_api_key_auth() {
6701        let svc = make_service();
6702        let req = make_request(
6703            "CreateConnection",
6704            json!({
6705                "Name": "conn-apikey",
6706                "AuthorizationType": "API_KEY",
6707                "AuthParameters": {
6708                    "ApiKeyAuthParameters": {
6709                        "ApiKeyName": "X-Api-Key",
6710                        "ApiKeyValue": "secret"
6711                    }
6712                }
6713            }),
6714        );
6715        svc.create_connection(&req).unwrap();
6716    }
6717
6718    #[test]
6719    fn create_connection_basic_auth() {
6720        let svc = make_service();
6721        let req = make_request(
6722            "CreateConnection",
6723            json!({
6724                "Name": "conn-basic",
6725                "AuthorizationType": "BASIC",
6726                "AuthParameters": {
6727                    "BasicAuthParameters": {
6728                        "Username": "u",
6729                        "Password": "p"
6730                    }
6731                }
6732            }),
6733        );
6734        svc.create_connection(&req).unwrap();
6735    }
6736
6737    #[test]
6738    fn create_connection_missing_name_errors() {
6739        let svc = make_service();
6740        let req = make_request("CreateConnection", json!({"AuthorizationType": "API_KEY"}));
6741        assert!(svc.create_connection(&req).is_err());
6742    }
6743
6744    #[test]
6745    fn create_connection_missing_auth_type_errors() {
6746        let svc = make_service();
6747        let req = make_request("CreateConnection", json!({"Name": "c-noauth"}));
6748        assert!(svc.create_connection(&req).is_err());
6749    }
6750
6751    #[test]
6752    fn delete_connection_not_found() {
6753        let svc = make_service();
6754        let req = make_request("DeleteConnection", json!({"Name": "ghost"}));
6755        assert!(svc.delete_connection(&req).is_err());
6756    }
6757
6758    // ── api destination validation ──
6759
6760    #[test]
6761    fn create_api_destination_missing_name_errors() {
6762        let svc = make_service();
6763        let req = make_request(
6764            "CreateApiDestination",
6765            json!({
6766                "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/c",
6767                "InvocationEndpoint": "https://example.com",
6768                "HttpMethod": "POST"
6769            }),
6770        );
6771        assert!(svc.create_api_destination(&req).is_err());
6772    }
6773
6774    #[test]
6775    fn create_api_destination_invalid_method_errors() {
6776        let svc = make_service();
6777        create_connection(&svc, "conn-m");
6778        let guard = svc.state.read();
6779        let st = guard.default_ref();
6780        let conn_arn = st
6781            .connections
6782            .get("conn-m")
6783            .map(|c| c.arn.clone())
6784            .unwrap_or_default();
6785        drop(guard);
6786
6787        let req = make_request(
6788            "CreateApiDestination",
6789            json!({
6790                "Name": "d1",
6791                "ConnectionArn": conn_arn,
6792                "InvocationEndpoint": "https://example.com",
6793                "HttpMethod": "FLY"
6794            }),
6795        );
6796        assert!(svc.create_api_destination(&req).is_err());
6797    }
6798
6799    #[test]
6800    fn delete_api_destination_not_found() {
6801        let svc = make_service();
6802        let req = make_request("DeleteApiDestination", json!({"Name": "ghost"}));
6803        assert!(svc.delete_api_destination(&req).is_err());
6804    }
6805
6806    // ── archive error paths ──
6807
6808    #[test]
6809    fn create_archive_missing_name_errors() {
6810        let svc = make_service();
6811        let req = make_request(
6812            "CreateArchive",
6813            json!({"EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"}),
6814        );
6815        assert!(svc.create_archive(&req).is_err());
6816    }
6817
6818    #[test]
6819    fn create_archive_missing_source_arn_errors() {
6820        let svc = make_service();
6821        let req = make_request("CreateArchive", json!({"ArchiveName": "arc1"}));
6822        assert!(svc.create_archive(&req).is_err());
6823    }
6824
6825    #[test]
6826    fn delete_archive_missing_errors() {
6827        let svc = make_service();
6828        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6829        assert!(svc.delete_archive(&req).is_err());
6830    }
6831
6832    // ── replay error paths ──
6833
6834    #[test]
6835    fn cancel_replay_not_found() {
6836        let svc = make_service();
6837        let req = make_request("CancelReplay", json!({"ReplayName": "ghost"}));
6838        assert!(svc.cancel_replay(&req).is_err());
6839    }
6840
6841    // ── put_events empty ──
6842
6843    #[test]
6844    fn put_events_empty_entries_errors() {
6845        let svc = make_service();
6846        let req = make_request("PutEvents", json!({"Entries": []}));
6847        assert!(svc.put_events(&req).is_err());
6848    }
6849
6850    #[test]
6851    fn put_events_success_count() {
6852        let svc = make_service();
6853        let req = make_request(
6854            "PutEvents",
6855            json!({
6856                "Entries": [
6857                    {"Source": "my.app", "DetailType": "Test", "Detail": "{}"}
6858                ]
6859            }),
6860        );
6861        let resp = svc.put_events(&req).unwrap();
6862        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6863        assert_eq!(body["FailedEntryCount"], 0);
6864        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
6865    }
6866
6867    // ── list_tags_for_resource on unknown ARN ──
6868
6869    #[test]
6870    fn list_tags_for_resource_unknown_errors() {
6871        let svc = make_service();
6872        let req = make_request(
6873            "ListTagsForResource",
6874            json!({
6875                "ResourceARN": "arn:aws:events:us-east-1:123456789012:rule/ghost"
6876            }),
6877        );
6878        assert!(svc.list_tags_for_resource(&req).is_err());
6879    }
6880
6881    // ── describe_rule with EventBusName ──
6882
6883    #[test]
6884    fn describe_rule_custom_bus() {
6885        let svc = make_service();
6886        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "cb"})))
6887            .unwrap();
6888
6889        svc.put_rule(&make_request(
6890            "PutRule",
6891            json!({
6892                "Name": "r-cb",
6893                "EventPattern": "{\"source\":[\"aws.s3\"]}",
6894                "EventBusName": "cb"
6895            }),
6896        ))
6897        .unwrap();
6898
6899        let resp = svc
6900            .describe_rule(&make_request(
6901                "DescribeRule",
6902                json!({"Name": "r-cb", "EventBusName": "cb"}),
6903            ))
6904            .unwrap();
6905        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6906        assert_eq!(body["Name"], "r-cb");
6907    }
6908
6909    // ── enable/disable rule on custom bus ──
6910
6911    #[test]
6912    fn disable_rule_on_custom_bus() {
6913        let svc = make_service();
6914        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "dcb"})))
6915            .unwrap();
6916        svc.put_rule(&make_request(
6917            "PutRule",
6918            json!({
6919                "Name": "r-d",
6920                "EventPattern": "{\"source\":[\"s\"]}",
6921                "EventBusName": "dcb"
6922            }),
6923        ))
6924        .unwrap();
6925        svc.disable_rule(&make_request(
6926            "DisableRule",
6927            json!({"Name": "r-d", "EventBusName": "dcb"}),
6928        ))
6929        .unwrap();
6930    }
6931
6932    // ── describe_event_bus with custom bus ──
6933
6934    #[test]
6935    fn describe_event_bus_custom() {
6936        let svc = make_service();
6937        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "deb"})))
6938            .unwrap();
6939        let resp = svc
6940            .describe_event_bus(&make_request("DescribeEventBus", json!({"Name": "deb"})))
6941            .unwrap();
6942        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6943        assert_eq!(body["Name"], "deb");
6944    }
6945
6946    #[test]
6947    fn list_event_buses_with_name_prefix() {
6948        let svc = make_service();
6949        for name in &["dev-x", "dev-y", "prod-z"] {
6950            svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": name})))
6951                .unwrap();
6952        }
6953        let resp = svc
6954            .list_event_buses(&make_request(
6955                "ListEventBuses",
6956                json!({"NamePrefix": "dev-"}),
6957            ))
6958            .unwrap();
6959        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6960        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
6961    }
6962
6963    #[test]
6964    fn list_rules_on_custom_bus() {
6965        let svc = make_service();
6966        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "lrcb"})))
6967            .unwrap();
6968        svc.put_rule(&make_request(
6969            "PutRule",
6970            json!({
6971                "Name": "r1",
6972                "EventPattern": "{\"source\":[\"s\"]}",
6973                "EventBusName": "lrcb"
6974            }),
6975        ))
6976        .unwrap();
6977
6978        let resp = svc
6979            .list_rules(&make_request("ListRules", json!({"EventBusName": "lrcb"})))
6980            .unwrap();
6981        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6982        assert_eq!(body["Rules"].as_array().unwrap().len(), 1);
6983    }
6984
6985    // ── put_targets on custom bus ──
6986
6987    #[test]
6988    fn put_targets_on_custom_bus() {
6989        let svc = make_service();
6990        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "ptcb"})))
6991            .unwrap();
6992        svc.put_rule(&make_request(
6993            "PutRule",
6994            json!({
6995                "Name": "rt",
6996                "EventPattern": "{\"source\":[\"s\"]}",
6997                "EventBusName": "ptcb"
6998            }),
6999        ))
7000        .unwrap();
7001
7002        svc.put_targets(&make_request(
7003            "PutTargets",
7004            json!({
7005                "Rule": "rt",
7006                "EventBusName": "ptcb",
7007                "Targets": [{"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"}]
7008            }),
7009        ))
7010        .unwrap();
7011    }
7012
7013    // ── remove_targets unknown target ids ──
7014
7015    #[test]
7016    fn remove_targets_unknown_ids_returns_failed() {
7017        let svc = make_service();
7018        svc.put_rule(&make_request(
7019            "PutRule",
7020            json!({"Name": "rmt", "EventPattern": "{\"source\":[\"s\"]}"}),
7021        ))
7022        .unwrap();
7023
7024        let resp = svc
7025            .remove_targets(&make_request(
7026                "RemoveTargets",
7027                json!({"Rule": "rmt", "Ids": ["ghost1", "ghost2"]}),
7028            ))
7029            .unwrap();
7030        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7031        // Unknown ids are silently ok in many implementations; at least we hit the code path
7032        assert!(body.is_object());
7033    }
7034
7035    #[test]
7036    fn describe_event_source_unknown_errors() {
7037        let svc = make_service();
7038        let req = make_request("DescribeEventSource", json!({"Name": "ghost"}));
7039        assert!(svc.describe_event_source(&req).is_err());
7040    }
7041
7042    #[test]
7043    fn describe_partner_event_source_unknown_errors() {
7044        let svc = make_service();
7045        let req = make_request("DescribePartnerEventSource", json!({"Name": "ghost"}));
7046        assert!(svc.describe_partner_event_source(&req).is_err());
7047    }
7048
7049    #[test]
7050    fn list_partner_event_sources_empty_ok() {
7051        let svc = make_service();
7052        let req = make_request(
7053            "ListPartnerEventSources",
7054            json!({"NamePrefix": "aws.partner"}),
7055        );
7056        let resp = svc.list_partner_event_sources(&req).unwrap();
7057        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7058        assert!(body["PartnerEventSources"].is_array());
7059    }
7060
7061    #[test]
7062    fn list_event_sources_empty_ok() {
7063        let svc = make_service();
7064        let req = make_request("ListEventSources", json!({}));
7065        let resp = svc.list_event_sources(&req).unwrap();
7066        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7067        assert!(body["EventSources"].is_array());
7068    }
7069
7070    #[test]
7071    fn update_connection_unknown_errors() {
7072        let svc = make_service();
7073        let req = make_request(
7074            "UpdateConnection",
7075            json!({"Name": "ghost", "AuthorizationType": "API_KEY"}),
7076        );
7077        assert!(svc.update_connection(&req).is_err());
7078    }
7079
7080    #[test]
7081    fn describe_api_destination_unknown_errors() {
7082        let svc = make_service();
7083        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
7084        assert!(svc.describe_api_destination(&req).is_err());
7085    }
7086
7087    #[test]
7088    fn update_api_destination_unknown_errors() {
7089        let svc = make_service();
7090        let req = make_request("UpdateApiDestination", json!({"Name": "ghost"}));
7091        assert!(svc.update_api_destination(&req).is_err());
7092    }
7093
7094    #[test]
7095    fn update_archive_unknown_errors() {
7096        let svc = make_service();
7097        let req = make_request("UpdateArchive", json!({"ArchiveName": "ghost"}));
7098        assert!(svc.update_archive(&req).is_err());
7099    }
7100
7101    #[test]
7102    fn describe_archive_unknown_errors_b() {
7103        let svc = make_service();
7104        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
7105        assert!(svc.describe_archive(&req).is_err());
7106    }
7107
7108    #[test]
7109    fn list_archives_empty_ok() {
7110        let svc = make_service();
7111        let req = make_request("ListArchives", json!({}));
7112        let resp = svc.list_archives(&req).unwrap();
7113        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7114        assert!(body["Archives"].is_array());
7115    }
7116
7117    #[test]
7118    fn list_replays_empty_ok() {
7119        let svc = make_service();
7120        let req = make_request("ListReplays", json!({}));
7121        let resp = svc.list_replays(&req).unwrap();
7122        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7123        assert!(body["Replays"].is_array());
7124    }
7125
7126    #[test]
7127    fn describe_endpoint_unknown_errors() {
7128        let svc = make_service();
7129        let req = make_request("DescribeEndpoint", json!({"Name": "ghost"}));
7130        assert!(svc.describe_endpoint(&req).is_err());
7131    }
7132
7133    #[test]
7134    fn delete_endpoint_unknown_errors() {
7135        let svc = make_service();
7136        let req = make_request("DeleteEndpoint", json!({"Name": "ghost"}));
7137        assert!(svc.delete_endpoint(&req).is_err());
7138    }
7139
7140    #[test]
7141    fn list_endpoints_empty_ok() {
7142        let svc = make_service();
7143        let req = make_request("ListEndpoints", json!({}));
7144        let resp = svc.list_endpoints(&req).unwrap();
7145        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7146        assert!(body["Endpoints"].is_array());
7147    }
7148}