Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::state::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28/// Validate a single `PutEvents` entry's required fields (`Source`,
29/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
30/// object. Returns the JSON error body AWS surfaces in the matching
31/// `Entries[]` slot on failure.
32fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
33    if source.is_empty() {
34        return Err(json!({
35            "ErrorCode": "InvalidArgument",
36            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
37        }));
38    }
39    if detail_type.is_empty() {
40        return Err(json!({
41            "ErrorCode": "InvalidArgument",
42            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
43        }));
44    }
45    if detail.is_empty() {
46        return Err(json!({
47            "ErrorCode": "InvalidArgument",
48            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
49        }));
50    }
51    if serde_json::from_str::<Value>(detail).is_err() {
52        return Err(json!({
53            "ErrorCode": "MalformedDetail",
54            "ErrorMessage": "Detail is malformed.",
55        }));
56    }
57    Ok(())
58}
59
60/// Parse an entry's `Time` field, tolerating the three formats AWS
61/// accepts (RFC 3339 string, fractional seconds as a float, integer
62/// seconds). Falls back to "now" if the field is absent or
63/// unparseable, which matches the real service.
64fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
65    if let Some(s) = raw.as_str() {
66        return DateTime::parse_from_rfc3339(s)
67            .map(|dt| dt.with_timezone(&Utc))
68            .unwrap_or_else(|_| Utc::now());
69    }
70    if let Some(ts) = raw.as_f64() {
71        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
72            .unwrap_or_else(Utc::now);
73    }
74    if let Some(ts) = raw.as_i64() {
75        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
76    }
77    Utc::now()
78}
79
80/// Actions that mutate EventBridge state.
81fn is_mutating_action(action: &str) -> bool {
82    matches!(
83        action,
84        "CreateEventBus"
85            | "DeleteEventBus"
86            | "UpdateEventBus"
87            | "PutRule"
88            | "DeleteRule"
89            | "EnableRule"
90            | "DisableRule"
91            | "PutTargets"
92            | "RemoveTargets"
93            | "PutEvents"
94            | "PutPermission"
95            | "RemovePermission"
96            | "TagResource"
97            | "UntagResource"
98            | "CreateArchive"
99            | "UpdateArchive"
100            | "DeleteArchive"
101            | "CreateConnection"
102            | "UpdateConnection"
103            | "DeleteConnection"
104            | "DeauthorizeConnection"
105            | "CreateApiDestination"
106            | "UpdateApiDestination"
107            | "DeleteApiDestination"
108            | "StartReplay"
109            | "CancelReplay"
110            | "CreatePartnerEventSource"
111            | "DeletePartnerEventSource"
112            | "ActivateEventSource"
113            | "DeactivateEventSource"
114            | "PutPartnerEvents"
115            | "CreateEndpoint"
116            | "DeleteEndpoint"
117            | "UpdateEndpoint"
118    )
119}
120
121pub struct EventBridgeService {
122    state: SharedEventBridgeState,
123    delivery: Arc<DeliveryBus>,
124    lambda_state: Option<SharedLambdaState>,
125    logs_state: Option<SharedLogsState>,
126    container_runtime: Option<Arc<ContainerRuntime>>,
127    snapshot_store: Option<Arc<dyn SnapshotStore>>,
128    snapshot_lock: Arc<AsyncMutex<()>>,
129}
130
131impl EventBridgeService {
132    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
133        Self {
134            state,
135            delivery,
136            lambda_state: None,
137            logs_state: None,
138            container_runtime: None,
139            snapshot_store: None,
140            snapshot_lock: Arc::new(AsyncMutex::new(())),
141        }
142    }
143
144    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
145        self.lambda_state = Some(lambda_state);
146        self
147    }
148
149    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
150        self.logs_state = Some(logs_state);
151        self
152    }
153
154    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
155        self.container_runtime = Some(runtime);
156        self
157    }
158
159    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
160        self.snapshot_store = Some(store);
161        self
162    }
163
164    /// Persist current state as a snapshot. Held across the
165    /// clone-serialize-write sequence to prevent stale-last writes,
166    /// with serde + file I/O offloaded to the blocking pool.
167    async fn save_snapshot(&self) {
168        let Some(store) = self.snapshot_store.clone() else {
169            return;
170        };
171        let _guard = self.snapshot_lock.lock().await;
172        let snapshot = EventBridgeSnapshot {
173            schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
174            accounts: Some(self.state.read().clone()),
175            state: None,
176        };
177        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
178            let bytes = serde_json::to_vec(&snapshot)
179                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
180            store.save(&bytes)
181        })
182        .await;
183        match join {
184            Ok(Ok(())) => {}
185            Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
186            Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
187        }
188    }
189}
190
191#[async_trait]
192impl AwsService for EventBridgeService {
193    fn service_name(&self) -> &str {
194        "events"
195    }
196
197    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
198        let mutates = is_mutating_action(req.action.as_str());
199        let result = match req.action.as_str() {
200            "CreateEventBus" => self.create_event_bus(&req),
201            "DeleteEventBus" => self.delete_event_bus(&req),
202            "ListEventBuses" => self.list_event_buses(&req),
203            "DescribeEventBus" => self.describe_event_bus(&req),
204            "PutRule" => self.put_rule(&req),
205            "DeleteRule" => self.delete_rule(&req),
206            "ListRules" => self.list_rules(&req),
207            "DescribeRule" => self.describe_rule(&req),
208            "EnableRule" => self.enable_rule(&req),
209            "DisableRule" => self.disable_rule(&req),
210            "PutTargets" => self.put_targets(&req),
211            "RemoveTargets" => self.remove_targets(&req),
212            "ListTargetsByRule" => self.list_targets_by_rule(&req),
213            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
214            "PutEvents" => self.put_events(&req),
215            "PutPermission" => self.put_permission(&req),
216            "RemovePermission" => self.remove_permission(&req),
217            "TagResource" => self.tag_resource(&req),
218            "UntagResource" => self.untag_resource(&req),
219            "ListTagsForResource" => self.list_tags_for_resource(&req),
220            "CreateArchive" => self.create_archive(&req),
221            "DescribeArchive" => self.describe_archive(&req),
222            "ListArchives" => self.list_archives(&req),
223            "UpdateArchive" => self.update_archive(&req),
224            "DeleteArchive" => self.delete_archive(&req),
225            "CreateConnection" => self.create_connection(&req),
226            "DescribeConnection" => self.describe_connection(&req),
227            "ListConnections" => self.list_connections(&req),
228            "UpdateConnection" => self.update_connection(&req),
229            "DeleteConnection" => self.delete_connection(&req),
230            "CreateApiDestination" => self.create_api_destination(&req),
231            "DescribeApiDestination" => self.describe_api_destination(&req),
232            "ListApiDestinations" => self.list_api_destinations(&req),
233            "UpdateApiDestination" => self.update_api_destination(&req),
234            "DeleteApiDestination" => self.delete_api_destination(&req),
235            "StartReplay" => self.start_replay(&req),
236            "DescribeReplay" => self.describe_replay(&req),
237            "ListReplays" => self.list_replays(&req),
238            "CancelReplay" => self.cancel_replay(&req),
239            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
240            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
241            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
242            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
243            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
244            "ActivateEventSource" => self.activate_event_source(&req),
245            "DeactivateEventSource" => self.deactivate_event_source(&req),
246            "DescribeEventSource" => self.describe_event_source(&req),
247            "ListEventSources" => self.list_event_sources(&req),
248            "PutPartnerEvents" => self.put_partner_events(&req),
249            "TestEventPattern" => self.test_event_pattern(&req),
250            "UpdateEventBus" => self.update_event_bus(&req),
251            "CreateEndpoint" => self.create_endpoint(&req),
252            "DeleteEndpoint" => self.delete_endpoint(&req),
253            "DescribeEndpoint" => self.describe_endpoint(&req),
254            "ListEndpoints" => self.list_endpoints(&req),
255            "UpdateEndpoint" => self.update_endpoint(&req),
256            "DeauthorizeConnection" => self.deauthorize_connection(&req),
257            _ => Err(AwsServiceError::action_not_implemented(
258                "events",
259                &req.action,
260            )),
261        };
262        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
263            self.save_snapshot().await;
264        }
265        result
266    }
267
268    fn supported_actions(&self) -> &[&str] {
269        &[
270            "CreateEventBus",
271            "DeleteEventBus",
272            "ListEventBuses",
273            "DescribeEventBus",
274            "PutRule",
275            "DeleteRule",
276            "ListRules",
277            "DescribeRule",
278            "EnableRule",
279            "DisableRule",
280            "PutTargets",
281            "RemoveTargets",
282            "ListTargetsByRule",
283            "ListRuleNamesByTarget",
284            "PutEvents",
285            "PutPermission",
286            "RemovePermission",
287            "TagResource",
288            "UntagResource",
289            "ListTagsForResource",
290            "CreateArchive",
291            "DescribeArchive",
292            "ListArchives",
293            "UpdateArchive",
294            "DeleteArchive",
295            "CreateConnection",
296            "DescribeConnection",
297            "ListConnections",
298            "UpdateConnection",
299            "DeleteConnection",
300            "CreateApiDestination",
301            "DescribeApiDestination",
302            "ListApiDestinations",
303            "UpdateApiDestination",
304            "DeleteApiDestination",
305            "StartReplay",
306            "DescribeReplay",
307            "ListReplays",
308            "CancelReplay",
309            "CreatePartnerEventSource",
310            "DeletePartnerEventSource",
311            "DescribePartnerEventSource",
312            "ListPartnerEventSources",
313            "ListPartnerEventSourceAccounts",
314            "ActivateEventSource",
315            "DeactivateEventSource",
316            "DescribeEventSource",
317            "ListEventSources",
318            "PutPartnerEvents",
319            "TestEventPattern",
320            "UpdateEventBus",
321            "CreateEndpoint",
322            "DeleteEndpoint",
323            "DescribeEndpoint",
324            "ListEndpoints",
325            "UpdateEndpoint",
326            "DeauthorizeConnection",
327        ]
328    }
329}
330
331fn parse_tags(body: &Value) -> HashMap<String, String> {
332    let mut tags = HashMap::new();
333    if let Some(arr) = body["Tags"].as_array() {
334        for tag in arr {
335            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
336                tags.insert(key.to_string(), val.to_string());
337            }
338        }
339    }
340    tags
341}
342
343fn parse_target(target: &Value) -> EventTarget {
344    EventTarget {
345        id: target["Id"].as_str().unwrap_or("").to_string(),
346        arn: target["Arn"].as_str().unwrap_or("").to_string(),
347        input: target["Input"].as_str().map(|s| s.to_string()),
348        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
349        input_transformer: target.get("InputTransformer").cloned(),
350        sqs_parameters: target.get("SqsParameters").cloned(),
351    }
352}
353
354fn target_to_json(t: &EventTarget) -> Value {
355    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
356    if let Some(ref input) = t.input {
357        obj["Input"] = json!(input);
358    }
359    if let Some(ref input_path) = t.input_path {
360        obj["InputPath"] = json!(input_path);
361    }
362    if let Some(ref it) = t.input_transformer {
363        obj["InputTransformer"] = it.clone();
364    }
365    if let Some(ref sp) = t.sqs_parameters {
366        obj["SqsParameters"] = sp.clone();
367    }
368    obj
369}
370
371// ─── Event Bus Operations ───────────────────────────────────────────
372impl EventBridgeService {
373    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
374        let body = req.json_body();
375        validate_required("Name", &body["Name"])?;
376        let name = body["Name"]
377            .as_str()
378            .ok_or_else(|| missing("Name"))?
379            .to_string();
380        validate_string_length("name", &name, 1, 256)?;
381        validate_optional_string_length(
382            "eventSourceName",
383            body["EventSourceName"].as_str(),
384            1,
385            256,
386        )?;
387        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
388        validate_optional_string_length(
389            "kmsKeyIdentifier",
390            body["KmsKeyIdentifier"].as_str(),
391            0,
392            2048,
393        )?;
394
395        // Validate name doesn't contain '/' (unless partner bus)
396        if name.contains('/') && !name.starts_with("aws.partner/") {
397            return Err(AwsServiceError::aws_error(
398                StatusCode::BAD_REQUEST,
399                "ValidationException",
400                "Event bus name must not contain '/'.",
401            ));
402        }
403
404        // Partner event bus validation
405        if name.starts_with("aws.partner/") {
406            let event_source = body["EventSourceName"].as_str().unwrap_or("");
407            let accounts_r = self.state.read();
408            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
409            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
410            let has_source = state_r.partner_event_sources.contains_key(event_source);
411            drop(accounts_r);
412            if !has_source {
413                return Err(AwsServiceError::aws_error(
414                    StatusCode::BAD_REQUEST,
415                    "ResourceNotFoundException",
416                    format!("Event source {event_source} does not exist."),
417                ));
418            }
419        }
420
421        let mut accounts = self.state.write();
422        let state = accounts.get_or_create(&req.account_id);
423
424        if state.buses.contains_key(&name) {
425            return Err(AwsServiceError::aws_error(
426                StatusCode::BAD_REQUEST,
427                "ResourceAlreadyExistsException",
428                format!("Event bus {name} already exists."),
429            ));
430        }
431
432        let arn = format!(
433            "arn:aws:events:{}:{}:event-bus/{}",
434            req.region, state.account_id, name
435        );
436        let now = Utc::now();
437        let description = body["Description"].as_str().map(|s| s.to_string());
438        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
439        let dead_letter_config = body.get("DeadLetterConfig").cloned();
440
441        let tags = parse_tags(&body);
442
443        let bus = EventBus {
444            name: name.clone(),
445            arn: arn.clone(),
446            tags,
447            policy: None,
448            description,
449            kms_key_identifier,
450            dead_letter_config,
451            creation_time: now,
452            last_modified_time: now,
453        };
454        state.buses.insert(name, bus);
455
456        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
457    }
458
459    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
460        let body = req.json_body();
461        validate_required("Name", &body["Name"])?;
462        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
463        validate_string_length("name", name, 1, 256)?;
464
465        if name == "default" {
466            return Err(AwsServiceError::aws_error(
467                StatusCode::BAD_REQUEST,
468                "ValidationException",
469                format!("Cannot delete event bus {name}."),
470            ));
471        }
472
473        let mut accounts = self.state.write();
474        let state = accounts.get_or_create(&req.account_id);
475        state.buses.remove(name);
476        state.rules.retain(|k, _| k.0 != name);
477
478        Ok(AwsResponse::ok_json(json!({})))
479    }
480
481    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482        let body = req.json_body();
483        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
484        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
485        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
486        let name_prefix = body["NamePrefix"].as_str();
487        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
488        if let Some(t) = body["NextToken"].as_str() {
489            t.parse::<usize>().map_err(|_| {
490                AwsServiceError::aws_error(
491                    StatusCode::BAD_REQUEST,
492                    "InvalidNextTokenException",
493                    format!("Invalid NextToken value: '{t}'"),
494                )
495            })?;
496        }
497
498        let accounts = self.state.read();
499        let empty = EventBridgeState::new(&req.account_id, &req.region);
500        let state = accounts.get(&req.account_id).unwrap_or(&empty);
501        let filtered: Vec<&_> = state
502            .buses
503            .values()
504            .filter(|b| match name_prefix {
505                Some(prefix) => b.name.starts_with(prefix),
506                None => true,
507            })
508            .collect();
509
510        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
511        let buses: Vec<Value> = page
512            .iter()
513            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
514            .collect();
515        let mut resp = json!({ "EventBuses": buses });
516        if let Some(token) = next_token {
517            resp["NextToken"] = json!(token);
518        }
519
520        Ok(AwsResponse::ok_json(resp))
521    }
522
523    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
524        let body = req.json_body();
525        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
526        let name = body["Name"].as_str().unwrap_or("default");
527
528        let accounts = self.state.read();
529        let empty = EventBridgeState::new(&req.account_id, &req.region);
530        let state = accounts.get(&req.account_id).unwrap_or(&empty);
531        let bus = state.buses.get(name).ok_or_else(|| {
532            AwsServiceError::aws_error(
533                StatusCode::BAD_REQUEST,
534                "ResourceNotFoundException",
535                format!("Event bus {name} does not exist."),
536            )
537        })?;
538
539        let mut resp = json!({
540            "Name": bus.name,
541            "Arn": bus.arn,
542            "CreationTime": bus.creation_time.timestamp() as f64,
543            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
544        });
545
546        if let Some(ref policy) = bus.policy {
547            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
548        }
549        if let Some(ref desc) = bus.description {
550            resp["Description"] = json!(desc);
551        }
552        if let Some(ref kms) = bus.kms_key_identifier {
553            resp["KmsKeyIdentifier"] = json!(kms);
554        }
555        if let Some(ref dlc) = bus.dead_letter_config {
556            resp["DeadLetterConfig"] = dlc.clone();
557        }
558
559        Ok(AwsResponse::ok_json(resp))
560    }
561
562    // ─── Permission Operations ──────────────────────────────────────────
563
564    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
565        let body = req.json_body();
566        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
567        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
568        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
569        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
570        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
571
572        let mut accounts = self.state.write();
573        let state = accounts.get_or_create(&req.account_id);
574
575        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
576            AwsServiceError::aws_error(
577                StatusCode::BAD_REQUEST,
578                "ResourceNotFoundException",
579                format!("Event bus {event_bus_name} does not exist."),
580            )
581        })?;
582
583        // Check if Policy is provided (new-style)
584        if let Some(policy_str) = body["Policy"].as_str() {
585            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
586                bus.policy = Some(policy);
587                return Ok(AwsResponse::ok_json(json!({})));
588            }
589        }
590
591        // Old-style: Action, Principal, StatementId
592        let action = body["Action"].as_str().unwrap_or("");
593        let principal = body["Principal"].as_str().unwrap_or("");
594        let statement_id = body["StatementId"].as_str().unwrap_or("");
595
596        // Validate action
597        if action != "events:PutEvents" {
598            return Err(AwsServiceError::aws_error(
599                StatusCode::BAD_REQUEST,
600                "ValidationException",
601                "Provided value in parameter 'action' is not supported.",
602            ));
603        }
604
605        let statement = json!({
606            "Sid": statement_id,
607            "Effect": "Allow",
608            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
609            "Action": action,
610            "Resource": bus.arn,
611        });
612
613        let policy = bus.policy.get_or_insert_with(|| {
614            json!({
615                "Version": "2012-10-17",
616                "Statement": [],
617            })
618        });
619
620        if let Some(stmts) = policy["Statement"].as_array_mut() {
621            stmts.push(statement);
622        }
623
624        Ok(AwsResponse::ok_json(json!({})))
625    }
626
627    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
628        let body = req.json_body();
629        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
630        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
631        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
632        let statement_id = body["StatementId"].as_str().unwrap_or("");
633        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
634
635        let mut accounts = self.state.write();
636        let state = accounts.get_or_create(&req.account_id);
637
638        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
639            AwsServiceError::aws_error(
640                StatusCode::BAD_REQUEST,
641                "ResourceNotFoundException",
642                format!("Event bus {event_bus_name} does not exist."),
643            )
644        })?;
645
646        if remove_all {
647            bus.policy = None;
648            return Ok(AwsResponse::ok_json(json!({})));
649        }
650
651        let policy = bus.policy.as_mut().ok_or_else(|| {
652            AwsServiceError::aws_error(
653                StatusCode::BAD_REQUEST,
654                "ResourceNotFoundException",
655                "EventBus does not have a policy.",
656            )
657        })?;
658
659        if let Some(stmts) = policy["Statement"].as_array_mut() {
660            let before = stmts.len();
661            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
662            if stmts.len() == before {
663                return Err(AwsServiceError::aws_error(
664                    StatusCode::BAD_REQUEST,
665                    "ResourceNotFoundException",
666                    "Statement with the provided id does not exist.",
667                ));
668            }
669            if stmts.is_empty() {
670                bus.policy = None;
671            }
672        }
673
674        Ok(AwsResponse::ok_json(json!({})))
675    }
676
677    // ─── Rule Operations ────────────────────────────────────────────────
678
679    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
680        let body = req.json_body();
681        validate_required("Name", &body["Name"])?;
682        let name = body["Name"]
683            .as_str()
684            .ok_or_else(|| missing("Name"))?
685            .to_string();
686        validate_string_length("name", &name, 1, 64)?;
687        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
688        validate_optional_string_length(
689            "scheduleExpression",
690            body["ScheduleExpression"].as_str(),
691            0,
692            256,
693        )?;
694        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
695        validate_optional_enum(
696            "state",
697            body["State"].as_str(),
698            &[
699                "ENABLED",
700                "DISABLED",
701                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
702            ],
703        )?;
704        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
705        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
706
707        let raw_bus = body["EventBusName"]
708            .as_str()
709            .unwrap_or("default")
710            .to_string();
711
712        let mut accounts = self.state.write();
713        let state = accounts.get_or_create(&req.account_id);
714        let event_bus_name = state.resolve_bus_name(&raw_bus);
715
716        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
717            if s.is_empty() {
718                None
719            } else {
720                Some(s.to_string())
721            }
722        });
723        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
724            if s.is_empty() {
725                None
726            } else {
727                Some(s.to_string())
728            }
729        });
730        let description = body["Description"].as_str().map(|s| s.to_string());
731        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
732        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
733
734        // Validate: schedule expressions only on default bus
735        if schedule_expression.is_some() && event_bus_name != "default" {
736            return Err(AwsServiceError::aws_error(
737                StatusCode::BAD_REQUEST,
738                "ValidationException",
739                "ScheduleExpression is supported only on the default event bus.",
740            ));
741        }
742
743        if !state.buses.contains_key(&event_bus_name) {
744            return Err(AwsServiceError::aws_error(
745                StatusCode::BAD_REQUEST,
746                "ResourceNotFoundException",
747                format!("Event bus {event_bus_name} does not exist."),
748            ));
749        }
750
751        let arn = if event_bus_name == "default" {
752            format!(
753                "arn:aws:events:{}:{}:rule/{}",
754                req.region, state.account_id, name
755            )
756        } else {
757            format!(
758                "arn:aws:events:{}:{}:rule/{}/{}",
759                req.region, state.account_id, event_bus_name, name
760            )
761        };
762
763        let key = (event_bus_name.clone(), name.clone());
764        let targets = state
765            .rules
766            .get(&key)
767            .map(|r| r.targets.clone())
768            .unwrap_or_default();
769
770        let tags = parse_tags(&body);
771
772        let rule = EventRule {
773            name: name.clone(),
774            arn: arn.clone(),
775            event_bus_name,
776            event_pattern,
777            schedule_expression,
778            state: rule_state,
779            description,
780            role_arn,
781            managed_by: None,
782            created_by: None,
783            targets,
784            tags,
785            last_fired: None,
786        };
787
788        state.rules.insert(key, rule);
789        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
790    }
791
792    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
793        let body = req.json_body();
794        validate_required("Name", &body["Name"])?;
795        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
796        validate_string_length("name", name, 1, 64)?;
797        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
798        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
799
800        let mut accounts = self.state.write();
801        let state = accounts.get_or_create(&req.account_id);
802        let bus_name = state.resolve_bus_name(event_bus_name);
803        let key = (bus_name, name.to_string());
804
805        // Check if rule has targets
806        if let Some(rule) = state.rules.get(&key) {
807            if !rule.targets.is_empty() {
808                return Err(AwsServiceError::aws_error(
809                    StatusCode::BAD_REQUEST,
810                    "ValidationException",
811                    "Rule can't be deleted since it has targets.",
812                ));
813            }
814        }
815
816        state.rules.remove(&key);
817        Ok(AwsResponse::ok_json(json!({})))
818    }
819
820    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
821        let body = req.json_body();
822        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
823        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
824        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
825        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
826        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
827        let name_prefix = body["NamePrefix"].as_str();
828        let limit = body["Limit"].as_u64().map(|n| n as usize);
829        let next_token = body["NextToken"].as_str();
830
831        let accounts = self.state.read();
832        let empty = EventBridgeState::new(&req.account_id, &req.region);
833        let state = accounts.get(&req.account_id).unwrap_or(&empty);
834        let bus_name = state.resolve_bus_name(event_bus_name);
835
836        let mut rules: Vec<&EventRule> = state
837            .rules
838            .values()
839            .filter(|r| r.event_bus_name == bus_name)
840            .filter(|r| match name_prefix {
841                Some(prefix) => r.name.starts_with(prefix),
842                None => true,
843            })
844            .collect();
845        rules.sort_by(|a, b| a.name.cmp(&b.name));
846
847        // Pagination
848        let start = next_token
849            .and_then(|t| t.parse::<usize>().ok())
850            .unwrap_or(0)
851            .min(rules.len());
852        let rules_slice = &rules[start..];
853
854        let (page, new_next_token) = if let Some(lim) = limit {
855            if rules_slice.len() > lim {
856                (&rules_slice[..lim], Some((start + lim).to_string()))
857            } else {
858                (rules_slice, None)
859            }
860        } else {
861            (rules_slice, None)
862        };
863
864        let rules_json: Vec<Value> = page
865            .iter()
866            .map(|r| {
867                let mut obj = json!({
868                    "Name": r.name,
869                    "Arn": r.arn,
870                    "EventBusName": r.event_bus_name,
871                    "State": r.state,
872                });
873                if let Some(ref desc) = r.description {
874                    obj["Description"] = json!(desc);
875                }
876                if let Some(ref ep) = r.event_pattern {
877                    obj["EventPattern"] = json!(ep);
878                }
879                if let Some(ref se) = r.schedule_expression {
880                    obj["ScheduleExpression"] = json!(se);
881                }
882                if let Some(ref mb) = r.managed_by {
883                    obj["ManagedBy"] = json!(mb);
884                }
885                obj
886            })
887            .collect();
888
889        let mut resp = json!({ "Rules": rules_json });
890        if let Some(token) = new_next_token {
891            resp["NextToken"] = json!(token);
892        }
893
894        Ok(AwsResponse::ok_json(resp))
895    }
896
897    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
898        let body = req.json_body();
899        validate_required("Name", &body["Name"])?;
900        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
901        validate_string_length("name", name, 1, 64)?;
902        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
903        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
904
905        let accounts = self.state.read();
906        let empty = EventBridgeState::new(&req.account_id, &req.region);
907        let state = accounts.get(&req.account_id).unwrap_or(&empty);
908        let bus_name = state.resolve_bus_name(event_bus_name);
909        let key = (bus_name.clone(), name.to_string());
910
911        let rule = state.rules.get(&key).ok_or_else(|| {
912            AwsServiceError::aws_error(
913                StatusCode::BAD_REQUEST,
914                "ResourceNotFoundException",
915                format!("Rule {name} does not exist."),
916            )
917        })?;
918
919        let mut resp = json!({
920            "Name": rule.name,
921            "Arn": rule.arn,
922            "EventBusName": rule.event_bus_name,
923            "State": rule.state,
924        });
925
926        if let Some(ref desc) = rule.description {
927            resp["Description"] = json!(desc);
928        }
929        if let Some(ref ep) = rule.event_pattern {
930            resp["EventPattern"] = json!(ep);
931        }
932        if let Some(ref se) = rule.schedule_expression {
933            resp["ScheduleExpression"] = json!(se);
934        }
935        if let Some(ref role) = rule.role_arn {
936            resp["RoleArn"] = json!(role);
937        }
938        if let Some(ref mb) = rule.managed_by {
939            resp["ManagedBy"] = json!(mb);
940        }
941        if let Some(ref cb) = rule.created_by {
942            resp["CreatedBy"] = json!(cb);
943        }
944        // If non-default bus, set CreatedBy to account_id
945        if rule.event_bus_name != "default" && rule.created_by.is_none() {
946            resp["CreatedBy"] = json!(state.account_id);
947        }
948
949        Ok(AwsResponse::ok_json(resp))
950    }
951
952    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
953        let body = req.json_body();
954        validate_required("Name", &body["Name"])?;
955        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
956        validate_string_length("name", name, 1, 64)?;
957        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
958        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
959
960        let mut accounts = self.state.write();
961        let state = accounts.get_or_create(&req.account_id);
962        let bus_name = state.resolve_bus_name(event_bus_name);
963        let key = (bus_name, name.to_string());
964
965        let rule = state.rules.get_mut(&key).ok_or_else(|| {
966            AwsServiceError::aws_error(
967                StatusCode::BAD_REQUEST,
968                "ResourceNotFoundException",
969                format!("Rule {name} does not exist."),
970            )
971        })?;
972
973        rule.state = "ENABLED".to_string();
974        Ok(AwsResponse::ok_json(json!({})))
975    }
976
977    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
978        let body = req.json_body();
979        validate_required("Name", &body["Name"])?;
980        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
981        validate_string_length("name", name, 1, 64)?;
982        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
983        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984
985        let mut accounts = self.state.write();
986        let state = accounts.get_or_create(&req.account_id);
987        let bus_name = state.resolve_bus_name(event_bus_name);
988        let key = (bus_name, name.to_string());
989
990        let rule = state.rules.get_mut(&key).ok_or_else(|| {
991            AwsServiceError::aws_error(
992                StatusCode::BAD_REQUEST,
993                "ResourceNotFoundException",
994                format!("Rule {name} does not exist."),
995            )
996        })?;
997
998        rule.state = "DISABLED".to_string();
999        Ok(AwsResponse::ok_json(json!({})))
1000    }
1001
1002    // ─── Target Operations ──────────────────────────────────────────────
1003
1004    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1005        let body = req.json_body();
1006        validate_required("Rule", &body["Rule"])?;
1007        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1008        validate_string_length("rule", rule_name, 1, 64)?;
1009        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1010        validate_required("Targets", &body["Targets"])?;
1011        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1012        let targets = body["Targets"]
1013            .as_array()
1014            .ok_or_else(|| missing("Targets"))?;
1015
1016        // Validate targets - check for FIFO SQS without SqsParameters
1017        for target in targets {
1018            let target_id = target["Id"].as_str().unwrap_or("");
1019            let target_arn = target["Arn"].as_str().unwrap_or("");
1020
1021            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
1022                return Err(AwsServiceError::aws_error(
1023                    StatusCode::BAD_REQUEST,
1024                    "ValidationException",
1025                    format!(
1026                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
1027                    ),
1028                ));
1029            }
1030
1031            // Validate ARN format
1032            if !target_arn.starts_with("arn:") {
1033                return Err(AwsServiceError::aws_error(
1034                    StatusCode::BAD_REQUEST,
1035                    "ValidationException",
1036                    format!(
1037                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
1038                    ),
1039                ));
1040            }
1041        }
1042
1043        let mut accounts = self.state.write();
1044        let state = accounts.get_or_create(&req.account_id);
1045        let bus_name = state.resolve_bus_name(event_bus_name);
1046        let key = (bus_name.clone(), rule_name.to_string());
1047
1048        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1049            AwsServiceError::aws_error(
1050                StatusCode::BAD_REQUEST,
1051                "ResourceNotFoundException",
1052                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1053            )
1054        })?;
1055
1056        for target in targets {
1057            let et = parse_target(target);
1058            // Remove existing target with same ID
1059            rule.targets.retain(|t| t.id != et.id);
1060            rule.targets.push(et);
1061        }
1062
1063        Ok(AwsResponse::ok_json(json!({
1064            "FailedEntryCount": 0,
1065            "FailedEntries": [],
1066        })))
1067    }
1068
1069    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1070        let body = req.json_body();
1071        validate_required("Rule", &body["Rule"])?;
1072        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1073        validate_string_length("rule", rule_name, 1, 64)?;
1074        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1075        validate_required("Ids", &body["Ids"])?;
1076        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1077        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
1078
1079        let target_ids: Vec<String> = ids
1080            .iter()
1081            .filter_map(|v| v.as_str().map(|s| s.to_string()))
1082            .collect();
1083
1084        let mut accounts = self.state.write();
1085        let state = accounts.get_or_create(&req.account_id);
1086        let bus_name = state.resolve_bus_name(event_bus_name);
1087        let key = (bus_name.clone(), rule_name.to_string());
1088
1089        let rule = state.rules.get_mut(&key).ok_or_else(|| {
1090            AwsServiceError::aws_error(
1091                StatusCode::BAD_REQUEST,
1092                "ResourceNotFoundException",
1093                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
1094            )
1095        })?;
1096
1097        rule.targets.retain(|t| !target_ids.contains(&t.id));
1098
1099        Ok(AwsResponse::ok_json(json!({
1100            "FailedEntryCount": 0,
1101            "FailedEntries": [],
1102        })))
1103    }
1104
1105    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1106        let body = req.json_body();
1107        validate_required("Rule", &body["Rule"])?;
1108        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1109        validate_string_length("rule", rule_name, 1, 64)?;
1110        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1111        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1112        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1113        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1114        let limit = body["Limit"].as_u64().map(|n| n as usize);
1115        let next_token = body["NextToken"].as_str();
1116
1117        let accounts = self.state.read();
1118        let empty = EventBridgeState::new(&req.account_id, &req.region);
1119        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1120        let bus_name = state.resolve_bus_name(event_bus_name);
1121        let key = (bus_name, rule_name.to_string());
1122
1123        let rule = state.rules.get(&key).ok_or_else(|| {
1124            AwsServiceError::aws_error(
1125                StatusCode::BAD_REQUEST,
1126                "ResourceNotFoundException",
1127                format!("Rule {rule_name} does not exist."),
1128            )
1129        })?;
1130
1131        let all_targets = &rule.targets;
1132        let start = next_token
1133            .and_then(|t| t.parse::<usize>().ok())
1134            .unwrap_or(0)
1135            .min(all_targets.len());
1136        let slice = &all_targets[start..];
1137
1138        let (page, new_next_token) = if let Some(lim) = limit {
1139            if slice.len() > lim {
1140                (&slice[..lim], Some((start + lim).to_string()))
1141            } else {
1142                (slice, None)
1143            }
1144        } else {
1145            (slice, None)
1146        };
1147
1148        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1149
1150        let mut resp = json!({ "Targets": targets });
1151        if let Some(token) = new_next_token {
1152            resp["NextToken"] = json!(token);
1153        }
1154
1155        Ok(AwsResponse::ok_json(resp))
1156    }
1157
1158    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159        let body = req.json_body();
1160        validate_required("TargetArn", &body["TargetArn"])?;
1161        let target_arn = body["TargetArn"]
1162            .as_str()
1163            .ok_or_else(|| missing("TargetArn"))?;
1164        validate_string_length("targetArn", target_arn, 1, 1600)?;
1165        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1166        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1167        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1168        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1169        let limit = body["Limit"].as_u64().map(|n| n as usize);
1170        let next_token = body["NextToken"].as_str();
1171
1172        let accounts = self.state.read();
1173        let empty = EventBridgeState::new(&req.account_id, &req.region);
1174        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1175        let bus_name = state.resolve_bus_name(event_bus_name);
1176
1177        // Deduplicate rule names
1178        let mut rule_names: Vec<String> = Vec::new();
1179        for rule in state.rules.values() {
1180            if rule.event_bus_name == bus_name
1181                && rule.targets.iter().any(|t| t.arn == target_arn)
1182                && !rule_names.contains(&rule.name)
1183            {
1184                rule_names.push(rule.name.clone());
1185            }
1186        }
1187        rule_names.sort();
1188
1189        let start = next_token
1190            .and_then(|t| t.parse::<usize>().ok())
1191            .unwrap_or(0)
1192            .min(rule_names.len());
1193        let slice = &rule_names[start..];
1194
1195        let (page, new_next_token) = if let Some(lim) = limit {
1196            if slice.len() > lim {
1197                (&slice[..lim], Some((start + lim).to_string()))
1198            } else {
1199                (slice, None)
1200            }
1201        } else {
1202            (slice, None)
1203        };
1204
1205        let mut resp = json!({ "RuleNames": page });
1206        if let Some(token) = new_next_token {
1207            resp["NextToken"] = json!(token);
1208        }
1209
1210        Ok(AwsResponse::ok_json(resp))
1211    }
1212
1213    // ─── Partner Event Sources ────────────���───────────────────────────
1214
1215    fn create_partner_event_source(
1216        &self,
1217        req: &AwsRequest,
1218    ) -> Result<AwsResponse, AwsServiceError> {
1219        let body = req.json_body();
1220        validate_required("Name", &body["Name"])?;
1221        let name = body["Name"]
1222            .as_str()
1223            .ok_or_else(|| missing("Name"))?
1224            .to_string();
1225        validate_string_length("name", &name, 1, 256)?;
1226        validate_required("Account", &body["Account"])?;
1227        let account = body["Account"]
1228            .as_str()
1229            .ok_or_else(|| missing("Account"))?
1230            .to_string();
1231        validate_string_length("account", &account, 12, 12)?;
1232
1233        let mut accounts = self.state.write();
1234        let state = accounts.get_or_create(&req.account_id);
1235        if state.partner_event_sources.contains_key(&name) {
1236            return Err(AwsServiceError::aws_error(
1237                StatusCode::CONFLICT,
1238                "ResourceAlreadyExistsException",
1239                format!("Partner event source {name} already exists."),
1240            ));
1241        }
1242        let arn = format!(
1243            "arn:aws:events:{}::event-source/aws.partner/{}",
1244            state.region, name
1245        );
1246        let now = Utc::now();
1247        let ps = PartnerEventSource {
1248            name: name.clone(),
1249            arn: arn.clone(),
1250            account,
1251            creation_time: now,
1252            expiration_time: None,
1253            state: "ACTIVE".to_string(),
1254        };
1255        state.partner_event_sources.insert(name.clone(), ps);
1256
1257        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1258    }
1259
1260    fn delete_partner_event_source(
1261        &self,
1262        req: &AwsRequest,
1263    ) -> Result<AwsResponse, AwsServiceError> {
1264        let body = req.json_body();
1265        validate_required("Name", &body["Name"])?;
1266        let name = body["Name"]
1267            .as_str()
1268            .ok_or_else(|| missing("Name"))?
1269            .to_string();
1270        validate_required("Account", &body["Account"])?;
1271        let account = body["Account"]
1272            .as_str()
1273            .ok_or_else(|| missing("Account"))?
1274            .to_string();
1275
1276        let mut accounts = self.state.write();
1277        let state = accounts.get_or_create(&req.account_id);
1278        match state.partner_event_sources.get(&name) {
1279            Some(ps) if ps.account == account => {
1280                state.partner_event_sources.remove(&name);
1281            }
1282            Some(_) => {
1283                return Err(AwsServiceError::aws_error(
1284                    StatusCode::NOT_FOUND,
1285                    "ResourceNotFoundException",
1286                    format!("Partner event source {name} does not exist for account {account}."),
1287                ));
1288            }
1289            None => {
1290                return Err(AwsServiceError::aws_error(
1291                    StatusCode::NOT_FOUND,
1292                    "ResourceNotFoundException",
1293                    format!("Partner event source {name} does not exist."),
1294                ));
1295            }
1296        }
1297
1298        Ok(AwsResponse::ok_json(json!({})))
1299    }
1300
1301    fn describe_partner_event_source(
1302        &self,
1303        req: &AwsRequest,
1304    ) -> Result<AwsResponse, AwsServiceError> {
1305        let body = req.json_body();
1306        validate_required("Name", &body["Name"])?;
1307        let name = body["Name"]
1308            .as_str()
1309            .ok_or_else(|| missing("Name"))?
1310            .to_string();
1311        validate_string_length("name", &name, 1, 256)?;
1312
1313        let accounts = self.state.read();
1314        let empty = EventBridgeState::new(&req.account_id, &req.region);
1315        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1316        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1317            AwsServiceError::aws_error(
1318                StatusCode::NOT_FOUND,
1319                "ResourceNotFoundException",
1320                format!("Partner event source {name} does not exist."),
1321            )
1322        })?;
1323
1324        Ok(AwsResponse::ok_json(json!({
1325            "Arn": ps.arn,
1326            "Name": ps.name,
1327        })))
1328    }
1329
1330    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331        let body = req.json_body();
1332        validate_required("namePrefix", &body["NamePrefix"])?;
1333        let name_prefix = body["NamePrefix"]
1334            .as_str()
1335            .ok_or_else(|| missing("NamePrefix"))?;
1336        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1337        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1338        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1339        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1340
1341        let accounts = self.state.read();
1342        let empty = EventBridgeState::new(&req.account_id, &req.region);
1343        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1344        let all: Vec<Value> = state
1345            .partner_event_sources
1346            .values()
1347            .filter(|ps| ps.name.starts_with(name_prefix))
1348            .map(|ps| {
1349                json!({
1350                    "Arn": ps.arn,
1351                    "Name": ps.name,
1352                })
1353            })
1354            .collect();
1355
1356        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1357        let mut resp = json!({ "PartnerEventSources": sources });
1358        if let Some(token) = next_token {
1359            resp["NextToken"] = json!(token);
1360        }
1361
1362        Ok(AwsResponse::ok_json(resp))
1363    }
1364
1365    fn list_partner_event_source_accounts(
1366        &self,
1367        req: &AwsRequest,
1368    ) -> Result<AwsResponse, AwsServiceError> {
1369        let body = req.json_body();
1370        validate_required("EventSourceName", &body["EventSourceName"])?;
1371        let event_source_name = body["EventSourceName"]
1372            .as_str()
1373            .ok_or_else(|| missing("EventSourceName"))?;
1374        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1375        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1376        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1377
1378        let accounts = self.state.read();
1379        let empty = EventBridgeState::new(&req.account_id, &req.region);
1380        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1381        let accounts: Vec<Value> = state
1382            .partner_event_sources
1383            .values()
1384            .filter(|ps| ps.name == event_source_name)
1385            .map(|ps| json!({ "Account": ps.account }))
1386            .collect();
1387
1388        Ok(AwsResponse::ok_json(json!({
1389            "PartnerEventSourceAccounts": accounts
1390        })))
1391    }
1392
1393    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1394        let body = req.json_body();
1395        validate_required("Name", &body["Name"])?;
1396        let name = body["Name"]
1397            .as_str()
1398            .ok_or_else(|| missing("Name"))?
1399            .to_string();
1400
1401        let mut accounts = self.state.write();
1402        let state = accounts.get_or_create(&req.account_id);
1403        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1404            AwsServiceError::aws_error(
1405                StatusCode::NOT_FOUND,
1406                "ResourceNotFoundException",
1407                format!("Event source {name} does not exist."),
1408            )
1409        })?;
1410        ps.state = "ACTIVE".to_string();
1411
1412        Ok(AwsResponse::ok_json(json!({})))
1413    }
1414
1415    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1416        let body = req.json_body();
1417        validate_required("Name", &body["Name"])?;
1418        let name = body["Name"]
1419            .as_str()
1420            .ok_or_else(|| missing("Name"))?
1421            .to_string();
1422
1423        let mut accounts = self.state.write();
1424        let state = accounts.get_or_create(&req.account_id);
1425        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1426            AwsServiceError::aws_error(
1427                StatusCode::NOT_FOUND,
1428                "ResourceNotFoundException",
1429                format!("Event source {name} does not exist."),
1430            )
1431        })?;
1432        ps.state = "INACTIVE".to_string();
1433
1434        Ok(AwsResponse::ok_json(json!({})))
1435    }
1436
1437    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1438        let body = req.json_body();
1439        validate_required("Name", &body["Name"])?;
1440        let name = body["Name"]
1441            .as_str()
1442            .ok_or_else(|| missing("Name"))?
1443            .to_string();
1444
1445        let accounts = self.state.read();
1446        let empty = EventBridgeState::new(&req.account_id, &req.region);
1447        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1448        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1449            AwsServiceError::aws_error(
1450                StatusCode::NOT_FOUND,
1451                "ResourceNotFoundException",
1452                format!("Event source {name} does not exist."),
1453            )
1454        })?;
1455
1456        Ok(AwsResponse::ok_json(json!({
1457            "Arn": ps.arn,
1458            "Name": ps.name,
1459            "CreatedBy": ps.account,
1460            "CreationTime": ps.creation_time.timestamp() as f64,
1461            "State": ps.state,
1462        })))
1463    }
1464
1465    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1466        let body = req.json_body();
1467        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1468        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1469        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1470        let name_prefix = body["NamePrefix"].as_str();
1471        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1472
1473        let accounts = self.state.read();
1474        let empty = EventBridgeState::new(&req.account_id, &req.region);
1475        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1476        let all: Vec<Value> = state
1477            .partner_event_sources
1478            .values()
1479            .filter(|ps| match name_prefix {
1480                Some(prefix) => ps.name.starts_with(prefix),
1481                None => true,
1482            })
1483            .map(|ps| {
1484                json!({
1485                    "Arn": ps.arn,
1486                    "Name": ps.name,
1487                    "CreatedBy": ps.account,
1488                    "CreationTime": ps.creation_time.timestamp() as f64,
1489                    "State": ps.state,
1490                })
1491            })
1492            .collect();
1493
1494        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1495        let mut resp = json!({ "EventSources": sources });
1496        if let Some(token) = next_token {
1497            resp["NextToken"] = json!(token);
1498        }
1499
1500        Ok(AwsResponse::ok_json(resp))
1501    }
1502
1503    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504        let body = req.json_body();
1505        validate_required("Entries", &body["Entries"])?;
1506        let entries = body["Entries"]
1507            .as_array()
1508            .ok_or_else(|| missing("Entries"))?;
1509
1510        let mut result_entries = Vec::new();
1511        for _entry in entries {
1512            let event_id = uuid::Uuid::new_v4().to_string();
1513            result_entries.push(json!({ "EventId": event_id }));
1514        }
1515
1516        Ok(AwsResponse::ok_json(json!({
1517            "FailedEntryCount": 0,
1518            "Entries": result_entries,
1519        })))
1520    }
1521
1522    // ─── TestEventPattern ────────────────────────────────────────────────
1523
1524    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1525        let body = req.json_body();
1526        validate_required("EventPattern", &body["EventPattern"])?;
1527        validate_required("Event", &body["Event"])?;
1528        let event_pattern = body["EventPattern"]
1529            .as_str()
1530            .ok_or_else(|| missing("EventPattern"))?;
1531        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1532
1533        // Parse the event JSON
1534        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1535            AwsServiceError::aws_error(
1536                StatusCode::BAD_REQUEST,
1537                "InvalidEventPatternException",
1538                "Event is not valid JSON.",
1539            )
1540        })?;
1541
1542        // Parse the pattern JSON
1543        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1544            AwsServiceError::aws_error(
1545                StatusCode::BAD_REQUEST,
1546                "InvalidEventPatternException",
1547                "Event pattern is not valid JSON.",
1548            )
1549        })?;
1550
1551        let source = event["source"].as_str().unwrap_or("");
1552        let detail_type = event["detail-type"].as_str().unwrap_or("");
1553        let detail = event
1554            .get("detail")
1555            .map(|v| serde_json::to_string(v).unwrap_or_default())
1556            .unwrap_or_else(|| "{}".to_string());
1557        let account = event["account"].as_str().unwrap_or("");
1558        let region = event["region"].as_str().unwrap_or("");
1559        let resources: Vec<String> = event["resources"]
1560            .as_array()
1561            .map(|arr| {
1562                arr.iter()
1563                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564                    .collect()
1565            })
1566            .unwrap_or_default();
1567
1568        let result = matches_pattern(
1569            Some(event_pattern),
1570            source,
1571            detail_type,
1572            &detail,
1573            account,
1574            region,
1575            &resources,
1576        );
1577
1578        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1579    }
1580
1581    // ─── UpdateEventBus ─────────────────────────────────────────────────
1582
1583    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1584        let body = req.json_body();
1585        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1586        validate_optional_string_length(
1587            "kmsKeyIdentifier",
1588            body["KmsKeyIdentifier"].as_str(),
1589            0,
1590            2048,
1591        )?;
1592        let name = body["Name"].as_str().unwrap_or("default");
1593
1594        let mut accounts = self.state.write();
1595        let state = accounts.get_or_create(&req.account_id);
1596        let bus = state.buses.get_mut(name).ok_or_else(|| {
1597            AwsServiceError::aws_error(
1598                StatusCode::BAD_REQUEST,
1599                "ResourceNotFoundException",
1600                format!("Event bus {name} does not exist."),
1601            )
1602        })?;
1603
1604        if let Some(desc) = body["Description"].as_str() {
1605            bus.description = Some(desc.to_string());
1606        }
1607        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1608            bus.kms_key_identifier = Some(kms.to_string());
1609        }
1610        if let Some(dlc) = body.get("DeadLetterConfig") {
1611            bus.dead_letter_config = Some(dlc.clone());
1612        }
1613        bus.last_modified_time = Utc::now();
1614
1615        let arn = bus.arn.clone();
1616        let bus_name = bus.name.clone();
1617
1618        Ok(AwsResponse::ok_json(json!({
1619            "Arn": arn,
1620            "Name": bus_name,
1621        })))
1622    }
1623
1624    // ─── Endpoint Operations ────────────────────────────────────────────
1625
1626    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1627        let body = req.json_body();
1628        validate_required("Name", &body["Name"])?;
1629        let name = body["Name"]
1630            .as_str()
1631            .ok_or_else(|| missing("Name"))?
1632            .to_string();
1633        validate_string_length("name", &name, 1, 64)?;
1634        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1635        validate_required("EventBuses", &body["EventBuses"])?;
1636
1637        let description = body["Description"].as_str().map(|s| s.to_string());
1638        let routing_config = body["RoutingConfig"].clone();
1639        let replication_config = body.get("ReplicationConfig").cloned();
1640        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1641        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1642
1643        let mut accounts = self.state.write();
1644        let state = accounts.get_or_create(&req.account_id);
1645        if state.endpoints.contains_key(&name) {
1646            return Err(AwsServiceError::aws_error(
1647                StatusCode::CONFLICT,
1648                "ResourceAlreadyExistsException",
1649                format!("Endpoint {name} already exists."),
1650            ));
1651        }
1652
1653        let endpoint_id = format!("{}.abc123", name);
1654        let arn = format!(
1655            "arn:aws:events:{}:{}:endpoint/{}",
1656            req.region, state.account_id, name
1657        );
1658        let endpoint_url = format!(
1659            "https://{}.endpoint.events.{}.amazonaws.com",
1660            endpoint_id, req.region
1661        );
1662        let now = Utc::now();
1663
1664        let endpoint = Endpoint {
1665            name: name.clone(),
1666            arn: arn.clone(),
1667            endpoint_id: endpoint_id.clone(),
1668            endpoint_url: Some(endpoint_url),
1669            description,
1670            routing_config: routing_config.clone(),
1671            replication_config: replication_config.clone(),
1672            event_buses: event_buses.clone(),
1673            role_arn: role_arn.clone(),
1674            state: "ACTIVE".to_string(),
1675            creation_time: now,
1676            last_modified_time: now,
1677        };
1678        state.endpoints.insert(name.clone(), endpoint);
1679
1680        let mut resp = json!({
1681            "Name": name,
1682            "Arn": arn,
1683            "State": "ACTIVE",
1684            "RoutingConfig": routing_config,
1685            "EventBuses": event_buses,
1686        });
1687        if let Some(ref rc) = replication_config {
1688            resp["ReplicationConfig"] = rc.clone();
1689        }
1690        if let Some(ref ra) = role_arn {
1691            resp["RoleArn"] = json!(ra);
1692        }
1693
1694        Ok(AwsResponse::ok_json(resp))
1695    }
1696
1697    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698        let body = req.json_body();
1699        validate_required("Name", &body["Name"])?;
1700        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1701
1702        let mut accounts = self.state.write();
1703        let state = accounts.get_or_create(&req.account_id);
1704        state.endpoints.remove(name).ok_or_else(|| {
1705            AwsServiceError::aws_error(
1706                StatusCode::BAD_REQUEST,
1707                "ResourceNotFoundException",
1708                format!("Endpoint '{name}' does not exist."),
1709            )
1710        })?;
1711
1712        Ok(AwsResponse::ok_json(json!({})))
1713    }
1714
1715    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1716        let body = req.json_body();
1717        validate_required("Name", &body["Name"])?;
1718        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1719
1720        let accounts = self.state.read();
1721        let empty = EventBridgeState::new(&req.account_id, &req.region);
1722        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1723        let ep = state.endpoints.get(name).ok_or_else(|| {
1724            AwsServiceError::aws_error(
1725                StatusCode::BAD_REQUEST,
1726                "ResourceNotFoundException",
1727                format!("Endpoint '{name}' does not exist."),
1728            )
1729        })?;
1730
1731        let mut resp = json!({
1732            "Name": ep.name,
1733            "Arn": ep.arn,
1734            "EndpointId": ep.endpoint_id,
1735            "State": ep.state,
1736            "RoutingConfig": ep.routing_config,
1737            "EventBuses": ep.event_buses,
1738            "CreationTime": ep.creation_time.timestamp() as f64,
1739            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1740        });
1741        if let Some(ref url) = ep.endpoint_url {
1742            resp["EndpointUrl"] = json!(url);
1743        }
1744        if let Some(ref desc) = ep.description {
1745            resp["Description"] = json!(desc);
1746        }
1747        if let Some(ref rc) = ep.replication_config {
1748            resp["ReplicationConfig"] = rc.clone();
1749        }
1750        if let Some(ref ra) = ep.role_arn {
1751            resp["RoleArn"] = json!(ra);
1752        }
1753
1754        Ok(AwsResponse::ok_json(resp))
1755    }
1756
1757    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1758        let body = req.json_body();
1759        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1760        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1761        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1762        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1763        let name_prefix = body["NamePrefix"].as_str();
1764        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1765
1766        let accounts = self.state.read();
1767        let empty = EventBridgeState::new(&req.account_id, &req.region);
1768        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1769        let all: Vec<Value> = state
1770            .endpoints
1771            .values()
1772            .filter(|ep| match name_prefix {
1773                Some(prefix) => ep.name.starts_with(prefix),
1774                None => true,
1775            })
1776            .map(|ep| {
1777                let mut obj = json!({
1778                    "Name": ep.name,
1779                    "Arn": ep.arn,
1780                    "EndpointId": ep.endpoint_id,
1781                    "State": ep.state,
1782                    "RoutingConfig": ep.routing_config,
1783                    "EventBuses": ep.event_buses,
1784                    "CreationTime": ep.creation_time.timestamp() as f64,
1785                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1786                });
1787                if let Some(ref url) = ep.endpoint_url {
1788                    obj["EndpointUrl"] = json!(url);
1789                }
1790                obj
1791            })
1792            .collect();
1793
1794        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1795        let mut resp = json!({ "Endpoints": endpoints });
1796        if let Some(token) = next_token {
1797            resp["NextToken"] = json!(token);
1798        }
1799
1800        Ok(AwsResponse::ok_json(resp))
1801    }
1802
1803    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1804        let body = req.json_body();
1805        validate_required("Name", &body["Name"])?;
1806        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1807
1808        let mut accounts = self.state.write();
1809        let state = accounts.get_or_create(&req.account_id);
1810        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1811            AwsServiceError::aws_error(
1812                StatusCode::BAD_REQUEST,
1813                "ResourceNotFoundException",
1814                format!("Endpoint '{name}' does not exist."),
1815            )
1816        })?;
1817
1818        if let Some(desc) = body["Description"].as_str() {
1819            ep.description = Some(desc.to_string());
1820        }
1821        if !body["RoutingConfig"].is_null() {
1822            ep.routing_config = body["RoutingConfig"].clone();
1823        }
1824        if let Some(rc) = body.get("ReplicationConfig") {
1825            ep.replication_config = Some(rc.clone());
1826        }
1827        if let Some(buses) = body["EventBuses"].as_array() {
1828            ep.event_buses = buses.clone();
1829        }
1830        if let Some(ra) = body["RoleArn"].as_str() {
1831            ep.role_arn = Some(ra.to_string());
1832        }
1833        ep.last_modified_time = Utc::now();
1834
1835        let resp = json!({
1836            "Name": ep.name,
1837            "Arn": ep.arn,
1838            "EndpointId": ep.endpoint_id,
1839            "State": ep.state,
1840            "RoutingConfig": ep.routing_config,
1841            "EventBuses": ep.event_buses,
1842        });
1843
1844        Ok(AwsResponse::ok_json(resp))
1845    }
1846
1847    // ─── DeauthorizeConnection ──────────────────────────────────────────
1848
1849    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850        let body = req.json_body();
1851        validate_required("Name", &body["Name"])?;
1852        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1853        validate_string_length("name", name, 1, 64)?;
1854
1855        let mut accounts = self.state.write();
1856        let state = accounts.get_or_create(&req.account_id);
1857        let conn = state.connections.get_mut(name).ok_or_else(|| {
1858            AwsServiceError::aws_error(
1859                StatusCode::BAD_REQUEST,
1860                "ResourceNotFoundException",
1861                format!("Connection '{name}' does not exist."),
1862            )
1863        })?;
1864
1865        conn.connection_state = "DEAUTHORIZING".to_string();
1866        conn.last_modified_time = Utc::now();
1867
1868        let resp = json!({
1869            "ConnectionArn": conn.arn,
1870            "ConnectionState": conn.connection_state,
1871            "CreationTime": conn.creation_time.timestamp() as f64,
1872            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1873            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1874        });
1875
1876        Ok(AwsResponse::ok_json(resp))
1877    }
1878
1879    // ─── PutEvents ──────────────────────────────────────────────────────
1880
1881    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1882        let body = req.json_body();
1883        validate_required("Entries", &body["Entries"])?;
1884        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1885        let entries = body["Entries"]
1886            .as_array()
1887            .ok_or_else(|| missing("Entries"))?;
1888
1889        // Validate entries count
1890        if entries.is_empty() {
1891            return Err(AwsServiceError::aws_error(
1892                StatusCode::BAD_REQUEST,
1893                "ValidationException",
1894                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1895            ));
1896        }
1897        if entries.len() > 10 {
1898            return Err(AwsServiceError::aws_error(
1899                StatusCode::BAD_REQUEST,
1900                "ValidationException",
1901                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1902            ));
1903        }
1904
1905        let mut accounts = self.state.write();
1906        let state = accounts.get_or_create(&req.account_id);
1907        let mut result_entries = Vec::new();
1908        let mut events_to_deliver = Vec::new();
1909        let mut failed_count = 0;
1910
1911        for entry in entries {
1912            let source = entry["Source"].as_str().unwrap_or("").to_string();
1913            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1914            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1915
1916            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1917                failed_count += 1;
1918                result_entries.push(error);
1919                continue;
1920            }
1921
1922            let event_id = uuid::Uuid::new_v4().to_string();
1923            let raw_bus = entry["EventBusName"]
1924                .as_str()
1925                .unwrap_or("default")
1926                .to_string();
1927            let event_bus_name = state.resolve_bus_name(&raw_bus);
1928            let time = parse_put_events_time(&entry["Time"]);
1929            let resources: Vec<String> = entry["Resources"]
1930                .as_array()
1931                .map(|arr| {
1932                    arr.iter()
1933                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1934                        .collect()
1935                })
1936                .unwrap_or_default();
1937
1938            let event = PutEvent {
1939                event_id: event_id.clone(),
1940                source: source.clone(),
1941                detail_type: detail_type.clone(),
1942                detail: detail.clone(),
1943                event_bus_name: event_bus_name.clone(),
1944                time,
1945                resources: resources.clone(),
1946            };
1947
1948            archive_matching_event(
1949                state,
1950                &event,
1951                &event_bus_name,
1952                &source,
1953                &detail_type,
1954                &detail,
1955                &req.account_id,
1956                &req.region,
1957                &resources,
1958            );
1959
1960            state.events.push(event);
1961
1962            // Find matching rules and their targets
1963            let matching_targets: Vec<EventTarget> = state
1964                .rules
1965                .values()
1966                .filter(|r| {
1967                    r.event_bus_name == event_bus_name
1968                        && r.state == "ENABLED"
1969                        && matches_pattern(
1970                            r.event_pattern.as_deref(),
1971                            &source,
1972                            &detail_type,
1973                            &detail,
1974                            &req.account_id,
1975                            &req.region,
1976                            &resources,
1977                        )
1978                })
1979                .flat_map(|r| r.targets.clone())
1980                .collect();
1981
1982            if !matching_targets.is_empty() {
1983                events_to_deliver.push((
1984                    event_id.clone(),
1985                    source,
1986                    detail_type,
1987                    detail,
1988                    time,
1989                    resources,
1990                    matching_targets,
1991                ));
1992            }
1993
1994            result_entries.push(json!({ "EventId": event_id }));
1995        }
1996
1997        // Drop the lock before delivering
1998        drop(accounts);
1999
2000        // Deliver to targets
2001        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
2002            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
2003            let event_json = json!({
2004                "version": "0",
2005                "id": event_id,
2006                "source": source,
2007                "account": req.account_id,
2008                "detail-type": detail_type,
2009                "detail": detail_value,
2010                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
2011                "region": req.region,
2012                "resources": resources,
2013            });
2014            let event_str = event_json.to_string();
2015
2016            for target in targets {
2017                let arn = &target.arn;
2018                // Compute the message body, applying InputTransformer if present
2019                let body_str = if let Some(ref transformer) = target.input_transformer {
2020                    apply_input_transformer(transformer, &event_json)
2021                } else if let Some(ref input) = target.input {
2022                    input.clone()
2023                } else if let Some(ref input_path) = target.input_path {
2024                    resolve_json_path(&event_json, input_path)
2025                        .map(|v| v.to_string())
2026                        .unwrap_or_else(|| event_str.clone())
2027                } else {
2028                    event_str.clone()
2029                };
2030
2031                if arn.contains(":sqs:") {
2032                    // Extract FIFO parameters (MessageGroupId)
2033                    let group_id = target
2034                        .sqs_parameters
2035                        .as_ref()
2036                        .and_then(|p| p["MessageGroupId"].as_str())
2037                        .map(|s| s.to_string());
2038                    if group_id.is_some() {
2039                        // FIFO queue: send with group ID but no dedup ID.
2040                        // Queues with content-based dedup will auto-generate one;
2041                        // queues without it will reject the message.
2042                        self.delivery.send_to_sqs_with_attrs(
2043                            arn,
2044                            &body_str,
2045                            &HashMap::new(),
2046                            group_id.as_deref(),
2047                            None,
2048                        );
2049                    } else {
2050                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
2051                    }
2052                } else if arn.contains(":sns:") {
2053                    self.delivery
2054                        .publish_to_sns(arn, &body_str, Some(&detail_type));
2055                } else if arn.contains(":lambda:") {
2056                    tracing::info!(
2057                        function_arn = %arn,
2058                        payload = %body_str,
2059                        "EventBridge delivering to Lambda function"
2060                    );
2061                    let now = Utc::now();
2062                    let mut accounts = self.state.write();
2063                    let state = accounts.get_or_create(&req.account_id);
2064                    state
2065                        .lambda_invocations
2066                        .push(crate::state::LambdaInvocation {
2067                            function_arn: arn.clone(),
2068                            payload: body_str.clone(),
2069                            timestamp: now,
2070                        });
2071                    drop(accounts);
2072                    // Record in Lambda state for cross-service visibility
2073                    if let Some(ref ls) = self.lambda_state {
2074                        ls.write().default_mut().invocations.push(LambdaInvocation {
2075                            function_arn: arn.clone(),
2076                            payload: body_str.clone(),
2077                            timestamp: now,
2078                            source: "aws:events".to_string(),
2079                        });
2080                    }
2081                    // Actually invoke the Lambda function if a container runtime is available
2082                    invoke_lambda_async(
2083                        &self.container_runtime,
2084                        &self.lambda_state,
2085                        arn,
2086                        &body_str,
2087                    );
2088                } else if arn.contains(":logs:") {
2089                    tracing::info!(
2090                        log_group_arn = %arn,
2091                        payload = %body_str,
2092                        "EventBridge delivering to CloudWatch Logs"
2093                    );
2094                    let now = Utc::now();
2095                    let mut accounts = self.state.write();
2096                    let state = accounts.get_or_create(&req.account_id);
2097                    state.log_deliveries.push(crate::state::LogDelivery {
2098                        log_group_arn: arn.clone(),
2099                        payload: body_str.clone(),
2100                        timestamp: now,
2101                    });
2102                    drop(accounts);
2103                    // Write event to CloudWatch Logs state
2104                    if let Some(ref log_state) = self.logs_state {
2105                        deliver_to_logs(log_state, arn, &body_str, now);
2106                    }
2107                } else if arn.contains(":kinesis:") {
2108                    tracing::info!(
2109                        stream_arn = %arn,
2110                        "EventBridge delivering to Kinesis stream"
2111                    );
2112                    // Use event ID as partition key for even distribution
2113                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
2114                } else if arn.contains(":states:") {
2115                    tracing::info!(
2116                        state_machine_arn = %arn,
2117                        "EventBridge delivering to Step Functions"
2118                    );
2119                    self.delivery.start_stepfunctions_execution(arn, &body_str);
2120                    let mut accounts = self.state.write();
2121                    let state = accounts.get_or_create(&req.account_id);
2122                    state
2123                        .step_function_executions
2124                        .push(crate::state::StepFunctionExecution {
2125                            state_machine_arn: arn.clone(),
2126                            payload: body_str.clone(),
2127                            timestamp: Utc::now(),
2128                        });
2129                } else if arn.contains(":api-destination/") {
2130                    // ApiDestination target: look up destination + connection, then POST
2131                    let accounts = self.state.read();
2132                    let empty = EventBridgeState::new(&req.account_id, &req.region);
2133                    let state = accounts.get(&req.account_id).unwrap_or(&empty);
2134                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2135                    if let Some(dest) = dest {
2136                        let url = dest.invocation_endpoint.clone();
2137                        let method = dest.http_method.clone();
2138                        let conn = state
2139                            .connections
2140                            .values()
2141                            .find(|c| c.arn == dest.connection_arn)
2142                            .cloned();
2143                        drop(accounts);
2144
2145                        let payload = body_str.clone();
2146                        tokio::spawn(async move {
2147                            let client = reqwest::Client::new();
2148                            let mut req_builder = match method.as_str() {
2149                                "GET" => client.get(&url),
2150                                "PUT" => client.put(&url),
2151                                "DELETE" => client.delete(&url),
2152                                "PATCH" => client.patch(&url),
2153                                "HEAD" => client.head(&url),
2154                                _ => client.post(&url),
2155                            };
2156                            req_builder = req_builder.header("Content-Type", "application/json");
2157
2158                            // Apply auth from connection
2159                            if let Some(conn) = conn {
2160                                req_builder = apply_connection_auth(req_builder, &conn);
2161                            }
2162
2163                            let result = req_builder.body(payload).send().await;
2164                            if let Err(e) = result {
2165                                tracing::warn!(
2166                                    endpoint = %url,
2167                                    error = %e,
2168                                    "EventBridge ApiDestination delivery failed"
2169                                );
2170                            }
2171                        });
2172                    }
2173                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2174                    // HTTP target — fire-and-forget POST
2175                    let url = arn.clone();
2176                    let payload = body_str.clone();
2177                    tokio::spawn(async move {
2178                        let client = reqwest::Client::new();
2179                        let result = client
2180                            .post(&url)
2181                            .header("Content-Type", "application/json")
2182                            .body(payload)
2183                            .send()
2184                            .await;
2185                        if let Err(e) = result {
2186                            tracing::warn!(
2187                                endpoint = %url,
2188                                error = %e,
2189                                "EventBridge HTTP target delivery failed"
2190                            );
2191                        }
2192                    });
2193                }
2194            }
2195        }
2196
2197        let resp = json!({
2198            "FailedEntryCount": failed_count,
2199            "Entries": result_entries,
2200        });
2201
2202        Ok(AwsResponse::ok_json(resp))
2203    }
2204
2205    // ─── Tagging ────────────────────────────────────────────────────────
2206
2207    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2208        let body = req.json_body();
2209        validate_required("ResourceARN", &body["ResourceARN"])?;
2210        let arn = body["ResourceARN"]
2211            .as_str()
2212            .ok_or_else(|| missing("ResourceARN"))?;
2213        validate_string_length("resourceARN", arn, 1, 1600)?;
2214        validate_required("Tags", &body["Tags"])?;
2215
2216        let mut accounts = self.state.write();
2217        let state = accounts.get_or_create(&req.account_id);
2218        let tag_map = find_tags_mut(state, arn)?;
2219
2220        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2221            AwsServiceError::aws_error(
2222                StatusCode::BAD_REQUEST,
2223                "ValidationException",
2224                format!("{f} must be a list"),
2225            )
2226        })?;
2227
2228        Ok(AwsResponse::ok_json(json!({})))
2229    }
2230
2231    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2232        let body = req.json_body();
2233        validate_required("ResourceARN", &body["ResourceARN"])?;
2234        let arn = body["ResourceARN"]
2235            .as_str()
2236            .ok_or_else(|| missing("ResourceARN"))?;
2237        validate_string_length("resourceARN", arn, 1, 1600)?;
2238        validate_required("TagKeys", &body["TagKeys"])?;
2239
2240        let mut accounts = self.state.write();
2241        let state = accounts.get_or_create(&req.account_id);
2242        let tag_map = find_tags_mut(state, arn)?;
2243
2244        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2245            AwsServiceError::aws_error(
2246                StatusCode::BAD_REQUEST,
2247                "ValidationException",
2248                format!("{f} must be a list"),
2249            )
2250        })?;
2251
2252        Ok(AwsResponse::ok_json(json!({})))
2253    }
2254
2255    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2256        let body = req.json_body();
2257        validate_required("ResourceARN", &body["ResourceARN"])?;
2258        let arn = body["ResourceARN"]
2259            .as_str()
2260            .ok_or_else(|| missing("ResourceARN"))?;
2261        validate_string_length("resourceARN", arn, 1, 1600)?;
2262
2263        let accounts = self.state.read();
2264        let empty = EventBridgeState::new(&req.account_id, &req.region);
2265        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2266        let tag_map = find_tags(state, arn)?;
2267
2268        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2269
2270        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2271    }
2272
2273    // ─── Archive Operations ─────────────────────────────────────────────
2274
2275    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2276        let body = req.json_body();
2277        validate_required("ArchiveName", &body["ArchiveName"])?;
2278        let name = body["ArchiveName"]
2279            .as_str()
2280            .ok_or_else(|| missing("ArchiveName"))?
2281            .to_string();
2282        validate_string_length("archiveName", &name, 1, 48)?;
2283        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2284        let event_source_arn = body["EventSourceArn"]
2285            .as_str()
2286            .ok_or_else(|| missing("EventSourceArn"))?
2287            .to_string();
2288        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2289        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2290        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2291        if let Some(rd) = body["RetentionDays"].as_i64() {
2292            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2293        }
2294        let description = body["Description"].as_str().map(|s| s.to_string());
2295        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2296        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2297
2298        // Validate event pattern if provided
2299        if let Some(ref pattern) = event_pattern {
2300            validate_event_pattern(pattern)?;
2301        }
2302
2303        let mut accounts = self.state.write();
2304        let state = accounts.get_or_create(&req.account_id);
2305
2306        // Validate event bus exists
2307        let bus_name = state.resolve_bus_name(&event_source_arn);
2308        if !state.buses.contains_key(&bus_name) {
2309            return Err(AwsServiceError::aws_error(
2310                StatusCode::BAD_REQUEST,
2311                "ResourceNotFoundException",
2312                format!("Event bus {bus_name} does not exist."),
2313            ));
2314        }
2315
2316        // Check duplicate
2317        if state.archives.contains_key(&name) {
2318            return Err(AwsServiceError::aws_error(
2319                StatusCode::BAD_REQUEST,
2320                "ResourceAlreadyExistsException",
2321                format!("Archive {name} already exists."),
2322            ));
2323        }
2324
2325        let now = Utc::now();
2326        let arn = format!(
2327            "arn:aws:events:{}:{}:archive/{}",
2328            req.region, state.account_id, name
2329        );
2330
2331        let archive = Archive {
2332            name: name.clone(),
2333            arn: arn.clone(),
2334            event_source_arn: event_source_arn.clone(),
2335            description,
2336            event_pattern: event_pattern.clone(),
2337            retention_days,
2338            state: "ENABLED".to_string(),
2339            creation_time: now,
2340            event_count: 0,
2341            size_bytes: 0,
2342            events: Vec::new(),
2343        };
2344        state.archives.insert(name.clone(), archive);
2345
2346        // Create the archive rule
2347        let rule_name = format!("Events-Archive-{name}");
2348        let rule_arn = format!(
2349            "arn:aws:events:{}:{}:rule/{}",
2350            req.region, state.account_id, rule_name
2351        );
2352        // Merge archive event pattern with replay-name filter
2353        let rule_event_pattern = {
2354            let mut merged = if let Some(ref ep) = event_pattern {
2355                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2356            } else {
2357                json!({})
2358            };
2359            if let Some(obj) = merged.as_object_mut() {
2360                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2361            }
2362            serde_json::to_string(&merged).unwrap_or_default()
2363        };
2364
2365        // Build the archive target with InputTransformer
2366        let archive_target = EventTarget {
2367            id: name.clone(),
2368            arn: format!("arn:aws:events:{}:::", req.region),
2369            input: None,
2370            input_path: None,
2371            input_transformer: Some(json!({
2372                "InputPathsMap": {},
2373                "InputTemplate": format!(
2374                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2375                    arn
2376                )
2377            })),
2378            sqs_parameters: None,
2379        };
2380
2381        let archive_rule = EventRule {
2382            name: rule_name.clone(),
2383            arn: rule_arn,
2384            event_bus_name: bus_name.clone(),
2385            event_pattern: Some(rule_event_pattern),
2386            schedule_expression: None,
2387            state: "ENABLED".to_string(),
2388            description: None,
2389            role_arn: None,
2390            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2391            created_by: Some(state.account_id.clone()),
2392            targets: vec![archive_target],
2393            tags: HashMap::new(),
2394            last_fired: None,
2395        };
2396        let key = (bus_name, rule_name);
2397        state.rules.insert(key, archive_rule);
2398
2399        Ok(AwsResponse::ok_json(json!({
2400            "ArchiveArn": arn,
2401            "CreationTime": now.timestamp() as f64,
2402            "State": "ENABLED",
2403        })))
2404    }
2405
2406    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2407        let body = req.json_body();
2408        validate_required("ArchiveName", &body["ArchiveName"])?;
2409        let name = body["ArchiveName"]
2410            .as_str()
2411            .ok_or_else(|| missing("ArchiveName"))?;
2412        validate_string_length("archiveName", name, 1, 48)?;
2413
2414        let accounts = self.state.read();
2415        let empty = EventBridgeState::new(&req.account_id, &req.region);
2416        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2417        let archive = state.archives.get(name).ok_or_else(|| {
2418            AwsServiceError::aws_error(
2419                StatusCode::BAD_REQUEST,
2420                "ResourceNotFoundException",
2421                format!("Archive {name} does not exist."),
2422            )
2423        })?;
2424
2425        let mut resp = json!({
2426            "ArchiveArn": archive.arn,
2427            "ArchiveName": archive.name,
2428            "CreationTime": archive.creation_time.timestamp() as f64,
2429            "EventCount": archive.event_count,
2430            "EventSourceArn": archive.event_source_arn,
2431            "RetentionDays": archive.retention_days,
2432            "SizeBytes": archive.size_bytes,
2433            "State": archive.state,
2434        });
2435        if let Some(ref desc) = archive.description {
2436            resp["Description"] = json!(desc);
2437        }
2438        if let Some(ref ep) = archive.event_pattern {
2439            resp["EventPattern"] = json!(ep);
2440        }
2441
2442        Ok(AwsResponse::ok_json(resp))
2443    }
2444
2445    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2446        let body = req.json_body();
2447        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2448        validate_optional_string_length(
2449            "eventSourceArn",
2450            body["EventSourceArn"].as_str(),
2451            1,
2452            1600,
2453        )?;
2454        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2455        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2456        let name_prefix = body["NamePrefix"].as_str();
2457        let source_arn = body["EventSourceArn"].as_str();
2458        let archive_state = body["State"].as_str();
2459
2460        // Validate at most one filter
2461        let filter_count = [
2462            name_prefix.is_some(),
2463            source_arn.is_some(),
2464            archive_state.is_some(),
2465        ]
2466        .iter()
2467        .filter(|&&x| x)
2468        .count();
2469        if filter_count > 1 {
2470            return Err(AwsServiceError::aws_error(
2471                StatusCode::BAD_REQUEST,
2472                "ValidationException",
2473                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2474            ));
2475        }
2476
2477        // Validate state
2478        if let Some(s) = archive_state {
2479            let valid = [
2480                "ENABLED",
2481                "DISABLED",
2482                "CREATING",
2483                "UPDATING",
2484                "CREATE_FAILED",
2485                "UPDATE_FAILED",
2486            ];
2487            if !valid.contains(&s) {
2488                return Err(AwsServiceError::aws_error(
2489                    StatusCode::BAD_REQUEST,
2490                    "ValidationException",
2491                    format!(
2492                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2493                        s
2494                    ),
2495                ));
2496            }
2497        }
2498
2499        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2500
2501        let accounts = self.state.read();
2502        let empty = EventBridgeState::new(&req.account_id, &req.region);
2503        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2504        let all: Vec<Value> = state
2505            .archives
2506            .values()
2507            .filter(|a| {
2508                if let Some(prefix) = name_prefix {
2509                    a.name.starts_with(prefix)
2510                } else if let Some(arn) = source_arn {
2511                    a.event_source_arn == arn
2512                } else if let Some(s) = archive_state {
2513                    a.state == s
2514                } else {
2515                    true
2516                }
2517            })
2518            .map(|a| {
2519                json!({
2520                    "ArchiveName": a.name,
2521                    "CreationTime": a.creation_time.timestamp() as f64,
2522                    "EventCount": a.event_count,
2523                    "EventSourceArn": a.event_source_arn,
2524                    "RetentionDays": a.retention_days,
2525                    "SizeBytes": a.size_bytes,
2526                    "State": a.state,
2527                })
2528            })
2529            .collect();
2530
2531        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2532        let mut resp = json!({ "Archives": archives });
2533        if let Some(token) = next_token {
2534            resp["NextToken"] = json!(token);
2535        }
2536
2537        Ok(AwsResponse::ok_json(resp))
2538    }
2539
2540    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2541        let body = req.json_body();
2542        validate_required("ArchiveName", &body["ArchiveName"])?;
2543        let name = body["ArchiveName"]
2544            .as_str()
2545            .ok_or_else(|| missing("ArchiveName"))?;
2546        validate_string_length("archiveName", name, 1, 48)?;
2547        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2548        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2549        if let Some(rd) = body["RetentionDays"].as_i64() {
2550            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2551        }
2552
2553        // Validate event pattern if provided
2554        if let Some(pattern) = body["EventPattern"].as_str() {
2555            validate_event_pattern(pattern)?;
2556        }
2557
2558        let mut accounts = self.state.write();
2559        let state = accounts.get_or_create(&req.account_id);
2560        let archive = state.archives.get_mut(name).ok_or_else(|| {
2561            AwsServiceError::aws_error(
2562                StatusCode::BAD_REQUEST,
2563                "ResourceNotFoundException",
2564                format!("Archive {name} does not exist."),
2565            )
2566        })?;
2567
2568        if let Some(desc) = body["Description"].as_str() {
2569            archive.description = Some(desc.to_string());
2570        }
2571        if let Some(pattern) = body["EventPattern"].as_str() {
2572            archive.event_pattern = Some(pattern.to_string());
2573        }
2574        if let Some(days) = body["RetentionDays"].as_i64() {
2575            archive.retention_days = days;
2576        }
2577
2578        Ok(AwsResponse::ok_json(json!({
2579            "ArchiveArn": archive.arn,
2580            "CreationTime": archive.creation_time.timestamp() as f64,
2581            "State": archive.state,
2582        })))
2583    }
2584
2585    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2586        let body = req.json_body();
2587        validate_required("ArchiveName", &body["ArchiveName"])?;
2588        let name = body["ArchiveName"]
2589            .as_str()
2590            .ok_or_else(|| missing("ArchiveName"))?;
2591        validate_string_length("archiveName", name, 1, 48)?;
2592
2593        let mut accounts = self.state.write();
2594        let state = accounts.get_or_create(&req.account_id);
2595        if !state.archives.contains_key(name) {
2596            return Err(AwsServiceError::aws_error(
2597                StatusCode::BAD_REQUEST,
2598                "ResourceNotFoundException",
2599                format!("Archive {name} does not exist."),
2600            ));
2601        }
2602
2603        state.archives.remove(name);
2604
2605        // Remove the archive rule
2606        let rule_name = format!("Events-Archive-{name}");
2607        state.rules.retain(|k, _| k.1 != rule_name);
2608
2609        Ok(AwsResponse::ok_json(json!({})))
2610    }
2611
2612    // ─── Connection Operations ──────────────────────────────────────────
2613
2614    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2615        let body = req.json_body();
2616        validate_required("Name", &body["Name"])?;
2617        let name = body["Name"]
2618            .as_str()
2619            .ok_or_else(|| missing("Name"))?
2620            .to_string();
2621        validate_string_length("name", &name, 1, 64)?;
2622        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2623        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2624        let description = body["Description"].as_str().map(|s| s.to_string());
2625        let auth_type = body["AuthorizationType"]
2626            .as_str()
2627            .ok_or_else(|| missing("AuthorizationType"))?
2628            .to_string();
2629        validate_enum(
2630            "authorizationType",
2631            &auth_type,
2632            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2633        )?;
2634        validate_optional_string_length(
2635            "kmsKeyIdentifier",
2636            body["KmsKeyIdentifier"].as_str(),
2637            0,
2638            2048,
2639        )?;
2640        validate_required("AuthParameters", &body["AuthParameters"])?;
2641        let auth_params = body["AuthParameters"].clone();
2642
2643        let mut accounts = self.state.write();
2644        let state = accounts.get_or_create(&req.account_id);
2645        let now = Utc::now();
2646        let conn_uuid = uuid::Uuid::new_v4();
2647        let arn = format!(
2648            "arn:aws:events:{}:{}:connection/{}/{}",
2649            req.region, state.account_id, name, conn_uuid
2650        );
2651        let secret_arn = format!(
2652            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2653            req.region, state.account_id, name, conn_uuid
2654        );
2655
2656        let conn = Connection {
2657            name: name.clone(),
2658            arn: arn.clone(),
2659            description,
2660            authorization_type: auth_type.clone(),
2661            auth_parameters: auth_params,
2662            connection_state: "AUTHORIZED".to_string(),
2663            secret_arn: secret_arn.clone(),
2664            creation_time: now,
2665            last_modified_time: now,
2666            last_authorized_time: now,
2667        };
2668        state.connections.insert(name, conn);
2669
2670        Ok(AwsResponse::ok_json(json!({
2671            "ConnectionArn": arn,
2672            "ConnectionState": "AUTHORIZED",
2673            "CreationTime": now.timestamp() as f64,
2674            "LastModifiedTime": now.timestamp() as f64,
2675        })))
2676    }
2677
2678    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2679        let body = req.json_body();
2680        validate_required("Name", &body["Name"])?;
2681        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2682        validate_string_length("name", name, 1, 64)?;
2683
2684        let accounts = self.state.read();
2685        let empty = EventBridgeState::new(&req.account_id, &req.region);
2686        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2687        let conn = state.connections.get(name).ok_or_else(|| {
2688            AwsServiceError::aws_error(
2689                StatusCode::BAD_REQUEST,
2690                "ResourceNotFoundException",
2691                format!("Connection '{name}' does not exist."),
2692            )
2693        })?;
2694
2695        // Build auth parameters response - strip secrets
2696        let auth_params_response =
2697            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2698
2699        let mut resp = json!({
2700            "ConnectionArn": conn.arn,
2701            "Name": conn.name,
2702            "AuthorizationType": conn.authorization_type,
2703            "AuthParameters": auth_params_response,
2704            "ConnectionState": conn.connection_state,
2705            "SecretArn": conn.secret_arn,
2706            "CreationTime": conn.creation_time.timestamp() as f64,
2707            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2708            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2709        });
2710        if let Some(ref desc) = conn.description {
2711            resp["Description"] = json!(desc);
2712        }
2713
2714        Ok(AwsResponse::ok_json(resp))
2715    }
2716
2717    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2718        let body = req.json_body();
2719        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2720        validate_optional_enum(
2721            "connectionState",
2722            body["ConnectionState"].as_str(),
2723            &[
2724                "CREATING",
2725                "UPDATING",
2726                "DELETING",
2727                "AUTHORIZED",
2728                "DEAUTHORIZED",
2729                "AUTHORIZING",
2730                "DEAUTHORIZING",
2731                "ACTIVE",
2732                "FAILED_CONNECTIVITY",
2733            ],
2734        )?;
2735        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2736        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2737
2738        let name_prefix = body["NamePrefix"].as_str();
2739        let connection_state = body["ConnectionState"].as_str();
2740        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2741
2742        let accounts = self.state.read();
2743        let empty = EventBridgeState::new(&req.account_id, &req.region);
2744        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2745        let all: Vec<Value> = state
2746            .connections
2747            .values()
2748            .filter(|c| {
2749                if let Some(prefix) = name_prefix {
2750                    if !c.name.starts_with(prefix) {
2751                        return false;
2752                    }
2753                }
2754                if let Some(cs) = connection_state {
2755                    if c.connection_state != cs {
2756                        return false;
2757                    }
2758                }
2759                true
2760            })
2761            .map(|c| {
2762                json!({
2763                    "ConnectionArn": c.arn,
2764                    "Name": c.name,
2765                    "AuthorizationType": c.authorization_type,
2766                    "ConnectionState": c.connection_state,
2767                    "CreationTime": c.creation_time.timestamp() as f64,
2768                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2769                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2770                })
2771            })
2772            .collect();
2773
2774        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2775        let mut resp = json!({ "Connections": conns });
2776        if let Some(token) = next_token {
2777            resp["NextToken"] = json!(token);
2778        }
2779
2780        Ok(AwsResponse::ok_json(resp))
2781    }
2782
2783    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2784        let body = req.json_body();
2785        validate_required("Name", &body["Name"])?;
2786        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2787        validate_string_length("name", name, 1, 64)?;
2788        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2789        validate_optional_enum(
2790            "authorizationType",
2791            body["AuthorizationType"].as_str(),
2792            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2793        )?;
2794
2795        let mut accounts = self.state.write();
2796        let state = accounts.get_or_create(&req.account_id);
2797        let conn = state.connections.get_mut(name).ok_or_else(|| {
2798            AwsServiceError::aws_error(
2799                StatusCode::BAD_REQUEST,
2800                "ResourceNotFoundException",
2801                format!("Connection '{name}' does not exist."),
2802            )
2803        })?;
2804
2805        if let Some(desc) = body["Description"].as_str() {
2806            conn.description = Some(desc.to_string());
2807        }
2808        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2809            conn.authorization_type = auth_type.to_string();
2810        }
2811        if body.get("AuthParameters").is_some() {
2812            conn.auth_parameters = body["AuthParameters"].clone();
2813        }
2814        conn.last_modified_time = Utc::now();
2815
2816        Ok(AwsResponse::ok_json(json!({
2817            "ConnectionArn": conn.arn,
2818            "ConnectionState": conn.connection_state,
2819            "CreationTime": conn.creation_time.timestamp() as f64,
2820            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2821            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2822        })))
2823    }
2824
2825    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2826        let body = req.json_body();
2827        validate_required("Name", &body["Name"])?;
2828        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2829        validate_string_length("name", name, 1, 64)?;
2830
2831        let mut accounts = self.state.write();
2832        let state = accounts.get_or_create(&req.account_id);
2833        let conn = state.connections.remove(name).ok_or_else(|| {
2834            AwsServiceError::aws_error(
2835                StatusCode::BAD_REQUEST,
2836                "ResourceNotFoundException",
2837                format!("Connection '{name}' does not exist."),
2838            )
2839        })?;
2840
2841        Ok(AwsResponse::ok_json(json!({
2842            "ConnectionArn": conn.arn,
2843            "ConnectionState": conn.connection_state,
2844            "CreationTime": conn.creation_time.timestamp() as f64,
2845            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2846            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2847        })))
2848    }
2849
2850    // ─── API Destination Operations ─────────────────────────────────────
2851
2852    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2853        let body = req.json_body();
2854        validate_required("Name", &body["Name"])?;
2855        let name = body["Name"]
2856            .as_str()
2857            .ok_or_else(|| missing("Name"))?
2858            .to_string();
2859        validate_string_length("name", &name, 1, 64)?;
2860        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2862        let description = body["Description"].as_str().map(|s| s.to_string());
2863        let connection_arn = body["ConnectionArn"]
2864            .as_str()
2865            .ok_or_else(|| missing("ConnectionArn"))?
2866            .to_string();
2867        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2868        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2869        let endpoint = body["InvocationEndpoint"]
2870            .as_str()
2871            .ok_or_else(|| missing("InvocationEndpoint"))?
2872            .to_string();
2873        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2874        validate_required("HttpMethod", &body["HttpMethod"])?;
2875        let http_method = body["HttpMethod"]
2876            .as_str()
2877            .ok_or_else(|| missing("HttpMethod"))?
2878            .to_string();
2879        validate_enum(
2880            "httpMethod",
2881            &http_method,
2882            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2883        )?;
2884        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2885        if let Some(r) = rate_limit {
2886            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2887        }
2888
2889        let mut accounts = self.state.write();
2890        let state = accounts.get_or_create(&req.account_id);
2891        let now = Utc::now();
2892        let dest_uuid = uuid::Uuid::new_v4();
2893        let arn = format!(
2894            "arn:aws:events:{}:{}:api-destination/{}/{}",
2895            req.region, state.account_id, name, dest_uuid
2896        );
2897
2898        let dest = ApiDestination {
2899            name: name.clone(),
2900            arn: arn.clone(),
2901            description,
2902            connection_arn,
2903            invocation_endpoint: endpoint,
2904            http_method,
2905            invocation_rate_limit_per_second: rate_limit,
2906            state: "ACTIVE".to_string(),
2907            creation_time: now,
2908            last_modified_time: now,
2909        };
2910        state.api_destinations.insert(name, dest);
2911
2912        Ok(AwsResponse::ok_json(json!({
2913            "ApiDestinationArn": arn,
2914            "ApiDestinationState": "ACTIVE",
2915            "CreationTime": now.timestamp() as f64,
2916            "LastModifiedTime": now.timestamp() as f64,
2917        })))
2918    }
2919
2920    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2921        let body = req.json_body();
2922        validate_required("Name", &body["Name"])?;
2923        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2924        validate_string_length("name", name, 1, 64)?;
2925
2926        let accounts = self.state.read();
2927        let empty = EventBridgeState::new(&req.account_id, &req.region);
2928        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2929        let dest = state.api_destinations.get(name).ok_or_else(|| {
2930            AwsServiceError::aws_error(
2931                StatusCode::BAD_REQUEST,
2932                "ResourceNotFoundException",
2933                format!("An api-destination '{name}' does not exist."),
2934            )
2935        })?;
2936
2937        let mut resp = json!({
2938            "ApiDestinationArn": dest.arn,
2939            "Name": dest.name,
2940            "ConnectionArn": dest.connection_arn,
2941            "InvocationEndpoint": dest.invocation_endpoint,
2942            "HttpMethod": dest.http_method,
2943            "ApiDestinationState": dest.state,
2944            "CreationTime": dest.creation_time.timestamp() as f64,
2945            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2946        });
2947        if let Some(ref desc) = dest.description {
2948            resp["Description"] = json!(desc);
2949        }
2950        if let Some(rate) = dest.invocation_rate_limit_per_second {
2951            resp["InvocationRateLimitPerSecond"] = json!(rate);
2952        }
2953
2954        Ok(AwsResponse::ok_json(resp))
2955    }
2956
2957    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2958        let body = req.json_body();
2959        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2960        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2961        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2962        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2963
2964        let name_prefix = body["NamePrefix"].as_str();
2965        let connection_arn = body["ConnectionArn"].as_str();
2966        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2967
2968        let accounts = self.state.read();
2969        let empty = EventBridgeState::new(&req.account_id, &req.region);
2970        let state = accounts.get(&req.account_id).unwrap_or(&empty);
2971        let all: Vec<Value> = state
2972            .api_destinations
2973            .values()
2974            .filter(|d| {
2975                if let Some(prefix) = name_prefix {
2976                    if !d.name.starts_with(prefix) {
2977                        return false;
2978                    }
2979                }
2980                if let Some(arn) = connection_arn {
2981                    if d.connection_arn != arn {
2982                        return false;
2983                    }
2984                }
2985                true
2986            })
2987            .map(|d| {
2988                let mut obj = json!({
2989                    "ApiDestinationArn": d.arn,
2990                    "Name": d.name,
2991                    "ConnectionArn": d.connection_arn,
2992                    "InvocationEndpoint": d.invocation_endpoint,
2993                    "HttpMethod": d.http_method,
2994                    "ApiDestinationState": d.state,
2995                    "CreationTime": d.creation_time.timestamp() as f64,
2996                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2997                });
2998                if let Some(rate) = d.invocation_rate_limit_per_second {
2999                    obj["InvocationRateLimitPerSecond"] = json!(rate);
3000                }
3001                obj
3002            })
3003            .collect();
3004
3005        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3006        let mut resp = json!({ "ApiDestinations": dests });
3007        if let Some(token) = next_token {
3008            resp["NextToken"] = json!(token);
3009        }
3010
3011        Ok(AwsResponse::ok_json(resp))
3012    }
3013
3014    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3015        let body = req.json_body();
3016        validate_required("Name", &body["Name"])?;
3017        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3018        validate_string_length("name", name, 1, 64)?;
3019        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3020        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
3021        validate_optional_string_length(
3022            "invocationEndpoint",
3023            body["InvocationEndpoint"].as_str(),
3024            1,
3025            2048,
3026        )?;
3027        validate_optional_enum(
3028            "httpMethod",
3029            body["HttpMethod"].as_str(),
3030            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
3031        )?;
3032        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
3033            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
3034        }
3035
3036        let mut accounts = self.state.write();
3037        let state = accounts.get_or_create(&req.account_id);
3038        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
3039            AwsServiceError::aws_error(
3040                StatusCode::BAD_REQUEST,
3041                "ResourceNotFoundException",
3042                format!("An api-destination '{name}' does not exist."),
3043            )
3044        })?;
3045
3046        if let Some(desc) = body["Description"].as_str() {
3047            dest.description = Some(desc.to_string());
3048        }
3049        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
3050            dest.invocation_endpoint = endpoint.to_string();
3051        }
3052        if let Some(method) = body["HttpMethod"].as_str() {
3053            dest.http_method = method.to_string();
3054        }
3055        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
3056            dest.invocation_rate_limit_per_second = Some(rate);
3057        }
3058        if let Some(conn) = body["ConnectionArn"].as_str() {
3059            dest.connection_arn = conn.to_string();
3060        }
3061        dest.last_modified_time = Utc::now();
3062
3063        Ok(AwsResponse::ok_json(json!({
3064            "ApiDestinationArn": dest.arn,
3065            "ApiDestinationState": dest.state,
3066            "CreationTime": dest.creation_time.timestamp() as f64,
3067            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
3068        })))
3069    }
3070
3071    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3072        let body = req.json_body();
3073        validate_required("Name", &body["Name"])?;
3074        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
3075        validate_string_length("name", name, 1, 64)?;
3076
3077        let mut accounts = self.state.write();
3078        let state = accounts.get_or_create(&req.account_id);
3079        if !state.api_destinations.contains_key(name) {
3080            return Err(AwsServiceError::aws_error(
3081                StatusCode::BAD_REQUEST,
3082                "ResourceNotFoundException",
3083                format!("An api-destination '{name}' does not exist."),
3084            ));
3085        }
3086        state.api_destinations.remove(name);
3087
3088        Ok(AwsResponse::ok_json(json!({})))
3089    }
3090
3091    // ─── Replay Operations ──────────────────────────────────────────────
3092
3093    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3094        let input = StartReplayInput::from_body(&req.json_body())?;
3095
3096        let mut accounts = self.state.write();
3097        let state = accounts.get_or_create(&req.account_id);
3098
3099        // Validate event bus + archive, in the order the real service validates them.
3100        let bus_name = state.resolve_bus_name(&input.destination_arn);
3101        if !state.buses.contains_key(&bus_name) {
3102            return Err(AwsServiceError::aws_error(
3103                StatusCode::BAD_REQUEST,
3104                "ResourceNotFoundException",
3105                format!("Event bus {bus_name} does not exist."),
3106            ));
3107        }
3108
3109        let archive_name = input
3110            .event_source_arn
3111            .rsplit_once("archive/")
3112            .map(|(_, n)| n.to_string())
3113            .unwrap_or_default();
3114        let archive = state.archives.get(&archive_name).ok_or_else(|| {
3115            AwsServiceError::aws_error(
3116                StatusCode::BAD_REQUEST,
3117                "ValidationException",
3118                format!(
3119                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3120                ),
3121            )
3122        })?;
3123        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3124        if archive_bus != bus_name {
3125            return Err(AwsServiceError::aws_error(
3126                StatusCode::BAD_REQUEST,
3127                "ValidationException",
3128                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3129            ));
3130        }
3131
3132        if input.event_end_time <= input.event_start_time {
3133            return Err(AwsServiceError::aws_error(
3134                StatusCode::BAD_REQUEST,
3135                "ValidationException",
3136                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3137            ));
3138        }
3139
3140        if state.replays.contains_key(&input.name) {
3141            return Err(AwsServiceError::aws_error(
3142                StatusCode::BAD_REQUEST,
3143                "ResourceAlreadyExistsException",
3144                format!("Replay {} already exists.", input.name),
3145            ));
3146        }
3147
3148        let now = Utc::now();
3149        let arn = format!(
3150            "arn:aws:events:{}:{}:replay/{}",
3151            req.region, state.account_id, input.name
3152        );
3153
3154        let events_to_deliver = collect_replay_events_with_targets(
3155            state,
3156            &archive_name,
3157            &bus_name,
3158            input.event_start_time,
3159            input.event_end_time,
3160            &req.account_id,
3161            &req.region,
3162        );
3163
3164        let replay = Replay {
3165            name: input.name.clone(),
3166            arn: arn.clone(),
3167            description: input.description,
3168            event_source_arn: input.event_source_arn,
3169            destination: input.destination,
3170            event_start_time: input.event_start_time,
3171            event_end_time: input.event_end_time,
3172            state: "COMPLETED".to_string(),
3173            replay_start_time: now,
3174            replay_end_time: Some(now),
3175        };
3176        state.replays.insert(input.name, replay);
3177
3178        drop(accounts);
3179
3180        for (event, targets) in events_to_deliver {
3181            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3182            let event_json = json!({
3183                "version": "0",
3184                "id": event.event_id,
3185                "source": event.source,
3186                "account": req.account_id,
3187                "detail-type": event.detail_type,
3188                "detail": detail_value,
3189                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3190                "region": req.region,
3191                "resources": event.resources,
3192                "replay-name": arn,
3193            });
3194            let event_str = event_json.to_string();
3195
3196            for target in targets {
3197                self.deliver_replay_event_to_target(
3198                    &target,
3199                    &event,
3200                    &event_json,
3201                    &event_str,
3202                    &req.account_id,
3203                );
3204            }
3205        }
3206
3207        Ok(AwsResponse::ok_json(json!({
3208            "ReplayArn": arn,
3209            "ReplayStartTime": now.timestamp() as f64,
3210            "State": "STARTING",
3211        })))
3212    }
3213
3214    fn deliver_replay_event_to_target(
3215        &self,
3216        target: &EventTarget,
3217        event: &PutEvent,
3218        event_json: &Value,
3219        event_str: &str,
3220        account_id: &str,
3221    ) {
3222        let target_arn = &target.arn;
3223        let body_str = if let Some(ref transformer) = target.input_transformer {
3224            apply_input_transformer(transformer, event_json)
3225        } else if let Some(ref input) = target.input {
3226            input.clone()
3227        } else if let Some(ref input_path) = target.input_path {
3228            resolve_json_path(event_json, input_path)
3229                .map(|v| v.to_string())
3230                .unwrap_or_else(|| event_str.to_string())
3231        } else {
3232            event_str.to_string()
3233        };
3234
3235        if target_arn.contains(":sqs:") {
3236            let group_id = target
3237                .sqs_parameters
3238                .as_ref()
3239                .and_then(|p| p["MessageGroupId"].as_str())
3240                .map(|s| s.to_string());
3241            if group_id.is_some() {
3242                self.delivery.send_to_sqs_with_attrs(
3243                    target_arn,
3244                    &body_str,
3245                    &HashMap::new(),
3246                    group_id.as_deref(),
3247                    None,
3248                );
3249            } else {
3250                self.delivery
3251                    .send_to_sqs(target_arn, &body_str, &HashMap::new());
3252            }
3253        } else if target_arn.contains(":sns:") {
3254            self.delivery
3255                .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3256        } else if target_arn.contains(":lambda:") {
3257            let mut accounts = self.state.write();
3258            let state = accounts.get_or_create(account_id);
3259            state
3260                .lambda_invocations
3261                .push(crate::state::LambdaInvocation {
3262                    function_arn: target_arn.clone(),
3263                    payload: body_str.clone(),
3264                    timestamp: Utc::now(),
3265                });
3266            drop(accounts);
3267            if let Some(ref ls) = self.lambda_state {
3268                ls.write()
3269                    .get_or_create(account_id)
3270                    .invocations
3271                    .push(LambdaInvocation {
3272                        function_arn: target_arn.clone(),
3273                        payload: body_str.clone(),
3274                        timestamp: Utc::now(),
3275                        source: "aws:events".to_string(),
3276                    });
3277            }
3278            invoke_lambda_async(
3279                &self.container_runtime,
3280                &self.lambda_state,
3281                target_arn,
3282                &body_str,
3283            );
3284        } else if target_arn.contains(":logs:") {
3285            let mut accounts = self.state.write();
3286            let state = accounts.get_or_create(account_id);
3287            state.log_deliveries.push(crate::state::LogDelivery {
3288                log_group_arn: target_arn.clone(),
3289                payload: body_str.clone(),
3290                timestamp: Utc::now(),
3291            });
3292            drop(accounts);
3293            if let Some(ref log_state) = self.logs_state {
3294                deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3295            }
3296        } else if target_arn.contains(":states:") {
3297            self.delivery
3298                .start_stepfunctions_execution(target_arn, &body_str);
3299            let mut accounts = self.state.write();
3300            let state = accounts.get_or_create(account_id);
3301            state
3302                .step_function_executions
3303                .push(crate::state::StepFunctionExecution {
3304                    state_machine_arn: target_arn.clone(),
3305                    payload: body_str.clone(),
3306                    timestamp: Utc::now(),
3307                });
3308        } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3309            let url = target_arn.clone();
3310            let payload = body_str.clone();
3311            tokio::spawn(async move {
3312                let client = reqwest::Client::new();
3313                let _ = client
3314                    .post(&url)
3315                    .header("Content-Type", "application/json")
3316                    .body(payload)
3317                    .send()
3318                    .await;
3319            });
3320        }
3321    }
3322
3323    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3324        let body = req.json_body();
3325        validate_required("ReplayName", &body["ReplayName"])?;
3326        let name = body["ReplayName"]
3327            .as_str()
3328            .ok_or_else(|| missing("ReplayName"))?;
3329        validate_string_length("replayName", name, 1, 64)?;
3330
3331        let accounts = self.state.read();
3332        let empty = EventBridgeState::new(&req.account_id, &req.region);
3333        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3334        let replay = state.replays.get(name).ok_or_else(|| {
3335            AwsServiceError::aws_error(
3336                StatusCode::BAD_REQUEST,
3337                "ResourceNotFoundException",
3338                format!("Replay {name} does not exist."),
3339            )
3340        })?;
3341
3342        let mut resp = json!({
3343            "Destination": replay.destination,
3344            "EventSourceArn": replay.event_source_arn,
3345            "EventStartTime": replay.event_start_time.timestamp() as f64,
3346            "EventEndTime": replay.event_end_time.timestamp() as f64,
3347            "ReplayArn": replay.arn,
3348            "ReplayName": replay.name,
3349            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3350            "State": replay.state,
3351        });
3352        if let Some(ref desc) = replay.description {
3353            resp["Description"] = json!(desc);
3354        }
3355        if let Some(ref end) = replay.replay_end_time {
3356            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3357        }
3358
3359        Ok(AwsResponse::ok_json(resp))
3360    }
3361
3362    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3363        let body = req.json_body();
3364        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3365        validate_optional_string_length(
3366            "eventSourceArn",
3367            body["EventSourceArn"].as_str(),
3368            1,
3369            1600,
3370        )?;
3371        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3372        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3373        let name_prefix = body["NamePrefix"].as_str();
3374        let source_arn = body["EventSourceArn"].as_str();
3375        let replay_state = body["State"].as_str();
3376
3377        // Validate at most one filter
3378        let filter_count = [
3379            name_prefix.is_some(),
3380            source_arn.is_some(),
3381            replay_state.is_some(),
3382        ]
3383        .iter()
3384        .filter(|&&x| x)
3385        .count();
3386        if filter_count > 1 {
3387            return Err(AwsServiceError::aws_error(
3388                StatusCode::BAD_REQUEST,
3389                "ValidationException",
3390                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3391            ));
3392        }
3393
3394        // Validate state
3395        if let Some(s) = replay_state {
3396            let valid = [
3397                "CANCELLED",
3398                "CANCELLING",
3399                "COMPLETED",
3400                "FAILED",
3401                "RUNNING",
3402                "STARTING",
3403            ];
3404            if !valid.contains(&s) {
3405                return Err(AwsServiceError::aws_error(
3406                    StatusCode::BAD_REQUEST,
3407                    "ValidationException",
3408                    format!(
3409                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3410                        s
3411                    ),
3412                ));
3413            }
3414        }
3415
3416        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3417
3418        let accounts = self.state.read();
3419        let empty = EventBridgeState::new(&req.account_id, &req.region);
3420        let state = accounts.get(&req.account_id).unwrap_or(&empty);
3421        let all: Vec<Value> = state
3422            .replays
3423            .values()
3424            .filter(|r| {
3425                if let Some(prefix) = name_prefix {
3426                    r.name.starts_with(prefix)
3427                } else if let Some(arn) = source_arn {
3428                    r.event_source_arn == arn
3429                } else if let Some(s) = replay_state {
3430                    r.state == s
3431                } else {
3432                    true
3433                }
3434            })
3435            .map(|r| {
3436                let mut obj = json!({
3437                    "EventSourceArn": r.event_source_arn,
3438                    "EventStartTime": r.event_start_time.timestamp() as f64,
3439                    "EventEndTime": r.event_end_time.timestamp() as f64,
3440                    "ReplayName": r.name,
3441                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3442                    "State": r.state,
3443                });
3444                if let Some(ref end) = r.replay_end_time {
3445                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3446                }
3447                obj
3448            })
3449            .collect();
3450
3451        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3452        let mut resp = json!({ "Replays": replays });
3453        if let Some(token) = next_token {
3454            resp["NextToken"] = json!(token);
3455        }
3456
3457        Ok(AwsResponse::ok_json(resp))
3458    }
3459
3460    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3461        let body = req.json_body();
3462        validate_required("ReplayName", &body["ReplayName"])?;
3463        let name = body["ReplayName"]
3464            .as_str()
3465            .ok_or_else(|| missing("ReplayName"))?;
3466        validate_string_length("replayName", name, 1, 64)?;
3467
3468        let mut accounts = self.state.write();
3469        let state = accounts.get_or_create(&req.account_id);
3470        let replay = state.replays.get_mut(name).ok_or_else(|| {
3471            AwsServiceError::aws_error(
3472                StatusCode::BAD_REQUEST,
3473                "ResourceNotFoundException",
3474                format!("Replay {name} does not exist."),
3475            )
3476        })?;
3477
3478        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3479        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3480            return Err(AwsServiceError::aws_error(
3481                StatusCode::BAD_REQUEST,
3482                "IllegalStatusException",
3483                format!("Replay {name} is not in a valid state for this operation."),
3484            ));
3485        }
3486
3487        let arn = replay.arn.clone();
3488        replay.state = "CANCELLED".to_string();
3489
3490        Ok(AwsResponse::ok_json(json!({
3491            "ReplayArn": arn,
3492            "State": "CANCELLING",
3493        })))
3494    }
3495}
3496
3497// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3498
3499fn find_tags_mut<'a>(
3500    state: &'a mut crate::state::EventBridgeState,
3501    arn: &str,
3502) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3503    // Check buses
3504    for bus in state.buses.values_mut() {
3505        if bus.arn == arn {
3506            return Ok(&mut bus.tags);
3507        }
3508    }
3509    // Check rules
3510    for rule in state.rules.values_mut() {
3511        if rule.arn == arn {
3512            return Ok(&mut rule.tags);
3513        }
3514    }
3515
3516    // Parse ARN to give better error messages
3517    let error_msg = if arn.contains(":rule/") {
3518        // Extract rule name and bus from ARN
3519        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3520        if let Some(rule_path) = parts.first() {
3521            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3522                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3523            } else {
3524                format!("Rule {} does not exist on EventBus default.", rule_path)
3525            }
3526        } else {
3527            format!("Resource {arn} not found.")
3528        }
3529    } else {
3530        format!("Resource {arn} not found.")
3531    };
3532
3533    Err(AwsServiceError::aws_error(
3534        StatusCode::BAD_REQUEST,
3535        "ResourceNotFoundException",
3536        error_msg,
3537    ))
3538}
3539
3540fn find_tags<'a>(
3541    state: &'a crate::state::EventBridgeState,
3542    arn: &str,
3543) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3544    for bus in state.buses.values() {
3545        if bus.arn == arn {
3546            return Ok(&bus.tags);
3547        }
3548    }
3549    for rule in state.rules.values() {
3550        if rule.arn == arn {
3551            return Ok(&rule.tags);
3552        }
3553    }
3554
3555    let error_msg = if arn.contains(":rule/") {
3556        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3557        if let Some(rule_path) = parts.first() {
3558            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3559                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3560            } else {
3561                format!("Rule {} does not exist on EventBus default.", rule_path)
3562            }
3563        } else {
3564            format!("Resource {arn} not found.")
3565        }
3566    } else {
3567        format!("Resource {arn} not found.")
3568    };
3569
3570    Err(AwsServiceError::aws_error(
3571        StatusCode::BAD_REQUEST,
3572        "ResourceNotFoundException",
3573        error_msg,
3574    ))
3575}
3576
3577// ─── Event Pattern Validation ────────────────────────────────────────
3578
3579fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3580    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3581        AwsServiceError::aws_error(
3582            StatusCode::BAD_REQUEST,
3583            "InvalidEventPatternException",
3584            "Event pattern is not valid. Reason: Invalid JSON",
3585        )
3586    })?;
3587
3588    validate_pattern_values(&parsed, "")?;
3589    Ok(())
3590}
3591
3592fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3593    match value {
3594        Value::Object(obj) => {
3595            for (key, val) in obj {
3596                let new_path = if path.is_empty() {
3597                    key.clone()
3598                } else {
3599                    format!("{path}.{key}")
3600                };
3601                match val {
3602                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3603                    Value::Array(_) => {} // Arrays are fine at leaf level
3604                    _ => {
3605                        return Err(AwsServiceError::aws_error(
3606                            StatusCode::BAD_REQUEST,
3607                            "InvalidEventPatternException",
3608                            format!(
3609                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3610                                key
3611                            ),
3612                        ));
3613                    }
3614                }
3615            }
3616            Ok(())
3617        }
3618        _ => Ok(()),
3619    }
3620}
3621
3622// ─── Connection Auth Params Response Builder ────────────────────────
3623
3624fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3625    match auth_type {
3626        "API_KEY" => {
3627            let mut resp = json!({});
3628            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3629                resp["ApiKeyAuthParameters"] = json!({
3630                    "ApiKeyName": api_key["ApiKeyName"],
3631                });
3632            }
3633            resp
3634        }
3635        "BASIC" => {
3636            let mut resp = json!({});
3637            if let Some(basic) = params.get("BasicAuthParameters") {
3638                resp["BasicAuthParameters"] = json!({
3639                    "Username": basic["Username"],
3640                });
3641            }
3642            resp
3643        }
3644        "OAUTH_CLIENT_CREDENTIALS" => {
3645            let mut resp = json!({});
3646            if let Some(oauth) = params.get("OAuthParameters") {
3647                resp["OAuthParameters"] = json!({
3648                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3649                    "HttpMethod": oauth["HttpMethod"],
3650                    "ClientParameters": {
3651                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3652                    },
3653                });
3654            }
3655            resp
3656        }
3657        _ => params.clone(),
3658    }
3659}
3660
3661// ─── Event Pattern Matching ─────────────────────────────────────────
3662
3663/// Match an event against an EventBridge event pattern.
3664pub fn matches_pattern(
3665    pattern_json: Option<&str>,
3666    source: &str,
3667    detail_type: &str,
3668    detail: &str,
3669    account: &str,
3670    region: &str,
3671    resources: &[String],
3672) -> bool {
3673    let pattern_json = match pattern_json {
3674        Some(p) => p,
3675        None => return true,
3676    };
3677
3678    let pattern: Value = match serde_json::from_str(pattern_json) {
3679        Ok(v) => v,
3680        Err(_) => return false,
3681    };
3682
3683    let pattern_obj = match pattern.as_object() {
3684        Some(o) => o,
3685        None => return false,
3686    };
3687
3688    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3689    let event = json!({
3690        "source": source,
3691        "detail-type": detail_type,
3692        "detail": detail_value,
3693        "account": account,
3694        "region": region,
3695        "resources": resources,
3696    });
3697
3698    for (key, pattern_value) in pattern_obj {
3699        let event_value = &event[key];
3700        if !matches_value(pattern_value, event_value) {
3701            return false;
3702        }
3703    }
3704
3705    true
3706}
3707
3708fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3709    match pattern {
3710        Value::Object(obj) => {
3711            for (key, sub_pattern) in obj {
3712                let sub_value = &event_value[key];
3713                if !matches_value(sub_pattern, sub_value) {
3714                    return false;
3715                }
3716            }
3717            true
3718        }
3719        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3720        _ => false,
3721    }
3722}
3723
3724fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3725    match pattern_elem {
3726        Value::Object(obj) => {
3727            if let Some(prefix_val) = obj.get("prefix") {
3728                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3729                    return actual.starts_with(prefix);
3730                }
3731                return false;
3732            }
3733            if let Some(exists_val) = obj.get("exists") {
3734                let should_exist = exists_val.as_bool().unwrap_or(true);
3735                let does_exist = !event_value.is_null();
3736                return should_exist == does_exist;
3737            }
3738            if let Some(anything_but_val) = obj.get("anything-but") {
3739                return match anything_but_val {
3740                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3741                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3742                    Value::Number(_) => event_value != anything_but_val,
3743                    _ => true,
3744                };
3745            }
3746            if let Some(numeric_val) = obj.get("numeric") {
3747                return matches_numeric(numeric_val, event_value);
3748            }
3749            false
3750        }
3751        _ => values_equal(pattern_elem, event_value),
3752    }
3753}
3754
3755/// For each archive on `event_bus_name` whose event pattern matches the
3756/// event, append a clone of it to the archive's stored events and bump
3757/// the archive's counters.
3758#[allow(clippy::too_many_arguments)]
3759fn archive_matching_event(
3760    state: &mut crate::state::EventBridgeState,
3761    event: &PutEvent,
3762    event_bus_name: &str,
3763    source: &str,
3764    detail_type: &str,
3765    detail: &str,
3766    account_id: &str,
3767    region: &str,
3768    resources: &[String],
3769) {
3770    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3771    for akey in archive_keys {
3772        let (archive_bus, archive_pattern, archive_enabled) = {
3773            let a = &state.archives[&akey];
3774            (
3775                state.resolve_bus_name(&a.event_source_arn),
3776                a.event_pattern.clone(),
3777                a.state == "ENABLED",
3778            )
3779        };
3780        if archive_bus != event_bus_name || !archive_enabled {
3781            continue;
3782        }
3783        let pattern_matches = matches_pattern(
3784            archive_pattern.as_deref(),
3785            source,
3786            detail_type,
3787            detail,
3788            account_id,
3789            region,
3790            resources,
3791        );
3792        if !pattern_matches {
3793            continue;
3794        }
3795        if let Some(archive) = state.archives.get_mut(&akey) {
3796            archive.event_count += 1;
3797            archive.size_bytes += detail.len() as i64;
3798            archive.events.push(event.clone());
3799        }
3800    }
3801}
3802
3803/// Parsed + validated inputs for `StartReplay`.
3804struct StartReplayInput {
3805    name: String,
3806    description: Option<String>,
3807    event_source_arn: String,
3808    destination: Value,
3809    destination_arn: String,
3810    event_start_time: DateTime<Utc>,
3811    event_end_time: DateTime<Utc>,
3812}
3813
3814impl StartReplayInput {
3815    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3816        validate_required("ReplayName", &body["ReplayName"])?;
3817        let name = body["ReplayName"]
3818            .as_str()
3819            .ok_or_else(|| missing("ReplayName"))?
3820            .to_string();
3821        validate_string_length("replayName", &name, 1, 64)?;
3822        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3823        validate_required("EventSourceArn", &body["EventSourceArn"])?;
3824        let description = body["Description"].as_str().map(|s| s.to_string());
3825        let event_source_arn = body["EventSourceArn"]
3826            .as_str()
3827            .ok_or_else(|| missing("EventSourceArn"))?
3828            .to_string();
3829        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3830        validate_required("EventStartTime", &body["EventStartTime"])?;
3831        validate_required("EventEndTime", &body["EventEndTime"])?;
3832        validate_required("Destination", &body["Destination"])?;
3833        let destination = body["Destination"].clone();
3834
3835        let event_start_time = body["EventStartTime"]
3836            .as_f64()
3837            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3838            .unwrap_or_else(Utc::now);
3839        let event_end_time = body["EventEndTime"]
3840            .as_f64()
3841            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3842            .unwrap_or_else(Utc::now);
3843
3844        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3845        if !destination_arn.contains(":event-bus/") {
3846            return Err(AwsServiceError::aws_error(
3847                StatusCode::BAD_REQUEST,
3848                "ValidationException",
3849                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3850            ));
3851        }
3852
3853        Ok(Self {
3854            name,
3855            description,
3856            event_source_arn,
3857            destination,
3858            destination_arn,
3859            event_start_time,
3860            event_end_time,
3861        })
3862    }
3863}
3864
3865/// Walk the named archive, filter events into the replay window, then
3866/// fan out each event against rules on `bus_name` to collect its
3867/// matching targets. Returns only events that matched at least one
3868/// target.
3869#[allow(clippy::too_many_arguments)]
3870fn collect_replay_events_with_targets(
3871    state: &crate::state::EventBridgeState,
3872    archive_name: &str,
3873    bus_name: &str,
3874    event_start_time: DateTime<Utc>,
3875    event_end_time: DateTime<Utc>,
3876    account_id: &str,
3877    region: &str,
3878) -> Vec<(PutEvent, Vec<EventTarget>)> {
3879    let Some(archive) = state.archives.get(archive_name) else {
3880        return Vec::new();
3881    };
3882
3883    let replay_events: Vec<PutEvent> = archive
3884        .events
3885        .iter()
3886        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3887        .cloned()
3888        .collect();
3889
3890    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3891    for event in replay_events {
3892        let matching_targets: Vec<EventTarget> = state
3893            .rules
3894            .values()
3895            .filter(|r| {
3896                r.event_bus_name == bus_name
3897                    && r.state == "ENABLED"
3898                    && matches_pattern(
3899                        r.event_pattern.as_deref(),
3900                        &event.source,
3901                        &event.detail_type,
3902                        &event.detail,
3903                        account_id,
3904                        region,
3905                        &event.resources,
3906                    )
3907            })
3908            .flat_map(|r| r.targets.clone())
3909            .collect();
3910
3911        if !matching_targets.is_empty() {
3912            events_to_deliver.push((event, matching_targets));
3913        }
3914    }
3915    events_to_deliver
3916}
3917
3918fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3919    let arr = match numeric_arr.as_array() {
3920        Some(a) => a,
3921        None => return false,
3922    };
3923    let actual = match event_value.as_f64() {
3924        Some(n) => n,
3925        None => return false,
3926    };
3927    let mut i = 0;
3928    while i + 1 < arr.len() {
3929        let op = match arr[i].as_str() {
3930            Some(s) => s,
3931            None => return false,
3932        };
3933        let threshold = match arr[i + 1].as_f64() {
3934            Some(n) => n,
3935            None => return false,
3936        };
3937        let ok = match op {
3938            ">" => actual > threshold,
3939            ">=" => actual >= threshold,
3940            "<" => actual < threshold,
3941            "<=" => actual <= threshold,
3942            "=" => (actual - threshold).abs() < f64::EPSILON,
3943            _ => return false,
3944        };
3945        if !ok {
3946            return false;
3947        }
3948        i += 2;
3949    }
3950    true
3951}
3952
3953fn values_equal(a: &Value, b: &Value) -> bool {
3954    a == b
3955}
3956
3957/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3958fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3959    let path = path.strip_prefix('$').unwrap_or(path);
3960    let mut current = event;
3961    for segment in path.split('.') {
3962        if segment.is_empty() {
3963            continue;
3964        }
3965        current = current.get(segment)?;
3966    }
3967    Some(current.clone())
3968}
3969
3970/// Apply an EventBridge InputTransformer to an event.
3971fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3972    let input_paths_map = transformer
3973        .get("InputPathsMap")
3974        .and_then(|v| v.as_object())
3975        .cloned()
3976        .unwrap_or_default();
3977    let template = transformer
3978        .get("InputTemplate")
3979        .and_then(|v| v.as_str())
3980        .unwrap_or("")
3981        .to_string();
3982
3983    // Resolve all input paths
3984    let mut resolved: HashMap<String, Value> = HashMap::new();
3985    for (var_name, path_val) in &input_paths_map {
3986        if let Some(path_str) = path_val.as_str() {
3987            if let Some(val) = resolve_json_path(event, path_str) {
3988                resolved.insert(var_name.clone(), val);
3989            }
3990        }
3991    }
3992
3993    // Replace <varName> placeholders in template
3994    let mut result = template;
3995    for (var_name, val) in &resolved {
3996        let placeholder = format!("<{var_name}>");
3997        let replacement = match val {
3998            Value::String(s) => s.clone(),
3999            other => other.to_string(),
4000        };
4001        result = result.replace(&placeholder, &replacement);
4002    }
4003
4004    result
4005}
4006
4007fn missing(name: &str) -> AwsServiceError {
4008    AwsServiceError::aws_error(
4009        StatusCode::BAD_REQUEST,
4010        "ValidationException",
4011        format!("The request must contain the parameter {name}"),
4012    )
4013}
4014
4015/// Extract a Lambda function name from its ARN.
4016///
4017/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
4018/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
4019fn function_name_from_arn(arn: &str) -> &str {
4020    let parts: Vec<&str> = arn.split(':').collect();
4021    if parts.len() >= 7 && parts[5] == "function" {
4022        parts[6]
4023    } else {
4024        arn
4025    }
4026}
4027
4028/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
4029/// This is fire-and-forget: EventBridge delivery is asynchronous.
4030pub fn invoke_lambda_async(
4031    container_runtime: &Option<Arc<ContainerRuntime>>,
4032    lambda_state: &Option<SharedLambdaState>,
4033    function_arn: &str,
4034    payload: &str,
4035) {
4036    let runtime = match container_runtime {
4037        Some(rt) => rt.clone(),
4038        None => return,
4039    };
4040    let lambda_state = match lambda_state {
4041        Some(ls) => ls.clone(),
4042        None => return,
4043    };
4044    let func_name = function_name_from_arn(function_arn).to_string();
4045    let payload = payload.as_bytes().to_vec();
4046
4047    tokio::spawn(async move {
4048        let func = {
4049            let accounts = lambda_state.read();
4050            let state = accounts.default_ref();
4051            state.functions.get(&func_name).cloned()
4052        };
4053        let func = match func {
4054            Some(f) => f,
4055            None => {
4056                tracing::warn!(
4057                    function = %func_name,
4058                    "EventBridge Lambda target not found, skipping invocation"
4059                );
4060                return;
4061            }
4062        };
4063        match runtime.invoke(&func, &payload).await {
4064            Ok(_) => {
4065                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
4066            }
4067            Err(e) => {
4068                tracing::warn!(
4069                    function = %func_name,
4070                    error = %e,
4071                    "EventBridge Lambda invocation failed"
4072                );
4073            }
4074        }
4075    });
4076}
4077
4078/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
4079/// to the appropriate log group and stream.
4080pub fn deliver_to_logs(
4081    logs_state: &SharedLogsState,
4082    log_group_arn: &str,
4083    payload: &str,
4084    timestamp: chrono::DateTime<chrono::Utc>,
4085) {
4086    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
4087    // or just the name if it's not an ARN
4088    let group_name = if log_group_arn.contains(":log-group:") {
4089        log_group_arn
4090            .split(":log-group:")
4091            .nth(1)
4092            .unwrap_or(log_group_arn)
4093            .trim_end_matches(":*")
4094    } else {
4095        log_group_arn
4096    };
4097
4098    let stream_name = "events".to_string();
4099    let ts_millis = timestamp.timestamp_millis();
4100
4101    let mut accounts = logs_state.write();
4102    let state = accounts.default_mut();
4103    let region = state.region.clone();
4104    let account_id = state.account_id.clone();
4105
4106    // Auto-create log group and stream if they don't exist
4107    let group = state
4108        .log_groups
4109        .entry(group_name.to_string())
4110        .or_insert_with(|| fakecloud_logs::state::LogGroup {
4111            name: group_name.to_string(),
4112            arn: Arn::new(
4113                "logs",
4114                &region,
4115                &account_id,
4116                &format!("log-group:{group_name}"),
4117            )
4118            .to_string(),
4119            creation_time: ts_millis,
4120            retention_in_days: None,
4121            kms_key_id: None,
4122            tags: HashMap::new(),
4123            log_streams: HashMap::new(),
4124            stored_bytes: 0,
4125            subscription_filters: Vec::new(),
4126            data_protection_policy: None,
4127            index_policies: Vec::new(),
4128            transformer: None,
4129            deletion_protection: false,
4130            log_group_class: Some("STANDARD".to_string()),
4131        });
4132
4133    let stream = group
4134        .log_streams
4135        .entry(stream_name.clone())
4136        .or_insert_with(|| fakecloud_logs::state::LogStream {
4137            name: stream_name,
4138            arn: format!("{}:log-stream:events", group.arn),
4139            creation_time: ts_millis,
4140            first_event_timestamp: None,
4141            last_event_timestamp: None,
4142            last_ingestion_time: None,
4143            upload_sequence_token: "1".to_string(),
4144            events: Vec::new(),
4145        });
4146
4147    stream.events.push(fakecloud_logs::state::LogEvent {
4148        timestamp: ts_millis,
4149        message: payload.to_string(),
4150        ingestion_time: ts_millis,
4151    });
4152    stream.last_event_timestamp = Some(ts_millis);
4153    stream.last_ingestion_time = Some(ts_millis);
4154    if stream.first_event_timestamp.is_none() {
4155        stream.first_event_timestamp = Some(ts_millis);
4156    }
4157}
4158
4159/// Apply connection auth parameters to an outgoing HTTP request.
4160fn apply_connection_auth(
4161    mut builder: reqwest::RequestBuilder,
4162    conn: &Connection,
4163) -> reqwest::RequestBuilder {
4164    match conn.authorization_type.as_str() {
4165        "API_KEY" => {
4166            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
4167                if let (Some(name), Some(value)) = (
4168                    params["ApiKeyName"].as_str(),
4169                    params["ApiKeyValue"].as_str(),
4170                ) {
4171                    builder = builder.header(name, value);
4172                }
4173            }
4174        }
4175        "BASIC" => {
4176            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
4177                if let (Some(user), Some(pass)) =
4178                    (params["Username"].as_str(), params["Password"].as_str())
4179                {
4180                    builder = builder.basic_auth(user, Some(pass));
4181                }
4182            }
4183        }
4184        "OAUTH_CLIENT_CREDENTIALS" => {
4185            // For OAuth, in a real implementation we'd exchange credentials for a token.
4186            // Here we pass client credentials as basic auth as a reasonable approximation.
4187            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4188                if let (Some(client_id), Some(client_secret)) = (
4189                    params["ClientParameters"]["ClientID"].as_str(),
4190                    params["ClientParameters"]["ClientSecret"].as_str(),
4191                ) {
4192                    builder = builder.basic_auth(client_id, Some(client_secret));
4193                }
4194            }
4195        }
4196        _ => {}
4197    }
4198    builder
4199}
4200
4201#[cfg(test)]
4202mod tests {
4203    use super::*;
4204
4205    /// Test helper that calls matches_pattern with default account/region/resources
4206    fn test_matches(
4207        pattern_json: Option<&str>,
4208        source: &str,
4209        detail_type: &str,
4210        detail: &str,
4211    ) -> bool {
4212        matches_pattern(
4213            pattern_json,
4214            source,
4215            detail_type,
4216            detail,
4217            "123456789012",
4218            "us-east-1",
4219            &[],
4220        )
4221    }
4222
4223    #[test]
4224    fn pattern_matches_source() {
4225        assert!(test_matches(
4226            Some(r#"{"source": ["my.app"]}"#),
4227            "my.app",
4228            "OrderPlaced",
4229            "{}"
4230        ));
4231        assert!(!test_matches(
4232            Some(r#"{"source": ["other.app"]}"#),
4233            "my.app",
4234            "OrderPlaced",
4235            "{}"
4236        ));
4237    }
4238
4239    #[test]
4240    fn pattern_matches_detail_type() {
4241        assert!(test_matches(
4242            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4243            "my.app",
4244            "OrderPlaced",
4245            "{}"
4246        ));
4247        assert!(!test_matches(
4248            Some(r#"{"detail-type": ["OrderShipped"]}"#),
4249            "my.app",
4250            "OrderPlaced",
4251            "{}"
4252        ));
4253    }
4254
4255    #[test]
4256    fn pattern_matches_detail_field() {
4257        assert!(test_matches(
4258            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4259            "my.app",
4260            "StatusChange",
4261            r#"{"status": "ACTIVE"}"#
4262        ));
4263        assert!(!test_matches(
4264            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4265            "my.app",
4266            "StatusChange",
4267            r#"{"status": "INACTIVE"}"#
4268        ));
4269    }
4270
4271    #[test]
4272    fn no_pattern_matches_everything() {
4273        assert!(test_matches(None, "any", "any", "{}"));
4274    }
4275
4276    #[test]
4277    fn combined_pattern() {
4278        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4279        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4280        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4281        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4282    }
4283
4284    #[test]
4285    fn nested_detail_pattern() {
4286        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4287        assert!(test_matches(
4288            Some(pattern),
4289            "my.app",
4290            "OrderEvent",
4291            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4292        ));
4293        assert!(!test_matches(
4294            Some(pattern),
4295            "my.app",
4296            "OrderEvent",
4297            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4298        ));
4299        assert!(!test_matches(
4300            Some(pattern),
4301            "my.app",
4302            "OrderEvent",
4303            r#"{"order": {"id": "123"}}"#
4304        ));
4305    }
4306
4307    #[test]
4308    fn deeply_nested_detail_pattern() {
4309        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4310        assert!(test_matches(
4311            Some(pattern),
4312            "src",
4313            "type",
4314            r#"{"a": {"b": {"c": "deep"}}}"#
4315        ));
4316        assert!(!test_matches(
4317            Some(pattern),
4318            "src",
4319            "type",
4320            r#"{"a": {"b": {"c": "shallow"}}}"#
4321        ));
4322    }
4323
4324    #[test]
4325    fn prefix_matcher() {
4326        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4327        assert!(test_matches(
4328            Some(pattern),
4329            "com.myapp.orders",
4330            "OrderPlaced",
4331            "{}"
4332        ));
4333        assert!(test_matches(
4334            Some(pattern),
4335            "com.myapp",
4336            "OrderPlaced",
4337            "{}"
4338        ));
4339        assert!(!test_matches(
4340            Some(pattern),
4341            "com.other",
4342            "OrderPlaced",
4343            "{}"
4344        ));
4345    }
4346
4347    #[test]
4348    fn prefix_matcher_in_detail() {
4349        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4350        assert!(test_matches(
4351            Some(pattern),
4352            "src",
4353            "type",
4354            r#"{"region": "us-east-1"}"#
4355        ));
4356        assert!(!test_matches(
4357            Some(pattern),
4358            "src",
4359            "type",
4360            r#"{"region": "eu-west-1"}"#
4361        ));
4362    }
4363
4364    #[test]
4365    fn exists_matcher() {
4366        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4367        assert!(test_matches(
4368            Some(pattern),
4369            "src",
4370            "type",
4371            r#"{"error": "something broke"}"#
4372        ));
4373        assert!(!test_matches(
4374            Some(pattern),
4375            "src",
4376            "type",
4377            r#"{"status": "ok"}"#
4378        ));
4379
4380        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4381        assert!(test_matches(
4382            Some(pattern),
4383            "src",
4384            "type",
4385            r#"{"status": "ok"}"#
4386        ));
4387        assert!(!test_matches(
4388            Some(pattern),
4389            "src",
4390            "type",
4391            r#"{"error": "something broke"}"#
4392        ));
4393    }
4394
4395    #[test]
4396    fn anything_but_matcher() {
4397        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4398        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4399        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4400
4401        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4402        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4403        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4404        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4405    }
4406
4407    #[test]
4408    fn anything_but_in_detail() {
4409        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4410        assert!(test_matches(
4411            Some(pattern),
4412            "src",
4413            "type",
4414            r#"{"env": "staging"}"#
4415        ));
4416        assert!(!test_matches(
4417            Some(pattern),
4418            "src",
4419            "type",
4420            r#"{"env": "prod"}"#
4421        ));
4422    }
4423
4424    #[test]
4425    fn numeric_greater_than() {
4426        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4427        assert!(test_matches(
4428            Some(pattern),
4429            "src",
4430            "type",
4431            r#"{"count": 150}"#
4432        ));
4433        assert!(!test_matches(
4434            Some(pattern),
4435            "src",
4436            "type",
4437            r#"{"count": 100}"#
4438        ));
4439        assert!(!test_matches(
4440            Some(pattern),
4441            "src",
4442            "type",
4443            r#"{"count": 50}"#
4444        ));
4445    }
4446
4447    #[test]
4448    fn numeric_less_than() {
4449        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4450        assert!(test_matches(
4451            Some(pattern),
4452            "src",
4453            "type",
4454            r#"{"count": 5}"#
4455        ));
4456        assert!(!test_matches(
4457            Some(pattern),
4458            "src",
4459            "type",
4460            r#"{"count": 10}"#
4461        ));
4462        assert!(!test_matches(
4463            Some(pattern),
4464            "src",
4465            "type",
4466            r#"{"count": 15}"#
4467        ));
4468    }
4469
4470    #[test]
4471    fn numeric_range() {
4472        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4473        assert!(test_matches(
4474            Some(pattern),
4475            "src",
4476            "type",
4477            r#"{"count": 50}"#
4478        ));
4479        assert!(test_matches(
4480            Some(pattern),
4481            "src",
4482            "type",
4483            r#"{"count": 100}"#
4484        ));
4485        assert!(!test_matches(
4486            Some(pattern),
4487            "src",
4488            "type",
4489            r#"{"count": 200}"#
4490        ));
4491        assert!(!test_matches(
4492            Some(pattern),
4493            "src",
4494            "type",
4495            r#"{"count": 49}"#
4496        ));
4497    }
4498
4499    #[test]
4500    fn mixed_matchers_and_literals() {
4501        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4502        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4503        assert!(test_matches(
4504            Some(pattern),
4505            "com.myapp.orders",
4506            "Event",
4507            "{}"
4508        ));
4509        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4510    }
4511
4512    // ---- list_connections / list_api_destinations filtering & pagination ----
4513
4514    use fakecloud_core::delivery::DeliveryBus;
4515    use parking_lot::RwLock;
4516
4517    fn make_service() -> EventBridgeService {
4518        let state = Arc::new(RwLock::new(
4519            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
4520        ));
4521        let delivery = Arc::new(DeliveryBus::new());
4522        EventBridgeService::new(state, delivery)
4523    }
4524
4525    fn make_request(action: &str, body: Value) -> AwsRequest {
4526        AwsRequest {
4527            service: "events".to_string(),
4528            action: action.to_string(),
4529            region: "us-east-1".to_string(),
4530            account_id: "123456789012".to_string(),
4531            request_id: "test-id".to_string(),
4532            headers: http::HeaderMap::new(),
4533            query_params: HashMap::new(),
4534            body: serde_json::to_vec(&body).unwrap().into(),
4535            body_stream: parking_lot::Mutex::new(None),
4536            path_segments: vec![],
4537            raw_path: "/".to_string(),
4538            raw_query: String::new(),
4539            method: http::Method::POST,
4540            is_query_protocol: false,
4541            access_key_id: None,
4542            principal: None,
4543        }
4544    }
4545
4546    fn create_connection(svc: &EventBridgeService, name: &str) {
4547        let req = make_request(
4548            "CreateConnection",
4549            json!({
4550                "Name": name,
4551                "AuthorizationType": "API_KEY",
4552                "AuthParameters": {
4553                    "ApiKeyAuthParameters": {
4554                        "ApiKeyName": "x-api-key",
4555                        "ApiKeyValue": "secret"
4556                    }
4557                }
4558            }),
4559        );
4560        svc.create_connection(&req).unwrap();
4561    }
4562
4563    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4564        let conn_arn_field = {
4565            let _mas = svc.state.read();
4566            let state = _mas.default_ref();
4567            state.connections.get(conn_name).unwrap().arn.clone()
4568        };
4569        let req = make_request(
4570            "CreateApiDestination",
4571            json!({
4572                "Name": name,
4573                "ConnectionArn": conn_arn_field,
4574                "InvocationEndpoint": "https://example.com",
4575                "HttpMethod": "POST"
4576            }),
4577        );
4578        svc.create_api_destination(&req).unwrap();
4579    }
4580
4581    // -- ListConnections tests --
4582
4583    #[test]
4584    fn list_connections_returns_all_by_default() {
4585        let svc = make_service();
4586        create_connection(&svc, "conn-alpha");
4587        create_connection(&svc, "conn-beta");
4588        create_connection(&svc, "conn-gamma");
4589
4590        let req = make_request("ListConnections", json!({}));
4591        let resp = svc.list_connections(&req).unwrap();
4592        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4593        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4594        assert!(body["NextToken"].is_null());
4595    }
4596
4597    #[test]
4598    fn list_connections_name_prefix_filter() {
4599        let svc = make_service();
4600        create_connection(&svc, "prod-conn-1");
4601        create_connection(&svc, "prod-conn-2");
4602        create_connection(&svc, "dev-conn-1");
4603
4604        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4605        let resp = svc.list_connections(&req).unwrap();
4606        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4607        let names: Vec<&str> = body["Connections"]
4608            .as_array()
4609            .unwrap()
4610            .iter()
4611            .map(|c| c["Name"].as_str().unwrap())
4612            .collect();
4613        assert_eq!(names.len(), 2);
4614        assert!(names.iter().all(|n| n.starts_with("prod-")));
4615    }
4616
4617    #[test]
4618    fn list_connections_state_filter() {
4619        let svc = make_service();
4620        create_connection(&svc, "conn-a");
4621        create_connection(&svc, "conn-b");
4622
4623        // All connections start as AUTHORIZED; change one
4624        {
4625            let mut _mas = svc.state.write();
4626            let state = _mas.default_mut();
4627            state
4628                .connections
4629                .get_mut("conn-b")
4630                .unwrap()
4631                .connection_state = "DEAUTHORIZED".to_string();
4632        }
4633
4634        let req = make_request(
4635            "ListConnections",
4636            json!({ "ConnectionState": "AUTHORIZED" }),
4637        );
4638        let resp = svc.list_connections(&req).unwrap();
4639        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4640        let conns = body["Connections"].as_array().unwrap();
4641        assert_eq!(conns.len(), 1);
4642        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4643    }
4644
4645    #[test]
4646    fn list_connections_pagination() {
4647        let svc = make_service();
4648        for i in 0..5 {
4649            create_connection(&svc, &format!("conn-{i:02}"));
4650        }
4651
4652        // First page: limit 2
4653        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4654        let resp = svc.list_connections(&req).unwrap();
4655        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4656        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4657        let token = body["NextToken"].as_str().unwrap();
4658        assert_eq!(token, "2");
4659
4660        // Second page
4661        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4662        let resp = svc.list_connections(&req).unwrap();
4663        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4664        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4665        let token = body["NextToken"].as_str().unwrap();
4666        assert_eq!(token, "4");
4667
4668        // Third page (only 1 remaining)
4669        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4670        let resp = svc.list_connections(&req).unwrap();
4671        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4672        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4673        assert!(body["NextToken"].is_null());
4674    }
4675
4676    #[test]
4677    fn list_connections_pagination_with_filter() {
4678        let svc = make_service();
4679        for i in 0..4 {
4680            create_connection(&svc, &format!("prod-{i:02}"));
4681        }
4682        create_connection(&svc, "dev-00");
4683
4684        let req = make_request(
4685            "ListConnections",
4686            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4687        );
4688        let resp = svc.list_connections(&req).unwrap();
4689        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4690        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4691        assert!(body["NextToken"].as_str().is_some());
4692    }
4693
4694    // -- ListApiDestinations tests --
4695
4696    #[test]
4697    fn list_api_destinations_returns_all_by_default() {
4698        let svc = make_service();
4699        create_connection(&svc, "my-conn");
4700        create_api_destination(&svc, "dest-alpha", "my-conn");
4701        create_api_destination(&svc, "dest-beta", "my-conn");
4702
4703        let req = make_request("ListApiDestinations", json!({}));
4704        let resp = svc.list_api_destinations(&req).unwrap();
4705        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4706        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4707        assert!(body["NextToken"].is_null());
4708    }
4709
4710    #[test]
4711    fn list_api_destinations_name_prefix_filter() {
4712        let svc = make_service();
4713        create_connection(&svc, "my-conn");
4714        create_api_destination(&svc, "prod-dest-1", "my-conn");
4715        create_api_destination(&svc, "prod-dest-2", "my-conn");
4716        create_api_destination(&svc, "dev-dest-1", "my-conn");
4717
4718        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4719        let resp = svc.list_api_destinations(&req).unwrap();
4720        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4721        let names: Vec<&str> = body["ApiDestinations"]
4722            .as_array()
4723            .unwrap()
4724            .iter()
4725            .map(|d| d["Name"].as_str().unwrap())
4726            .collect();
4727        assert_eq!(names.len(), 2);
4728        assert!(names.iter().all(|n| n.starts_with("prod-")));
4729    }
4730
4731    #[test]
4732    fn list_api_destinations_connection_arn_filter() {
4733        let svc = make_service();
4734        create_connection(&svc, "conn-a");
4735        create_connection(&svc, "conn-b");
4736        create_api_destination(&svc, "dest-1", "conn-a");
4737        create_api_destination(&svc, "dest-2", "conn-b");
4738        create_api_destination(&svc, "dest-3", "conn-a");
4739
4740        let conn_a_arn = {
4741            let _mas = svc.state.read();
4742            let state = _mas.default_ref();
4743            state.connections.get("conn-a").unwrap().arn.clone()
4744        };
4745
4746        let req = make_request(
4747            "ListApiDestinations",
4748            json!({ "ConnectionArn": conn_a_arn }),
4749        );
4750        let resp = svc.list_api_destinations(&req).unwrap();
4751        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4752        let names: Vec<&str> = body["ApiDestinations"]
4753            .as_array()
4754            .unwrap()
4755            .iter()
4756            .map(|d| d["Name"].as_str().unwrap())
4757            .collect();
4758        assert_eq!(names.len(), 2);
4759        assert!(names.contains(&"dest-1"));
4760        assert!(names.contains(&"dest-3"));
4761    }
4762
4763    #[test]
4764    fn list_api_destinations_pagination() {
4765        let svc = make_service();
4766        create_connection(&svc, "my-conn");
4767        for i in 0..5 {
4768            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4769        }
4770
4771        // First page
4772        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4773        let resp = svc.list_api_destinations(&req).unwrap();
4774        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4775        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4776        let token = body["NextToken"].as_str().unwrap();
4777        assert_eq!(token, "2");
4778
4779        // Second page
4780        let req = make_request(
4781            "ListApiDestinations",
4782            json!({ "Limit": 2, "NextToken": token }),
4783        );
4784        let resp = svc.list_api_destinations(&req).unwrap();
4785        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4786        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4787        let token = body["NextToken"].as_str().unwrap();
4788        assert_eq!(token, "4");
4789
4790        // Last page
4791        let req = make_request(
4792            "ListApiDestinations",
4793            json!({ "Limit": 2, "NextToken": token }),
4794        );
4795        let resp = svc.list_api_destinations(&req).unwrap();
4796        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4797        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4798        assert!(body["NextToken"].is_null());
4799    }
4800
4801    // -- ListEventBuses pagination tests --
4802
4803    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4804        let req = make_request("CreateEventBus", json!({ "Name": name }));
4805        svc.create_event_bus(&req).unwrap();
4806    }
4807
4808    #[test]
4809    fn list_event_buses_pagination() {
4810        let svc = make_service();
4811        // "default" bus already exists, create 4 more
4812        for i in 0..4 {
4813            create_event_bus(&svc, &format!("bus-{i:02}"));
4814        }
4815
4816        // First page: limit 2
4817        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4818        let resp = svc.list_event_buses(&req).unwrap();
4819        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4820        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4821        let token = body["NextToken"].as_str().unwrap();
4822        assert_eq!(token, "2");
4823
4824        // Second page
4825        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4826        let resp = svc.list_event_buses(&req).unwrap();
4827        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4828        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4829        let token = body["NextToken"].as_str().unwrap();
4830        assert_eq!(token, "4");
4831
4832        // Third page (only 1 remaining)
4833        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4834        let resp = svc.list_event_buses(&req).unwrap();
4835        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4836        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4837        assert!(body["NextToken"].is_null());
4838    }
4839
4840    #[test]
4841    fn list_event_buses_no_pagination_returns_all() {
4842        let svc = make_service();
4843        create_event_bus(&svc, "bus-alpha");
4844        create_event_bus(&svc, "bus-beta");
4845
4846        let req = make_request("ListEventBuses", json!({}));
4847        let resp = svc.list_event_buses(&req).unwrap();
4848        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4849        // default + 2 custom = 3
4850        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4851        assert!(body["NextToken"].is_null());
4852    }
4853
4854    // -- PutEvents EndpointId tests --
4855
4856    #[test]
4857    fn put_events_never_includes_endpoint_id_in_response() {
4858        let svc = make_service();
4859        // Even when EndpointId is provided in the request, it must not appear in the response
4860        let req = make_request(
4861            "PutEvents",
4862            json!({
4863                "EndpointId": "my-endpoint.abc123",
4864                "Entries": [{
4865                    "Source": "my.source",
4866                    "DetailType": "MyType",
4867                    "Detail": "{}",
4868                    "EventBusName": "default"
4869                }]
4870            }),
4871        );
4872        let resp = svc.put_events(&req).unwrap();
4873        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4874        assert!(
4875            !body.as_object().unwrap().contains_key("EndpointId"),
4876            "EndpointId should never be in the PutEvents response"
4877        );
4878        assert_eq!(body["FailedEntryCount"], 0);
4879    }
4880
4881    // -- ListArchives pagination tests --
4882
4883    fn create_archive(svc: &EventBridgeService, name: &str) {
4884        let req = make_request(
4885            "CreateArchive",
4886            json!({
4887                "ArchiveName": name,
4888                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4889            }),
4890        );
4891        svc.create_archive(&req).unwrap();
4892    }
4893
4894    #[test]
4895    fn list_archives_pagination() {
4896        let svc = make_service();
4897        for i in 0..5 {
4898            create_archive(&svc, &format!("archive-{i:02}"));
4899        }
4900
4901        // First page: limit 2
4902        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4903        let resp = svc.list_archives(&req).unwrap();
4904        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4905        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4906        let token = body["NextToken"].as_str().unwrap();
4907        assert_eq!(token, "2");
4908
4909        // Second page
4910        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4911        let resp = svc.list_archives(&req).unwrap();
4912        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4913        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4914        let token = body["NextToken"].as_str().unwrap();
4915        assert_eq!(token, "4");
4916
4917        // Third page (only 1 remaining)
4918        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4919        let resp = svc.list_archives(&req).unwrap();
4920        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4921        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4922        assert!(body["NextToken"].is_null());
4923    }
4924
4925    // -- ListReplays pagination tests --
4926
4927    fn create_replay(svc: &EventBridgeService, name: &str) {
4928        // Need an archive first for the replay's event source
4929        let archive_arn = {
4930            let guard = svc.state.read();
4931            let st = guard.default_ref();
4932            if st.archives.contains_key("replay-archive") {
4933                st.archives["replay-archive"].arn.clone()
4934            } else {
4935                drop(guard);
4936                create_archive(svc, "replay-archive");
4937                svc.state.read().default_ref().archives["replay-archive"]
4938                    .arn
4939                    .clone()
4940            }
4941        };
4942        let req = make_request(
4943            "StartReplay",
4944            json!({
4945                "ReplayName": name,
4946                "EventSourceArn": archive_arn,
4947                "EventStartTime": 1000000.0,
4948                "EventEndTime": 2000000.0,
4949                "Destination": {
4950                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4951                }
4952            }),
4953        );
4954        svc.start_replay(&req).unwrap();
4955    }
4956
4957    #[test]
4958    fn list_replays_pagination() {
4959        let svc = make_service();
4960        for i in 0..5 {
4961            create_replay(&svc, &format!("replay-{i:02}"));
4962        }
4963
4964        // First page: limit 2
4965        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4966        let resp = svc.list_replays(&req).unwrap();
4967        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4968        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4969        let token = body["NextToken"].as_str().unwrap();
4970        assert_eq!(token, "2");
4971
4972        // Second page
4973        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4974        let resp = svc.list_replays(&req).unwrap();
4975        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4976        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4977        let token = body["NextToken"].as_str().unwrap();
4978        assert_eq!(token, "4");
4979
4980        // Third page (only 1 remaining)
4981        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4982        let resp = svc.list_replays(&req).unwrap();
4983        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4984        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4985        assert!(body["NextToken"].is_null());
4986    }
4987
4988    #[test]
4989    fn list_event_buses_invalid_next_token_returns_error() {
4990        let svc = make_service();
4991
4992        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4993        let result = svc.list_event_buses(&req);
4994        assert!(
4995            result.is_err(),
4996            "non-numeric NextToken should return an error"
4997        );
4998    }
4999
5000    // ---- TestEventPattern tests ----
5001
5002    #[test]
5003    fn test_event_pattern_match() {
5004        let svc = make_service();
5005        let req = make_request(
5006            "TestEventPattern",
5007            json!({
5008                "EventPattern": r#"{"source": ["my.app"]}"#,
5009                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5010            }),
5011        );
5012        let resp = svc.test_event_pattern(&req).unwrap();
5013        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5014        assert_eq!(body["Result"], true);
5015    }
5016
5017    #[test]
5018    fn test_event_pattern_no_match() {
5019        let svc = make_service();
5020        let req = make_request(
5021            "TestEventPattern",
5022            json!({
5023                "EventPattern": r#"{"source": ["other.app"]}"#,
5024                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
5025            }),
5026        );
5027        let resp = svc.test_event_pattern(&req).unwrap();
5028        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5029        assert_eq!(body["Result"], false);
5030    }
5031
5032    #[test]
5033    fn test_event_pattern_detail_match() {
5034        let svc = make_service();
5035        let req = make_request(
5036            "TestEventPattern",
5037            json!({
5038                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
5039                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
5040            }),
5041        );
5042        let resp = svc.test_event_pattern(&req).unwrap();
5043        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5044        assert_eq!(body["Result"], true);
5045    }
5046
5047    // ---- UpdateEventBus tests ----
5048
5049    #[test]
5050    fn update_event_bus_description() {
5051        let svc = make_service();
5052        create_event_bus(&svc, "my-bus");
5053
5054        let req = make_request(
5055            "UpdateEventBus",
5056            json!({ "Name": "my-bus", "Description": "Updated desc" }),
5057        );
5058        let resp = svc.update_event_bus(&req).unwrap();
5059        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5060        assert_eq!(body["Name"], "my-bus");
5061
5062        // Verify via describe
5063        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
5064        let resp = svc.describe_event_bus(&req).unwrap();
5065        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5066        assert_eq!(body["Description"], "Updated desc");
5067    }
5068
5069    #[test]
5070    fn update_event_bus_not_found() {
5071        let svc = make_service();
5072        let req = make_request(
5073            "UpdateEventBus",
5074            json!({ "Name": "ghost-bus", "Description": "nope" }),
5075        );
5076        assert!(svc.update_event_bus(&req).is_err());
5077    }
5078
5079    // ---- Endpoint CRUD tests ----
5080
5081    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
5082        let req = make_request(
5083            "CreateEndpoint",
5084            json!({
5085                "Name": name,
5086                "RoutingConfig": {
5087                    "FailoverConfig": {
5088                        "Primary": { "HealthCheck": "" },
5089                        "Secondary": { "Route": "us-west-2" }
5090                    }
5091                },
5092                "EventBuses": [
5093                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
5094                ]
5095            }),
5096        );
5097        svc.create_endpoint(&req).unwrap();
5098    }
5099
5100    #[test]
5101    fn endpoint_create_describe_delete() {
5102        let svc = make_service();
5103        create_endpoint_helper(&svc, "my-endpoint");
5104
5105        // Describe
5106        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5107        let resp = svc.describe_endpoint(&req).unwrap();
5108        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5109        assert_eq!(body["Name"], "my-endpoint");
5110        assert_eq!(body["State"], "ACTIVE");
5111        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
5112
5113        // Delete
5114        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
5115        svc.delete_endpoint(&req).unwrap();
5116
5117        // Verify gone
5118        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
5119        assert!(svc.describe_endpoint(&req).is_err());
5120    }
5121
5122    #[test]
5123    fn endpoint_list_and_update() {
5124        let svc = make_service();
5125        create_endpoint_helper(&svc, "ep-alpha");
5126        create_endpoint_helper(&svc, "ep-beta");
5127
5128        // List all
5129        let req = make_request("ListEndpoints", json!({}));
5130        let resp = svc.list_endpoints(&req).unwrap();
5131        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5132        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
5133
5134        // Update
5135        let req = make_request(
5136            "UpdateEndpoint",
5137            json!({ "Name": "ep-alpha", "Description": "updated" }),
5138        );
5139        let resp = svc.update_endpoint(&req).unwrap();
5140        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5141        assert_eq!(body["Name"], "ep-alpha");
5142
5143        // Verify description
5144        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
5145        let resp = svc.describe_endpoint(&req).unwrap();
5146        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5147        assert_eq!(body["Description"], "updated");
5148    }
5149
5150    #[test]
5151    fn endpoint_duplicate_fails() {
5152        let svc = make_service();
5153        create_endpoint_helper(&svc, "dup-ep");
5154        let req = make_request(
5155            "CreateEndpoint",
5156            json!({
5157                "Name": "dup-ep",
5158                "RoutingConfig": {},
5159                "EventBuses": []
5160            }),
5161        );
5162        assert!(svc.create_endpoint(&req).is_err());
5163    }
5164
5165    // ---- DeauthorizeConnection tests ----
5166
5167    #[test]
5168    fn deauthorize_connection_sets_state() {
5169        let svc = make_service();
5170        create_connection(&svc, "deauth-conn");
5171
5172        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
5173        let resp = svc.deauthorize_connection(&req).unwrap();
5174        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5175        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5176        assert!(body["ConnectionArn"]
5177            .as_str()
5178            .unwrap()
5179            .contains("deauth-conn"));
5180
5181        // Verify via describe
5182        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
5183        let resp = svc.describe_connection(&req).unwrap();
5184        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5185        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
5186    }
5187
5188    #[test]
5189    fn deauthorize_connection_not_found() {
5190        let svc = make_service();
5191        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5192        assert!(svc.deauthorize_connection(&req).is_err());
5193    }
5194
5195    // ---- Partner event source tests ----
5196
5197    #[test]
5198    fn partner_event_source_crud() {
5199        let svc = make_service();
5200
5201        // Create
5202        let req = make_request(
5203            "CreatePartnerEventSource",
5204            json!({ "Name": "partner/test", "Account": "123456789012" }),
5205        );
5206        svc.create_partner_event_source(&req).unwrap();
5207
5208        // Describe
5209        let req = make_request(
5210            "DescribePartnerEventSource",
5211            json!({ "Name": "partner/test" }),
5212        );
5213        let resp = svc.describe_partner_event_source(&req).unwrap();
5214        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5215        assert_eq!(body["Name"], "partner/test");
5216
5217        // List
5218        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5219        let resp = svc.list_partner_event_sources(&req).unwrap();
5220        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5221        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5222
5223        // ListPartnerEventSourceAccounts
5224        let req = make_request(
5225            "ListPartnerEventSourceAccounts",
5226            json!({ "EventSourceName": "partner/test" }),
5227        );
5228        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5229        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5230        assert_eq!(
5231            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5232            1
5233        );
5234
5235        // DescribeEventSource
5236        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5237        let resp = svc.describe_event_source(&req).unwrap();
5238        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5239        assert_eq!(body["Name"], "partner/test");
5240        assert_eq!(body["State"], "ACTIVE");
5241
5242        // ListEventSources
5243        let req = make_request("ListEventSources", json!({}));
5244        let resp = svc.list_event_sources(&req).unwrap();
5245        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5246        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5247
5248        // Delete
5249        let req = make_request(
5250            "DeletePartnerEventSource",
5251            json!({ "Name": "partner/test", "Account": "123456789012" }),
5252        );
5253        svc.delete_partner_event_source(&req).unwrap();
5254
5255        // Verify gone
5256        let req = make_request(
5257            "DescribePartnerEventSource",
5258            json!({ "Name": "partner/test" }),
5259        );
5260        assert!(svc.describe_partner_event_source(&req).is_err());
5261    }
5262
5263    #[test]
5264    fn activate_deactivate_event_source() {
5265        let svc = make_service();
5266
5267        // Create a partner event source first
5268        let req = make_request(
5269            "CreatePartnerEventSource",
5270            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5271        );
5272        svc.create_partner_event_source(&req).unwrap();
5273
5274        // Deactivate it
5275        let req = make_request(
5276            "DeactivateEventSource",
5277            json!({ "Name": "aws.partner/test" }),
5278        );
5279        svc.deactivate_event_source(&req).unwrap();
5280        {
5281            let _mas = svc.state.read();
5282            let state = _mas.default_ref();
5283            assert_eq!(
5284                state.partner_event_sources["aws.partner/test"].state,
5285                "INACTIVE"
5286            );
5287        }
5288
5289        // Activate it
5290        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5291        svc.activate_event_source(&req).unwrap();
5292        {
5293            let _mas = svc.state.read();
5294            let state = _mas.default_ref();
5295            assert_eq!(
5296                state.partner_event_sources["aws.partner/test"].state,
5297                "ACTIVE"
5298            );
5299        }
5300
5301        // Not-found returns error
5302        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5303        assert!(svc.activate_event_source(&req).is_err());
5304
5305        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5306        assert!(svc.deactivate_event_source(&req).is_err());
5307    }
5308
5309    #[test]
5310    fn delete_partner_event_source_verifies_account() {
5311        let svc = make_service();
5312
5313        // Create a partner event source
5314        let req = make_request(
5315            "CreatePartnerEventSource",
5316            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5317        );
5318        svc.create_partner_event_source(&req).unwrap();
5319
5320        // Deleting with wrong account fails
5321        let req = make_request(
5322            "DeletePartnerEventSource",
5323            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5324        );
5325        assert!(svc.delete_partner_event_source(&req).is_err());
5326        // Source still exists
5327        assert!(svc
5328            .state
5329            .read()
5330            .default_ref()
5331            .partner_event_sources
5332            .contains_key("aws.partner/test"));
5333
5334        // Deleting with correct account succeeds
5335        let req = make_request(
5336            "DeletePartnerEventSource",
5337            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5338        );
5339        svc.delete_partner_event_source(&req).unwrap();
5340        assert!(!svc
5341            .state
5342            .read()
5343            .default_ref()
5344            .partner_event_sources
5345            .contains_key("aws.partner/test"));
5346
5347        // Deleting non-existent source returns error
5348        let req = make_request(
5349            "DeletePartnerEventSource",
5350            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5351        );
5352        assert!(svc.delete_partner_event_source(&req).is_err());
5353    }
5354
5355    #[test]
5356    fn put_partner_events() {
5357        let svc = make_service();
5358        let req = make_request(
5359            "PutPartnerEvents",
5360            json!({
5361                "Entries": [
5362                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5363                ]
5364            }),
5365        );
5366        let resp = svc.put_partner_events(&req).unwrap();
5367        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5368        assert_eq!(body["FailedEntryCount"], 0);
5369        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5370        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5371    }
5372
5373    // ---- Archive + Replay delivery tests ----
5374
5375    /// Helper: create a service with a mock SQS delivery that records messages.
5376    #[allow(clippy::type_complexity)]
5377    fn make_service_with_sqs_recorder() -> (
5378        EventBridgeService,
5379        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5380    ) {
5381        use fakecloud_core::delivery::SqsDelivery;
5382
5383        struct RecordingSqsDelivery {
5384            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5385        }
5386
5387        impl SqsDelivery for RecordingSqsDelivery {
5388            fn deliver_to_queue(
5389                &self,
5390                queue_arn: &str,
5391                message_body: &str,
5392                _attributes: &HashMap<String, String>,
5393            ) {
5394                self.messages
5395                    .lock()
5396                    .push((queue_arn.to_string(), message_body.to_string()));
5397            }
5398        }
5399
5400        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5401            Arc::new(parking_lot::Mutex::new(Vec::new()));
5402        let state = Arc::new(RwLock::new(
5403            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5404        ));
5405        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5406            messages: messages.clone(),
5407        })));
5408        let svc = EventBridgeService::new(state, delivery);
5409        (svc, messages)
5410    }
5411
5412    #[test]
5413    fn start_replay_delivers_archived_events_to_sqs_target() {
5414        let (svc, messages) = make_service_with_sqs_recorder();
5415        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5416
5417        // Create a rule with an SQS target
5418        let req = make_request(
5419            "PutRule",
5420            json!({
5421                "Name": "replay-test-rule",
5422                "EventPattern": r#"{"source": ["my.app"]}"#,
5423                "State": "ENABLED"
5424            }),
5425        );
5426        svc.put_rule(&req).unwrap();
5427
5428        let req = make_request(
5429            "PutTargets",
5430            json!({
5431                "Rule": "replay-test-rule",
5432                "Targets": [{
5433                    "Id": "sqs-target",
5434                    "Arn": queue_arn
5435                }]
5436            }),
5437        );
5438        svc.put_targets(&req).unwrap();
5439
5440        // Create an archive on the default bus
5441        let req = make_request(
5442            "CreateArchive",
5443            json!({
5444                "ArchiveName": "test-archive",
5445                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5446            }),
5447        );
5448        svc.create_archive(&req).unwrap();
5449
5450        // PutEvents: these should get archived and delivered
5451        let req = make_request(
5452            "PutEvents",
5453            json!({
5454                "Entries": [
5455                    {
5456                        "Source": "my.app",
5457                        "DetailType": "OrderCreated",
5458                        "Detail": "{\"orderId\": \"1\"}",
5459                        "EventBusName": "default"
5460                    },
5461                    {
5462                        "Source": "my.app",
5463                        "DetailType": "OrderShipped",
5464                        "Detail": "{\"orderId\": \"2\"}",
5465                        "EventBusName": "default"
5466                    }
5467                ]
5468            }),
5469        );
5470        let resp = svc.put_events(&req).unwrap();
5471        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5472        assert_eq!(body["FailedEntryCount"], 0);
5473
5474        // Verify archive has 2 events
5475        {
5476            let _mas = svc.state.read();
5477            let state = _mas.default_ref();
5478            let archive = state.archives.get("test-archive").unwrap();
5479            assert_eq!(archive.events.len(), 2);
5480            assert_eq!(archive.event_count, 2);
5481        }
5482
5483        // Clear recorded messages from PutEvents delivery
5484        messages.lock().clear();
5485
5486        // StartReplay: should re-deliver the archived events
5487        let archive_arn = {
5488            let _mas = svc.state.read();
5489            let state = _mas.default_ref();
5490            state.archives.get("test-archive").unwrap().arn.clone()
5491        };
5492
5493        // Use a wide time range to capture all events
5494        let start_ts = 0.0_f64;
5495        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5496
5497        let req = make_request(
5498            "StartReplay",
5499            json!({
5500                "ReplayName": "my-replay",
5501                "EventSourceArn": archive_arn,
5502                "Destination": {
5503                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5504                },
5505                "EventStartTime": start_ts,
5506                "EventEndTime": end_ts
5507            }),
5508        );
5509        let resp = svc.start_replay(&req).unwrap();
5510        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5511        assert_eq!(body["State"], "STARTING");
5512
5513        // Verify the replay delivered events to SQS
5514        let delivered = messages.lock();
5515        assert_eq!(
5516            delivered.len(),
5517            2,
5518            "expected 2 replayed events delivered to SQS"
5519        );
5520        for (arn, msg) in delivered.iter() {
5521            assert_eq!(arn, queue_arn);
5522            let event: Value = serde_json::from_str(msg).unwrap();
5523            assert_eq!(event["source"], "my.app");
5524            // Replayed events should include replay-name
5525            assert!(event["replay-name"].as_str().is_some());
5526        }
5527
5528        // Verify replay is marked as COMPLETED
5529        let _mas = svc.state.read();
5530        let state = _mas.default_ref();
5531        let replay = state.replays.get("my-replay").unwrap();
5532        assert_eq!(replay.state, "COMPLETED");
5533    }
5534
5535    #[test]
5536    fn apply_connection_auth_api_key() {
5537        let conn = Connection {
5538            name: "test-conn".to_string(),
5539            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5540            description: None,
5541            authorization_type: "API_KEY".to_string(),
5542            auth_parameters: json!({
5543                "ApiKeyAuthParameters": {
5544                    "ApiKeyName": "x-api-key",
5545                    "ApiKeyValue": "my-secret"
5546                }
5547            }),
5548            connection_state: "AUTHORIZED".to_string(),
5549            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5550            creation_time: Utc::now(),
5551            last_modified_time: Utc::now(),
5552            last_authorized_time: Utc::now(),
5553        };
5554
5555        let client = reqwest::Client::new();
5556        let builder = client
5557            .post("http://localhost:12345/test")
5558            .header("Content-Type", "application/json");
5559        let builder = apply_connection_auth(builder, &conn);
5560
5561        // Build and verify the header was applied
5562        let request = builder.body("{}").build().unwrap();
5563        assert_eq!(
5564            request
5565                .headers()
5566                .get("x-api-key")
5567                .unwrap()
5568                .to_str()
5569                .unwrap(),
5570            "my-secret"
5571        );
5572    }
5573
5574    #[test]
5575    fn apply_connection_auth_basic() {
5576        let conn = Connection {
5577            name: "basic-conn".to_string(),
5578            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5579            description: None,
5580            authorization_type: "BASIC".to_string(),
5581            auth_parameters: json!({
5582                "BasicAuthParameters": {
5583                    "Username": "user",
5584                    "Password": "pass"
5585                }
5586            }),
5587            connection_state: "AUTHORIZED".to_string(),
5588            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5589            creation_time: Utc::now(),
5590            last_modified_time: Utc::now(),
5591            last_authorized_time: Utc::now(),
5592        };
5593
5594        let client = reqwest::Client::new();
5595        let builder = client.post("http://localhost:12345/test");
5596        let builder = apply_connection_auth(builder, &conn);
5597
5598        let request = builder.body("{}").build().unwrap();
5599        let auth_header = request
5600            .headers()
5601            .get("authorization")
5602            .unwrap()
5603            .to_str()
5604            .unwrap();
5605        assert!(
5606            auth_header.starts_with("Basic "),
5607            "Expected Basic auth header, got: {auth_header}"
5608        );
5609    }
5610
5611    #[tokio::test]
5612    async fn put_events_with_api_destination_target_resolves_destination() {
5613        // This test verifies that the PutEvents code path correctly identifies
5614        // api-destination ARN targets and resolves the destination metadata.
5615        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5616        let state = Arc::new(RwLock::new(
5617            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
5618        ));
5619        let delivery = Arc::new(DeliveryBus::new());
5620        let svc = EventBridgeService::new(state, delivery);
5621
5622        // Create connection and api destination
5623        create_connection(&svc, "my-conn");
5624        let conn_arn = {
5625            let _mas = svc.state.read();
5626            let state = _mas.default_ref();
5627            state.connections.get("my-conn").unwrap().arn.clone()
5628        };
5629        let req = make_request(
5630            "CreateApiDestination",
5631            json!({
5632                "Name": "my-dest",
5633                "ConnectionArn": conn_arn,
5634                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5635                "HttpMethod": "POST"
5636            }),
5637        );
5638        svc.create_api_destination(&req).unwrap();
5639
5640        let dest_arn = {
5641            let _mas = svc.state.read();
5642            let state = _mas.default_ref();
5643            state.api_destinations.get("my-dest").unwrap().arn.clone()
5644        };
5645
5646        // Create a rule that targets the api-destination
5647        let req = make_request(
5648            "PutRule",
5649            json!({
5650                "Name": "api-dest-rule",
5651                "EventPattern": r#"{"source":["test.app"]}"#,
5652                "State": "ENABLED"
5653            }),
5654        );
5655        svc.put_rule(&req).unwrap();
5656
5657        let req = make_request(
5658            "PutTargets",
5659            json!({
5660                "Rule": "api-dest-rule",
5661                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5662            }),
5663        );
5664        svc.put_targets(&req).unwrap();
5665
5666        // PutEvents - should match the rule and attempt delivery to ApiDestination
5667        let req = make_request(
5668            "PutEvents",
5669            json!({
5670                "Entries": [{
5671                    "Source": "test.app",
5672                    "DetailType": "TestEvent",
5673                    "Detail": r#"{"key":"value"}"#
5674                }]
5675            }),
5676        );
5677        let resp = svc.put_events(&req).unwrap();
5678        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5679        assert_eq!(body["FailedEntryCount"], 0);
5680        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5681        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5682    }
5683
5684    #[test]
5685    fn test_function_name_from_arn() {
5686        // Unqualified ARN
5687        assert_eq!(
5688            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5689            "my-func"
5690        );
5691        // Qualified ARN with alias
5692        assert_eq!(
5693            super::function_name_from_arn(
5694                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5695            ),
5696            "my-func"
5697        );
5698        // Qualified ARN with version
5699        assert_eq!(
5700            super::function_name_from_arn(
5701                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5702            ),
5703            "my-func"
5704        );
5705        // Plain function name (not an ARN)
5706        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5707    }
5708
5709    // ── Rules / targets / tags handler tests ────────────────────────
5710
5711    fn put_rule_simple(svc: &EventBridgeService, name: &str) {
5712        let req = make_request(
5713            "PutRule",
5714            json!({ "Name": name, "EventPattern": r#"{"source":["a"]}"# }),
5715        );
5716        svc.put_rule(&req).unwrap();
5717    }
5718
5719    #[test]
5720    fn put_rule_persists_event_pattern_and_state() {
5721        let svc = make_service();
5722        put_rule_simple(&svc, "r1");
5723        let _mas = svc.state.read();
5724        let state = _mas.default_ref();
5725        let rule = state
5726            .rules
5727            .get(&("default".to_string(), "r1".to_string()))
5728            .unwrap();
5729        assert_eq!(rule.state, "ENABLED");
5730        assert!(rule.event_pattern.is_some());
5731        assert!(rule.arn.contains("rule/r1"));
5732    }
5733
5734    #[test]
5735    fn put_rule_rejects_schedule_on_non_default_bus() {
5736        let svc = make_service();
5737        // Create a custom bus first.
5738        let bus_req = make_request("CreateEventBus", json!({ "Name": "custom" }));
5739        svc.create_event_bus(&bus_req).unwrap();
5740
5741        let req = make_request(
5742            "PutRule",
5743            json!({
5744                "Name": "r1",
5745                "EventBusName": "custom",
5746                "ScheduleExpression": "rate(5 minutes)"
5747            }),
5748        );
5749        let err = svc.put_rule(&req).err().expect("expected error");
5750        assert_eq!(err.code(), "ValidationException");
5751    }
5752
5753    #[test]
5754    fn put_rule_rejects_unknown_event_bus() {
5755        let svc = make_service();
5756        let req = make_request(
5757            "PutRule",
5758            json!({ "Name": "r1", "EventBusName": "ghost", "EventPattern": r#"{"source":["a"]}"# }),
5759        );
5760        let err = svc.put_rule(&req).err().expect("expected error");
5761        assert_eq!(err.code(), "ResourceNotFoundException");
5762    }
5763
5764    #[test]
5765    fn put_rule_overlay_preserves_existing_targets() {
5766        let svc = make_service();
5767        put_rule_simple(&svc, "r1");
5768        // Inject a target directly.
5769        {
5770            let mut _mas = svc.state.write();
5771            let state = _mas.default_mut();
5772            let rule = state
5773                .rules
5774                .get_mut(&("default".to_string(), "r1".to_string()))
5775                .unwrap();
5776            rule.targets.push(crate::state::EventTarget {
5777                id: "t1".to_string(),
5778                arn: "arn:aws:sqs:us-east-1:123456789012:q".to_string(),
5779                input: None,
5780                input_path: None,
5781                input_transformer: None,
5782                sqs_parameters: None,
5783            });
5784        }
5785
5786        // Re-PutRule with new description; targets should survive.
5787        let req = make_request(
5788            "PutRule",
5789            json!({ "Name": "r1", "Description": "updated", "EventPattern": r#"{"source":["a"]}"# }),
5790        );
5791        svc.put_rule(&req).unwrap();
5792        let _mas = svc.state.read();
5793        let state = _mas.default_ref();
5794        let rule = state
5795            .rules
5796            .get(&("default".to_string(), "r1".to_string()))
5797            .unwrap();
5798        assert_eq!(rule.description.as_deref(), Some("updated"));
5799        assert_eq!(rule.targets.len(), 1);
5800    }
5801
5802    #[test]
5803    fn delete_rule_with_targets_errors() {
5804        let svc = make_service();
5805        put_rule_simple(&svc, "r1");
5806        let put_targets_req = make_request(
5807            "PutTargets",
5808            json!({
5809                "Rule": "r1",
5810                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5811            }),
5812        );
5813        svc.put_targets(&put_targets_req).unwrap();
5814
5815        let req = make_request("DeleteRule", json!({ "Name": "r1" }));
5816        let err = svc.delete_rule(&req).err().expect("expected error");
5817        assert_eq!(err.code(), "ValidationException");
5818    }
5819
5820    #[test]
5821    fn delete_rule_after_remove_targets_succeeds() {
5822        let svc = make_service();
5823        put_rule_simple(&svc, "r1");
5824        let put_t = make_request(
5825            "PutTargets",
5826            json!({
5827                "Rule": "r1",
5828                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5829            }),
5830        );
5831        svc.put_targets(&put_t).unwrap();
5832        let rm_t = make_request("RemoveTargets", json!({ "Rule": "r1", "Ids": ["t1"] }));
5833        svc.remove_targets(&rm_t).unwrap();
5834        let del = make_request("DeleteRule", json!({ "Name": "r1" }));
5835        svc.delete_rule(&del).unwrap();
5836        assert!(!svc
5837            .state
5838            .read()
5839            .default_ref()
5840            .rules
5841            .contains_key(&("default".to_string(), "r1".to_string())));
5842    }
5843
5844    #[test]
5845    fn enable_disable_rule_toggles_state() {
5846        let svc = make_service();
5847        put_rule_simple(&svc, "r1");
5848        let dis = make_request("DisableRule", json!({ "Name": "r1" }));
5849        svc.disable_rule(&dis).unwrap();
5850        assert_eq!(
5851            svc.state
5852                .read()
5853                .default_ref()
5854                .rules
5855                .get(&("default".to_string(), "r1".to_string()))
5856                .unwrap()
5857                .state,
5858            "DISABLED"
5859        );
5860        let en = make_request("EnableRule", json!({ "Name": "r1" }));
5861        svc.enable_rule(&en).unwrap();
5862        assert_eq!(
5863            svc.state
5864                .read()
5865                .default_ref()
5866                .rules
5867                .get(&("default".to_string(), "r1".to_string()))
5868                .unwrap()
5869                .state,
5870            "ENABLED"
5871        );
5872    }
5873
5874    #[test]
5875    fn enable_rule_unknown_errors() {
5876        let svc = make_service();
5877        let req = make_request("EnableRule", json!({ "Name": "ghost" }));
5878        let err = svc.enable_rule(&req).err().expect("expected error");
5879        assert_eq!(err.code(), "ResourceNotFoundException");
5880    }
5881
5882    #[test]
5883    fn list_rules_with_name_prefix_filter() {
5884        let svc = make_service();
5885        put_rule_simple(&svc, "prod-orders");
5886        put_rule_simple(&svc, "prod-shipping");
5887        put_rule_simple(&svc, "dev-orders");
5888
5889        let req = make_request("ListRules", json!({ "NamePrefix": "prod-" }));
5890        let resp = svc.list_rules(&req).unwrap();
5891        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5892        let names: Vec<&str> = body["Rules"]
5893            .as_array()
5894            .unwrap()
5895            .iter()
5896            .map(|r| r["Name"].as_str().unwrap())
5897            .collect();
5898        assert_eq!(names.len(), 2);
5899        assert!(names.iter().all(|n| n.starts_with("prod-")));
5900    }
5901
5902    #[test]
5903    fn list_rules_pagination_emits_next_token() {
5904        let svc = make_service();
5905        for i in 0..5 {
5906            put_rule_simple(&svc, &format!("r{i}"));
5907        }
5908        let req = make_request("ListRules", json!({ "Limit": 2 }));
5909        let resp = svc.list_rules(&req).unwrap();
5910        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5911        assert_eq!(body["Rules"].as_array().unwrap().len(), 2);
5912        assert!(body["NextToken"].is_string());
5913    }
5914
5915    #[test]
5916    fn describe_rule_returns_persisted_fields() {
5917        let svc = make_service();
5918        let put = make_request(
5919            "PutRule",
5920            json!({
5921                "Name": "r1",
5922                "EventPattern": r#"{"source":["a"]}"#,
5923                "Description": "hi",
5924                "State": "DISABLED"
5925            }),
5926        );
5927        svc.put_rule(&put).unwrap();
5928        let desc = make_request("DescribeRule", json!({ "Name": "r1" }));
5929        let resp = svc.describe_rule(&desc).unwrap();
5930        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5931        assert_eq!(body["Name"], json!("r1"));
5932        assert_eq!(body["State"], json!("DISABLED"));
5933        assert_eq!(body["Description"], json!("hi"));
5934    }
5935
5936    #[test]
5937    fn describe_rule_unknown_errors() {
5938        let svc = make_service();
5939        let req = make_request("DescribeRule", json!({ "Name": "ghost" }));
5940        let err = svc.describe_rule(&req).err().expect("expected error");
5941        assert_eq!(err.code(), "ResourceNotFoundException");
5942    }
5943
5944    #[test]
5945    fn put_targets_rejects_fifo_without_sqs_parameters() {
5946        let svc = make_service();
5947        put_rule_simple(&svc, "r1");
5948        let req = make_request(
5949            "PutTargets",
5950            json!({
5951                "Rule": "r1",
5952                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q.fifo" }]
5953            }),
5954        );
5955        let err = svc.put_targets(&req).err().expect("expected error");
5956        assert_eq!(err.code(), "ValidationException");
5957    }
5958
5959    #[test]
5960    fn put_targets_rejects_invalid_arn() {
5961        let svc = make_service();
5962        put_rule_simple(&svc, "r1");
5963        let req = make_request(
5964            "PutTargets",
5965            json!({
5966                "Rule": "r1",
5967                "Targets": [{ "Id": "t1", "Arn": "not-an-arn" }]
5968            }),
5969        );
5970        let err = svc.put_targets(&req).err().expect("expected error");
5971        assert_eq!(err.code(), "ValidationException");
5972    }
5973
5974    #[test]
5975    fn put_targets_unknown_rule_errors() {
5976        let svc = make_service();
5977        let req = make_request(
5978            "PutTargets",
5979            json!({
5980                "Rule": "ghost",
5981                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q" }]
5982            }),
5983        );
5984        let err = svc.put_targets(&req).err().expect("expected error");
5985        assert_eq!(err.code(), "ResourceNotFoundException");
5986    }
5987
5988    #[test]
5989    fn put_targets_replaces_existing_with_same_id() {
5990        let svc = make_service();
5991        put_rule_simple(&svc, "r1");
5992        let first = make_request(
5993            "PutTargets",
5994            json!({
5995                "Rule": "r1",
5996                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1" }]
5997            }),
5998        );
5999        svc.put_targets(&first).unwrap();
6000        let second = make_request(
6001            "PutTargets",
6002            json!({
6003                "Rule": "r1",
6004                "Targets": [{ "Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q2" }]
6005            }),
6006        );
6007        svc.put_targets(&second).unwrap();
6008
6009        let _mas = svc.state.read();
6010        let state = _mas.default_ref();
6011        let rule = state
6012            .rules
6013            .get(&("default".to_string(), "r1".to_string()))
6014            .unwrap();
6015        assert_eq!(rule.targets.len(), 1);
6016        assert!(rule.targets[0].arn.ends_with("q2"));
6017    }
6018
6019    #[test]
6020    fn list_targets_by_rule_returns_pagination_token() {
6021        let svc = make_service();
6022        put_rule_simple(&svc, "r1");
6023        for i in 0..4 {
6024            let req = make_request(
6025                "PutTargets",
6026                json!({
6027                    "Rule": "r1",
6028                    "Targets": [{
6029                        "Id": format!("t{i}"),
6030                        "Arn": format!("arn:aws:sqs:us-east-1:123456789012:q{i}")
6031                    }]
6032                }),
6033            );
6034            svc.put_targets(&req).unwrap();
6035        }
6036        let req = make_request("ListTargetsByRule", json!({ "Rule": "r1", "Limit": 2 }));
6037        let resp = svc.list_targets_by_rule(&req).unwrap();
6038        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6039        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6040        assert!(body["NextToken"].is_string());
6041    }
6042
6043    #[test]
6044    fn list_rule_names_by_target_groups_by_arn() {
6045        let svc = make_service();
6046        put_rule_simple(&svc, "r1");
6047        put_rule_simple(&svc, "r2");
6048        for rule in ["r1", "r2"] {
6049            let req = make_request(
6050                "PutTargets",
6051                json!({
6052                    "Rule": rule,
6053                    "Targets": [{
6054                        "Id": "t1",
6055                        "Arn": "arn:aws:sqs:us-east-1:123456789012:shared"
6056                    }]
6057                }),
6058            );
6059            svc.put_targets(&req).unwrap();
6060        }
6061        let req = make_request(
6062            "ListRuleNamesByTarget",
6063            json!({ "TargetArn": "arn:aws:sqs:us-east-1:123456789012:shared" }),
6064        );
6065        let resp = svc.list_rule_names_by_target(&req).unwrap();
6066        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6067        let names: Vec<&str> = body["RuleNames"]
6068            .as_array()
6069            .unwrap()
6070            .iter()
6071            .map(|v| v.as_str().unwrap())
6072            .collect();
6073        assert_eq!(names, vec!["r1", "r2"]);
6074    }
6075
6076    // ── Tag operations ───────────────────────────────────────────────
6077
6078    #[test]
6079    fn tag_then_list_tags_for_rule() {
6080        let svc = make_service();
6081        put_rule_simple(&svc, "r1");
6082        let arn = svc
6083            .state
6084            .read()
6085            .default_ref()
6086            .rules
6087            .get(&("default".to_string(), "r1".to_string()))
6088            .unwrap()
6089            .arn
6090            .clone();
6091
6092        let tag_req = make_request(
6093            "TagResource",
6094            json!({
6095                "ResourceARN": arn,
6096                "Tags": [{ "Key": "env", "Value": "prod" }]
6097            }),
6098        );
6099        svc.tag_resource(&tag_req).unwrap();
6100
6101        let list_req = make_request("ListTagsForResource", json!({ "ResourceARN": arn }));
6102        let resp = svc.list_tags_for_resource(&list_req).unwrap();
6103        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6104        let tags = body["Tags"].as_array().unwrap();
6105        assert_eq!(tags.len(), 1);
6106        assert_eq!(tags[0]["Key"], json!("env"));
6107        assert_eq!(tags[0]["Value"], json!("prod"));
6108    }
6109
6110    #[test]
6111    fn untag_resource_removes_listed_keys() {
6112        let svc = make_service();
6113        put_rule_simple(&svc, "r1");
6114        let arn = svc
6115            .state
6116            .read()
6117            .default_ref()
6118            .rules
6119            .get(&("default".to_string(), "r1".to_string()))
6120            .unwrap()
6121            .arn
6122            .clone();
6123        let tag_req = make_request(
6124            "TagResource",
6125            json!({
6126                "ResourceARN": &arn,
6127                "Tags": [{ "Key": "env", "Value": "prod" }, { "Key": "team", "Value": "core" }]
6128            }),
6129        );
6130        svc.tag_resource(&tag_req).unwrap();
6131
6132        let untag = make_request(
6133            "UntagResource",
6134            json!({ "ResourceARN": &arn, "TagKeys": ["env"] }),
6135        );
6136        svc.untag_resource(&untag).unwrap();
6137
6138        let _mas = svc.state.read();
6139        let state = _mas.default_ref();
6140        let rule = state
6141            .rules
6142            .get(&("default".to_string(), "r1".to_string()))
6143            .unwrap();
6144        assert!(!rule.tags.contains_key("env"));
6145        assert_eq!(rule.tags.get("team").map(String::as_str), Some("core"));
6146    }
6147
6148    // ── TestEventPattern ─────────────────────────────────────────────
6149
6150    #[test]
6151    fn test_event_pattern_returns_result_field() {
6152        let svc = make_service();
6153        let req = make_request(
6154            "TestEventPattern",
6155            json!({
6156                "EventPattern": r#"{"source":["my.app"]}"#,
6157                "Event": r#"{"source":"my.app","detail-type":"x","detail":{}}"#
6158            }),
6159        );
6160        let resp = svc.test_event_pattern(&req).unwrap();
6161        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6162        assert_eq!(body["Result"], json!(true));
6163    }
6164
6165    // ── Event bus describe / delete ──────────────────────────────────
6166
6167    #[test]
6168    fn describe_event_bus_default_returns_arn() {
6169        let svc = make_service();
6170        let req = make_request("DescribeEventBus", json!({}));
6171        let resp = svc.describe_event_bus(&req).unwrap();
6172        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6173        assert_eq!(body["Name"], json!("default"));
6174        assert!(body["Arn"].as_str().unwrap().contains("event-bus/default"));
6175    }
6176
6177    #[test]
6178    fn delete_event_bus_default_fails() {
6179        let svc = make_service();
6180        let req = make_request("DeleteEventBus", json!({ "Name": "default" }));
6181        let err = svc.delete_event_bus(&req).err().expect("expected error");
6182        assert_eq!(err.code(), "ValidationException");
6183    }
6184
6185    // ── Error branch tests ──
6186
6187    #[test]
6188    fn describe_rule_not_found() {
6189        let svc = make_service();
6190        let req = make_request("DescribeRule", json!({"Name": "nonexistent"}));
6191        let err = svc.describe_rule(&req).err().expect("expected error");
6192        assert_eq!(err.code(), "ResourceNotFoundException");
6193    }
6194
6195    #[test]
6196    fn delete_rule_nonexistent_is_noop() {
6197        let svc = make_service();
6198        let req = make_request("DeleteRule", json!({"Name": "nope"}));
6199        // EventBridge returns success for deleting nonexistent rules
6200        svc.delete_rule(&req).unwrap();
6201    }
6202
6203    #[test]
6204    fn put_targets_rule_not_found() {
6205        let svc = make_service();
6206        let req = make_request(
6207            "PutTargets",
6208            json!({"Rule": "ghost", "Targets": [{"Id": "t1", "Arn": "arn:a"}]}),
6209        );
6210        let err = svc.put_targets(&req).err().expect("expected error");
6211        assert_eq!(err.code(), "ResourceNotFoundException");
6212    }
6213
6214    #[test]
6215    fn remove_targets_rule_not_found() {
6216        let svc = make_service();
6217        let req = make_request("RemoveTargets", json!({"Rule": "ghost", "Ids": ["t1"]}));
6218        let err = svc.remove_targets(&req).err().expect("expected error");
6219        assert_eq!(err.code(), "ResourceNotFoundException");
6220    }
6221
6222    #[test]
6223    fn list_targets_by_rule_not_found() {
6224        let svc = make_service();
6225        let req = make_request("ListTargetsByRule", json!({"Rule": "ghost"}));
6226        let err = svc
6227            .list_targets_by_rule(&req)
6228            .err()
6229            .expect("expected error");
6230        assert_eq!(err.code(), "ResourceNotFoundException");
6231    }
6232
6233    #[test]
6234    fn enable_rule_not_found() {
6235        let svc = make_service();
6236        let req = make_request("EnableRule", json!({"Name": "ghost"}));
6237        let err = svc.enable_rule(&req).err().expect("expected error");
6238        assert_eq!(err.code(), "ResourceNotFoundException");
6239    }
6240
6241    #[test]
6242    fn disable_rule_not_found() {
6243        let svc = make_service();
6244        let req = make_request("DisableRule", json!({"Name": "ghost"}));
6245        let err = svc.disable_rule(&req).err().expect("expected error");
6246        assert_eq!(err.code(), "ResourceNotFoundException");
6247    }
6248
6249    #[test]
6250    fn describe_event_bus_not_found() {
6251        let svc = make_service();
6252        let req = make_request("DescribeEventBus", json!({"Name": "nonexistent-bus"}));
6253        let err = svc.describe_event_bus(&req).err().expect("expected error");
6254        assert_eq!(err.code(), "ResourceNotFoundException");
6255    }
6256
6257    #[test]
6258    fn tag_resource_not_found() {
6259        let svc = make_service();
6260        let req = make_request(
6261            "TagResource",
6262            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "Tags": [{"Key": "k", "Value": "v"}]}),
6263        );
6264        let err = svc.tag_resource(&req).err().expect("expected error");
6265        assert_eq!(err.code(), "ResourceNotFoundException");
6266    }
6267
6268    #[test]
6269    fn untag_resource_not_found() {
6270        let svc = make_service();
6271        let req = make_request(
6272            "UntagResource",
6273            json!({"ResourceARN": "arn:aws:events:us-east-1:123:nope", "TagKeys": ["k"]}),
6274        );
6275        let err = svc.untag_resource(&req).err().expect("expected error");
6276        assert_eq!(err.code(), "ResourceNotFoundException");
6277    }
6278
6279    #[test]
6280    fn describe_archive_not_found() {
6281        let svc = make_service();
6282        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
6283        let err = svc.describe_archive(&req).err().expect("expected error");
6284        assert_eq!(err.code(), "ResourceNotFoundException");
6285    }
6286
6287    #[test]
6288    fn delete_archive_not_found() {
6289        let svc = make_service();
6290        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6291        let err = svc.delete_archive(&req).err().expect("expected error");
6292        assert_eq!(err.code(), "ResourceNotFoundException");
6293    }
6294
6295    #[test]
6296    fn describe_connection_not_found() {
6297        let svc = make_service();
6298        let req = make_request("DescribeConnection", json!({"Name": "ghost"}));
6299        let err = svc.describe_connection(&req).err().expect("expected error");
6300        assert_eq!(err.code(), "ResourceNotFoundException");
6301    }
6302
6303    #[test]
6304    fn describe_api_destination_not_found() {
6305        let svc = make_service();
6306        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
6307        let err = svc
6308            .describe_api_destination(&req)
6309            .err()
6310            .expect("expected error");
6311        assert_eq!(err.code(), "ResourceNotFoundException");
6312    }
6313
6314    #[test]
6315    fn describe_replay_not_found() {
6316        let svc = make_service();
6317        let req = make_request("DescribeReplay", json!({"ReplayName": "ghost"}));
6318        let err = svc.describe_replay(&req).err().expect("expected error");
6319        assert_eq!(err.code(), "ResourceNotFoundException");
6320    }
6321
6322    #[test]
6323    fn create_event_bus_duplicate() {
6324        let svc = make_service();
6325        let req = make_request("CreateEventBus", json!({"Name": "dup-bus"}));
6326        svc.create_event_bus(&req).unwrap();
6327        let err = svc.create_event_bus(&req).err().expect("expected error");
6328        assert_eq!(err.code(), "ResourceAlreadyExistsException");
6329    }
6330
6331    // ── Rule lifecycle ──
6332
6333    #[test]
6334    fn rule_put_describe_enable_disable_delete() {
6335        let svc = make_service();
6336        svc.put_rule(&make_request(
6337            "PutRule",
6338            json!({"Name": "my-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}", "State": "ENABLED"}),
6339        ))
6340        .unwrap();
6341
6342        let resp = svc
6343            .describe_rule(&make_request("DescribeRule", json!({"Name": "my-rule"})))
6344            .unwrap();
6345        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6346        assert_eq!(body["State"], "ENABLED");
6347
6348        svc.disable_rule(&make_request("DisableRule", json!({"Name": "my-rule"})))
6349            .unwrap();
6350        svc.enable_rule(&make_request("EnableRule", json!({"Name": "my-rule"})))
6351            .unwrap();
6352        svc.delete_rule(&make_request("DeleteRule", json!({"Name": "my-rule"})))
6353            .unwrap();
6354    }
6355
6356    #[test]
6357    fn list_rules_returns_created() {
6358        let svc = make_service();
6359        for name in &["r1", "r2", "r3"] {
6360            svc.put_rule(&make_request(
6361                "PutRule",
6362                json!({"Name": name, "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6363            ))
6364            .unwrap();
6365        }
6366        let resp = svc
6367            .list_rules(&make_request("ListRules", json!({})))
6368            .unwrap();
6369        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6370        assert_eq!(body["Rules"].as_array().unwrap().len(), 3);
6371    }
6372
6373    // ── Targets ──
6374
6375    #[test]
6376    fn put_list_remove_targets() {
6377        let svc = make_service();
6378        svc.put_rule(&make_request(
6379            "PutRule",
6380            json!({"Name": "tr", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6381        ))
6382        .unwrap();
6383
6384        svc.put_targets(&make_request(
6385            "PutTargets",
6386            json!({
6387                "Rule": "tr",
6388                "Targets": [
6389                    {"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"},
6390                    {"Id": "t2", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:fn1"},
6391                ]
6392            }),
6393        ))
6394        .unwrap();
6395
6396        let resp = svc
6397            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6398            .unwrap();
6399        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6400        assert_eq!(body["Targets"].as_array().unwrap().len(), 2);
6401
6402        svc.remove_targets(&make_request(
6403            "RemoveTargets",
6404            json!({"Rule": "tr", "Ids": ["t1"]}),
6405        ))
6406        .unwrap();
6407
6408        let resp = svc
6409            .list_targets_by_rule(&make_request("ListTargetsByRule", json!({"Rule": "tr"})))
6410            .unwrap();
6411        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6412        assert_eq!(body["Targets"].as_array().unwrap().len(), 1);
6413    }
6414
6415    // ── PutEvents ──
6416
6417    #[test]
6418    fn put_events_basic() {
6419        let svc = make_service();
6420        let resp = svc
6421            .put_events(&make_request(
6422                "PutEvents",
6423                json!({
6424                    "Entries": [
6425                        {"Source": "aws.s3", "DetailType": "Object Created", "Detail": "{}"},
6426                    ]
6427                }),
6428            ))
6429            .unwrap();
6430        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6431        assert_eq!(body["FailedEntryCount"], 0);
6432    }
6433
6434    // ── Archives ──
6435
6436    #[test]
6437    fn archive_create_describe_list_delete() {
6438        let svc = make_service();
6439
6440        svc.create_archive(&make_request(
6441            "CreateArchive",
6442            json!({
6443                "ArchiveName": "my-archive",
6444                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default",
6445            }),
6446        ))
6447        .unwrap();
6448
6449        let resp = svc
6450            .describe_archive(&make_request(
6451                "DescribeArchive",
6452                json!({"ArchiveName": "my-archive"}),
6453            ))
6454            .unwrap();
6455        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6456        assert_eq!(body["ArchiveName"], "my-archive");
6457
6458        let resp = svc
6459            .list_archives(&make_request("ListArchives", json!({})))
6460            .unwrap();
6461        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6462        assert!(!body["Archives"].as_array().unwrap().is_empty());
6463
6464        svc.delete_archive(&make_request(
6465            "DeleteArchive",
6466            json!({"ArchiveName": "my-archive"}),
6467        ))
6468        .unwrap();
6469    }
6470
6471    // ── Connections ──
6472
6473    #[test]
6474    fn connection_create_list_describe_deauthorize() {
6475        let svc = make_service();
6476
6477        svc.create_connection(&make_request(
6478            "CreateConnection",
6479            json!({
6480                "Name": "my-conn",
6481                "AuthorizationType": "API_KEY",
6482                "AuthParameters": {
6483                    "ApiKeyAuthParameters": {"ApiKeyName": "x-key", "ApiKeyValue": "secret"}
6484                }
6485            }),
6486        ))
6487        .unwrap();
6488
6489        let resp = svc
6490            .list_connections(&make_request("ListConnections", json!({})))
6491            .unwrap();
6492        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6493        assert!(!body["Connections"].as_array().unwrap().is_empty());
6494
6495        svc.describe_connection(&make_request(
6496            "DescribeConnection",
6497            json!({"Name": "my-conn"}),
6498        ))
6499        .unwrap();
6500        svc.deauthorize_connection(&make_request(
6501            "DeauthorizeConnection",
6502            json!({"Name": "my-conn"}),
6503        ))
6504        .unwrap();
6505    }
6506
6507    // ── Event bus list ──
6508
6509    #[test]
6510    fn list_event_buses_returns_default_and_custom() {
6511        let svc = make_service();
6512        svc.create_event_bus(&make_request(
6513            "CreateEventBus",
6514            json!({"Name": "custom-bus"}),
6515        ))
6516        .unwrap();
6517
6518        let resp = svc
6519            .list_event_buses(&make_request("ListEventBuses", json!({})))
6520            .unwrap();
6521        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6522        let names: Vec<&str> = body["EventBuses"]
6523            .as_array()
6524            .unwrap()
6525            .iter()
6526            .map(|v| v["Name"].as_str().unwrap())
6527            .collect();
6528        assert!(names.contains(&"default"));
6529        assert!(names.contains(&"custom-bus"));
6530    }
6531
6532    // ── Tags ──
6533
6534    #[test]
6535    fn tag_list_untag_rule_resource() {
6536        let svc = make_service();
6537        svc.put_rule(&make_request(
6538            "PutRule",
6539            json!({"Name": "tagged-rule", "EventPattern": "{\"source\":[\"aws.s3\"]}"}),
6540        ))
6541        .unwrap();
6542
6543        let arn = "arn:aws:events:us-east-1:123456789012:rule/tagged-rule";
6544
6545        svc.tag_resource(&make_request(
6546            "TagResource",
6547            json!({"ResourceARN": arn, "Tags": [{"Key": "env", "Value": "prod"}]}),
6548        ))
6549        .unwrap();
6550
6551        let resp = svc
6552            .list_tags_for_resource(&make_request(
6553                "ListTagsForResource",
6554                json!({"ResourceARN": arn}),
6555            ))
6556            .unwrap();
6557        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6558        assert_eq!(body["Tags"].as_array().unwrap().len(), 1);
6559
6560        svc.untag_resource(&make_request(
6561            "UntagResource",
6562            json!({"ResourceARN": arn, "TagKeys": ["env"]}),
6563        ))
6564        .unwrap();
6565    }
6566
6567    // ── put_permission / remove_permission ──
6568
6569    #[test]
6570    fn put_permission_with_policy_json() {
6571        let svc = make_service();
6572        let policy = r#"{"Version":"2012-10-17","Statement":[]}"#;
6573        let req = make_request("PutPermission", json!({"Policy": policy}));
6574        svc.put_permission(&req).unwrap();
6575    }
6576
6577    #[test]
6578    fn put_permission_invalid_action_errors() {
6579        let svc = make_service();
6580        let req = make_request(
6581            "PutPermission",
6582            json!({
6583                "Action": "events:NotARealAction",
6584                "Principal": "123456789012",
6585                "StatementId": "s1"
6586            }),
6587        );
6588        assert!(svc.put_permission(&req).is_err());
6589    }
6590
6591    #[test]
6592    fn put_permission_unknown_bus_errors() {
6593        let svc = make_service();
6594        let req = make_request(
6595            "PutPermission",
6596            json!({
6597                "EventBusName": "missing",
6598                "Action": "events:PutEvents",
6599                "Principal": "123456789012",
6600                "StatementId": "s1"
6601            }),
6602        );
6603        assert!(svc.put_permission(&req).is_err());
6604    }
6605
6606    #[test]
6607    fn put_permission_add_and_remove_statement() {
6608        let svc = make_service();
6609        let req = make_request(
6610            "PutPermission",
6611            json!({
6612                "Action": "events:PutEvents",
6613                "Principal": "123456789012",
6614                "StatementId": "s1"
6615            }),
6616        );
6617        svc.put_permission(&req).unwrap();
6618
6619        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6620        svc.remove_permission(&req).unwrap();
6621    }
6622
6623    #[test]
6624    fn remove_permission_remove_all_flag() {
6625        let svc = make_service();
6626        let req = make_request(
6627            "PutPermission",
6628            json!({
6629                "Action": "events:PutEvents",
6630                "Principal": "123456789012",
6631                "StatementId": "s1"
6632            }),
6633        );
6634        svc.put_permission(&req).unwrap();
6635
6636        let req = make_request("RemovePermission", json!({"RemoveAllPermissions": true}));
6637        svc.remove_permission(&req).unwrap();
6638    }
6639
6640    #[test]
6641    fn remove_permission_unknown_bus_errors() {
6642        let svc = make_service();
6643        let req = make_request(
6644            "RemovePermission",
6645            json!({"EventBusName": "missing", "StatementId": "s1"}),
6646        );
6647        assert!(svc.remove_permission(&req).is_err());
6648    }
6649
6650    #[test]
6651    fn remove_permission_no_policy_errors() {
6652        let svc = make_service();
6653        let req = make_request("RemovePermission", json!({"StatementId": "s1"}));
6654        assert!(svc.remove_permission(&req).is_err());
6655    }
6656
6657    #[test]
6658    fn remove_permission_unknown_statement_errors() {
6659        let svc = make_service();
6660        svc.put_permission(&make_request(
6661            "PutPermission",
6662            json!({
6663                "Action": "events:PutEvents",
6664                "Principal": "123456789012",
6665                "StatementId": "s1"
6666            }),
6667        ))
6668        .unwrap();
6669
6670        let req = make_request("RemovePermission", json!({"StatementId": "ghost"}));
6671        assert!(svc.remove_permission(&req).is_err());
6672    }
6673
6674    // ── put_rule invalid schedule expression ──
6675
6676    #[test]
6677    fn put_rule_missing_name_errors() {
6678        let svc = make_service();
6679        let req = make_request("PutRule", json!({}));
6680        assert!(svc.put_rule(&req).is_err());
6681    }
6682
6683    #[test]
6684    fn put_rule_name_too_long_errors() {
6685        let svc = make_service();
6686        let name = "x".repeat(65);
6687        let req = make_request("PutRule", json!({"Name": name}));
6688        assert!(svc.put_rule(&req).is_err());
6689    }
6690
6691    #[test]
6692    fn put_rule_invalid_state_errors() {
6693        let svc = make_service();
6694        let req = make_request("PutRule", json!({"Name": "r1", "State": "BOGUS"}));
6695        assert!(svc.put_rule(&req).is_err());
6696    }
6697
6698    // ── create_connection variants ──
6699
6700    #[test]
6701    fn create_connection_api_key_auth() {
6702        let svc = make_service();
6703        let req = make_request(
6704            "CreateConnection",
6705            json!({
6706                "Name": "conn-apikey",
6707                "AuthorizationType": "API_KEY",
6708                "AuthParameters": {
6709                    "ApiKeyAuthParameters": {
6710                        "ApiKeyName": "X-Api-Key",
6711                        "ApiKeyValue": "secret"
6712                    }
6713                }
6714            }),
6715        );
6716        svc.create_connection(&req).unwrap();
6717    }
6718
6719    #[test]
6720    fn create_connection_basic_auth() {
6721        let svc = make_service();
6722        let req = make_request(
6723            "CreateConnection",
6724            json!({
6725                "Name": "conn-basic",
6726                "AuthorizationType": "BASIC",
6727                "AuthParameters": {
6728                    "BasicAuthParameters": {
6729                        "Username": "u",
6730                        "Password": "p"
6731                    }
6732                }
6733            }),
6734        );
6735        svc.create_connection(&req).unwrap();
6736    }
6737
6738    #[test]
6739    fn create_connection_missing_name_errors() {
6740        let svc = make_service();
6741        let req = make_request("CreateConnection", json!({"AuthorizationType": "API_KEY"}));
6742        assert!(svc.create_connection(&req).is_err());
6743    }
6744
6745    #[test]
6746    fn create_connection_missing_auth_type_errors() {
6747        let svc = make_service();
6748        let req = make_request("CreateConnection", json!({"Name": "c-noauth"}));
6749        assert!(svc.create_connection(&req).is_err());
6750    }
6751
6752    #[test]
6753    fn delete_connection_not_found() {
6754        let svc = make_service();
6755        let req = make_request("DeleteConnection", json!({"Name": "ghost"}));
6756        assert!(svc.delete_connection(&req).is_err());
6757    }
6758
6759    // ── api destination validation ──
6760
6761    #[test]
6762    fn create_api_destination_missing_name_errors() {
6763        let svc = make_service();
6764        let req = make_request(
6765            "CreateApiDestination",
6766            json!({
6767                "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/c",
6768                "InvocationEndpoint": "https://example.com",
6769                "HttpMethod": "POST"
6770            }),
6771        );
6772        assert!(svc.create_api_destination(&req).is_err());
6773    }
6774
6775    #[test]
6776    fn create_api_destination_invalid_method_errors() {
6777        let svc = make_service();
6778        create_connection(&svc, "conn-m");
6779        let guard = svc.state.read();
6780        let st = guard.default_ref();
6781        let conn_arn = st
6782            .connections
6783            .get("conn-m")
6784            .map(|c| c.arn.clone())
6785            .unwrap_or_default();
6786        drop(guard);
6787
6788        let req = make_request(
6789            "CreateApiDestination",
6790            json!({
6791                "Name": "d1",
6792                "ConnectionArn": conn_arn,
6793                "InvocationEndpoint": "https://example.com",
6794                "HttpMethod": "FLY"
6795            }),
6796        );
6797        assert!(svc.create_api_destination(&req).is_err());
6798    }
6799
6800    #[test]
6801    fn delete_api_destination_not_found() {
6802        let svc = make_service();
6803        let req = make_request("DeleteApiDestination", json!({"Name": "ghost"}));
6804        assert!(svc.delete_api_destination(&req).is_err());
6805    }
6806
6807    // ── archive error paths ──
6808
6809    #[test]
6810    fn create_archive_missing_name_errors() {
6811        let svc = make_service();
6812        let req = make_request(
6813            "CreateArchive",
6814            json!({"EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"}),
6815        );
6816        assert!(svc.create_archive(&req).is_err());
6817    }
6818
6819    #[test]
6820    fn create_archive_missing_source_arn_errors() {
6821        let svc = make_service();
6822        let req = make_request("CreateArchive", json!({"ArchiveName": "arc1"}));
6823        assert!(svc.create_archive(&req).is_err());
6824    }
6825
6826    #[test]
6827    fn delete_archive_missing_errors() {
6828        let svc = make_service();
6829        let req = make_request("DeleteArchive", json!({"ArchiveName": "ghost"}));
6830        assert!(svc.delete_archive(&req).is_err());
6831    }
6832
6833    // ── replay error paths ──
6834
6835    #[test]
6836    fn cancel_replay_not_found() {
6837        let svc = make_service();
6838        let req = make_request("CancelReplay", json!({"ReplayName": "ghost"}));
6839        assert!(svc.cancel_replay(&req).is_err());
6840    }
6841
6842    // ── put_events empty ──
6843
6844    #[test]
6845    fn put_events_empty_entries_errors() {
6846        let svc = make_service();
6847        let req = make_request("PutEvents", json!({"Entries": []}));
6848        assert!(svc.put_events(&req).is_err());
6849    }
6850
6851    #[test]
6852    fn put_events_success_count() {
6853        let svc = make_service();
6854        let req = make_request(
6855            "PutEvents",
6856            json!({
6857                "Entries": [
6858                    {"Source": "my.app", "DetailType": "Test", "Detail": "{}"}
6859                ]
6860            }),
6861        );
6862        let resp = svc.put_events(&req).unwrap();
6863        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6864        assert_eq!(body["FailedEntryCount"], 0);
6865        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
6866    }
6867
6868    // ── list_tags_for_resource on unknown ARN ──
6869
6870    #[test]
6871    fn list_tags_for_resource_unknown_errors() {
6872        let svc = make_service();
6873        let req = make_request(
6874            "ListTagsForResource",
6875            json!({
6876                "ResourceARN": "arn:aws:events:us-east-1:123456789012:rule/ghost"
6877            }),
6878        );
6879        assert!(svc.list_tags_for_resource(&req).is_err());
6880    }
6881
6882    // ── describe_rule with EventBusName ──
6883
6884    #[test]
6885    fn describe_rule_custom_bus() {
6886        let svc = make_service();
6887        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "cb"})))
6888            .unwrap();
6889
6890        svc.put_rule(&make_request(
6891            "PutRule",
6892            json!({
6893                "Name": "r-cb",
6894                "EventPattern": "{\"source\":[\"aws.s3\"]}",
6895                "EventBusName": "cb"
6896            }),
6897        ))
6898        .unwrap();
6899
6900        let resp = svc
6901            .describe_rule(&make_request(
6902                "DescribeRule",
6903                json!({"Name": "r-cb", "EventBusName": "cb"}),
6904            ))
6905            .unwrap();
6906        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6907        assert_eq!(body["Name"], "r-cb");
6908    }
6909
6910    // ── enable/disable rule on custom bus ──
6911
6912    #[test]
6913    fn disable_rule_on_custom_bus() {
6914        let svc = make_service();
6915        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "dcb"})))
6916            .unwrap();
6917        svc.put_rule(&make_request(
6918            "PutRule",
6919            json!({
6920                "Name": "r-d",
6921                "EventPattern": "{\"source\":[\"s\"]}",
6922                "EventBusName": "dcb"
6923            }),
6924        ))
6925        .unwrap();
6926        svc.disable_rule(&make_request(
6927            "DisableRule",
6928            json!({"Name": "r-d", "EventBusName": "dcb"}),
6929        ))
6930        .unwrap();
6931    }
6932
6933    // ── describe_event_bus with custom bus ──
6934
6935    #[test]
6936    fn describe_event_bus_custom() {
6937        let svc = make_service();
6938        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "deb"})))
6939            .unwrap();
6940        let resp = svc
6941            .describe_event_bus(&make_request("DescribeEventBus", json!({"Name": "deb"})))
6942            .unwrap();
6943        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6944        assert_eq!(body["Name"], "deb");
6945    }
6946
6947    #[test]
6948    fn list_event_buses_with_name_prefix() {
6949        let svc = make_service();
6950        for name in &["dev-x", "dev-y", "prod-z"] {
6951            svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": name})))
6952                .unwrap();
6953        }
6954        let resp = svc
6955            .list_event_buses(&make_request(
6956                "ListEventBuses",
6957                json!({"NamePrefix": "dev-"}),
6958            ))
6959            .unwrap();
6960        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6961        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
6962    }
6963
6964    #[test]
6965    fn list_rules_on_custom_bus() {
6966        let svc = make_service();
6967        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "lrcb"})))
6968            .unwrap();
6969        svc.put_rule(&make_request(
6970            "PutRule",
6971            json!({
6972                "Name": "r1",
6973                "EventPattern": "{\"source\":[\"s\"]}",
6974                "EventBusName": "lrcb"
6975            }),
6976        ))
6977        .unwrap();
6978
6979        let resp = svc
6980            .list_rules(&make_request("ListRules", json!({"EventBusName": "lrcb"})))
6981            .unwrap();
6982        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6983        assert_eq!(body["Rules"].as_array().unwrap().len(), 1);
6984    }
6985
6986    // ── put_targets on custom bus ──
6987
6988    #[test]
6989    fn put_targets_on_custom_bus() {
6990        let svc = make_service();
6991        svc.create_event_bus(&make_request("CreateEventBus", json!({"Name": "ptcb"})))
6992            .unwrap();
6993        svc.put_rule(&make_request(
6994            "PutRule",
6995            json!({
6996                "Name": "rt",
6997                "EventPattern": "{\"source\":[\"s\"]}",
6998                "EventBusName": "ptcb"
6999            }),
7000        ))
7001        .unwrap();
7002
7003        svc.put_targets(&make_request(
7004            "PutTargets",
7005            json!({
7006                "Rule": "rt",
7007                "EventBusName": "ptcb",
7008                "Targets": [{"Id": "t1", "Arn": "arn:aws:sqs:us-east-1:123456789012:q1"}]
7009            }),
7010        ))
7011        .unwrap();
7012    }
7013
7014    // ── remove_targets unknown target ids ──
7015
7016    #[test]
7017    fn remove_targets_unknown_ids_returns_failed() {
7018        let svc = make_service();
7019        svc.put_rule(&make_request(
7020            "PutRule",
7021            json!({"Name": "rmt", "EventPattern": "{\"source\":[\"s\"]}"}),
7022        ))
7023        .unwrap();
7024
7025        let resp = svc
7026            .remove_targets(&make_request(
7027                "RemoveTargets",
7028                json!({"Rule": "rmt", "Ids": ["ghost1", "ghost2"]}),
7029            ))
7030            .unwrap();
7031        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7032        // Unknown ids are silently ok in many implementations; at least we hit the code path
7033        assert!(body.is_object());
7034    }
7035
7036    #[test]
7037    fn describe_event_source_unknown_errors() {
7038        let svc = make_service();
7039        let req = make_request("DescribeEventSource", json!({"Name": "ghost"}));
7040        assert!(svc.describe_event_source(&req).is_err());
7041    }
7042
7043    #[test]
7044    fn describe_partner_event_source_unknown_errors() {
7045        let svc = make_service();
7046        let req = make_request("DescribePartnerEventSource", json!({"Name": "ghost"}));
7047        assert!(svc.describe_partner_event_source(&req).is_err());
7048    }
7049
7050    #[test]
7051    fn list_partner_event_sources_empty_ok() {
7052        let svc = make_service();
7053        let req = make_request(
7054            "ListPartnerEventSources",
7055            json!({"NamePrefix": "aws.partner"}),
7056        );
7057        let resp = svc.list_partner_event_sources(&req).unwrap();
7058        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7059        assert!(body["PartnerEventSources"].is_array());
7060    }
7061
7062    #[test]
7063    fn list_event_sources_empty_ok() {
7064        let svc = make_service();
7065        let req = make_request("ListEventSources", json!({}));
7066        let resp = svc.list_event_sources(&req).unwrap();
7067        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7068        assert!(body["EventSources"].is_array());
7069    }
7070
7071    #[test]
7072    fn update_connection_unknown_errors() {
7073        let svc = make_service();
7074        let req = make_request(
7075            "UpdateConnection",
7076            json!({"Name": "ghost", "AuthorizationType": "API_KEY"}),
7077        );
7078        assert!(svc.update_connection(&req).is_err());
7079    }
7080
7081    #[test]
7082    fn describe_api_destination_unknown_errors() {
7083        let svc = make_service();
7084        let req = make_request("DescribeApiDestination", json!({"Name": "ghost"}));
7085        assert!(svc.describe_api_destination(&req).is_err());
7086    }
7087
7088    #[test]
7089    fn update_api_destination_unknown_errors() {
7090        let svc = make_service();
7091        let req = make_request("UpdateApiDestination", json!({"Name": "ghost"}));
7092        assert!(svc.update_api_destination(&req).is_err());
7093    }
7094
7095    #[test]
7096    fn update_archive_unknown_errors() {
7097        let svc = make_service();
7098        let req = make_request("UpdateArchive", json!({"ArchiveName": "ghost"}));
7099        assert!(svc.update_archive(&req).is_err());
7100    }
7101
7102    #[test]
7103    fn describe_archive_unknown_errors_b() {
7104        let svc = make_service();
7105        let req = make_request("DescribeArchive", json!({"ArchiveName": "ghost"}));
7106        assert!(svc.describe_archive(&req).is_err());
7107    }
7108
7109    #[test]
7110    fn list_archives_empty_ok() {
7111        let svc = make_service();
7112        let req = make_request("ListArchives", json!({}));
7113        let resp = svc.list_archives(&req).unwrap();
7114        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7115        assert!(body["Archives"].is_array());
7116    }
7117
7118    #[test]
7119    fn list_replays_empty_ok() {
7120        let svc = make_service();
7121        let req = make_request("ListReplays", json!({}));
7122        let resp = svc.list_replays(&req).unwrap();
7123        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7124        assert!(body["Replays"].is_array());
7125    }
7126
7127    #[test]
7128    fn describe_endpoint_unknown_errors() {
7129        let svc = make_service();
7130        let req = make_request("DescribeEndpoint", json!({"Name": "ghost"}));
7131        assert!(svc.describe_endpoint(&req).is_err());
7132    }
7133
7134    #[test]
7135    fn delete_endpoint_unknown_errors() {
7136        let svc = make_service();
7137        let req = make_request("DeleteEndpoint", json!({"Name": "ghost"}));
7138        assert!(svc.delete_endpoint(&req).is_err());
7139    }
7140
7141    #[test]
7142    fn list_endpoints_empty_ok() {
7143        let svc = make_service();
7144        let req = make_request("ListEndpoints", json!({}));
7145        let resp = svc.list_endpoints(&req).unwrap();
7146        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
7147        assert!(body["Endpoints"].is_array());
7148    }
7149}