Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20    ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21    PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24pub struct EventBridgeService {
25    state: SharedEventBridgeState,
26    delivery: Arc<DeliveryBus>,
27    lambda_state: Option<SharedLambdaState>,
28    logs_state: Option<SharedLogsState>,
29    container_runtime: Option<Arc<ContainerRuntime>>,
30}
31
32impl EventBridgeService {
33    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
34        Self {
35            state,
36            delivery,
37            lambda_state: None,
38            logs_state: None,
39            container_runtime: None,
40        }
41    }
42
43    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
44        self.lambda_state = Some(lambda_state);
45        self
46    }
47
48    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
49        self.logs_state = Some(logs_state);
50        self
51    }
52
53    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
54        self.container_runtime = Some(runtime);
55        self
56    }
57}
58
59#[async_trait]
60impl AwsService for EventBridgeService {
61    fn service_name(&self) -> &str {
62        "events"
63    }
64
65    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
66        match req.action.as_str() {
67            "CreateEventBus" => self.create_event_bus(&req),
68            "DeleteEventBus" => self.delete_event_bus(&req),
69            "ListEventBuses" => self.list_event_buses(&req),
70            "DescribeEventBus" => self.describe_event_bus(&req),
71            "PutRule" => self.put_rule(&req),
72            "DeleteRule" => self.delete_rule(&req),
73            "ListRules" => self.list_rules(&req),
74            "DescribeRule" => self.describe_rule(&req),
75            "EnableRule" => self.enable_rule(&req),
76            "DisableRule" => self.disable_rule(&req),
77            "PutTargets" => self.put_targets(&req),
78            "RemoveTargets" => self.remove_targets(&req),
79            "ListTargetsByRule" => self.list_targets_by_rule(&req),
80            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
81            "PutEvents" => self.put_events(&req),
82            "PutPermission" => self.put_permission(&req),
83            "RemovePermission" => self.remove_permission(&req),
84            "TagResource" => self.tag_resource(&req),
85            "UntagResource" => self.untag_resource(&req),
86            "ListTagsForResource" => self.list_tags_for_resource(&req),
87            "CreateArchive" => self.create_archive(&req),
88            "DescribeArchive" => self.describe_archive(&req),
89            "ListArchives" => self.list_archives(&req),
90            "UpdateArchive" => self.update_archive(&req),
91            "DeleteArchive" => self.delete_archive(&req),
92            "CreateConnection" => self.create_connection(&req),
93            "DescribeConnection" => self.describe_connection(&req),
94            "ListConnections" => self.list_connections(&req),
95            "UpdateConnection" => self.update_connection(&req),
96            "DeleteConnection" => self.delete_connection(&req),
97            "CreateApiDestination" => self.create_api_destination(&req),
98            "DescribeApiDestination" => self.describe_api_destination(&req),
99            "ListApiDestinations" => self.list_api_destinations(&req),
100            "UpdateApiDestination" => self.update_api_destination(&req),
101            "DeleteApiDestination" => self.delete_api_destination(&req),
102            "StartReplay" => self.start_replay(&req),
103            "DescribeReplay" => self.describe_replay(&req),
104            "ListReplays" => self.list_replays(&req),
105            "CancelReplay" => self.cancel_replay(&req),
106            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
107            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
108            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
109            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
110            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
111            "ActivateEventSource" => self.activate_event_source(&req),
112            "DeactivateEventSource" => self.deactivate_event_source(&req),
113            "DescribeEventSource" => self.describe_event_source(&req),
114            "ListEventSources" => self.list_event_sources(&req),
115            "PutPartnerEvents" => self.put_partner_events(&req),
116            "TestEventPattern" => self.test_event_pattern(&req),
117            "UpdateEventBus" => self.update_event_bus(&req),
118            "CreateEndpoint" => self.create_endpoint(&req),
119            "DeleteEndpoint" => self.delete_endpoint(&req),
120            "DescribeEndpoint" => self.describe_endpoint(&req),
121            "ListEndpoints" => self.list_endpoints(&req),
122            "UpdateEndpoint" => self.update_endpoint(&req),
123            "DeauthorizeConnection" => self.deauthorize_connection(&req),
124            _ => Err(AwsServiceError::action_not_implemented(
125                "events",
126                &req.action,
127            )),
128        }
129    }
130
131    fn supported_actions(&self) -> &[&str] {
132        &[
133            "CreateEventBus",
134            "DeleteEventBus",
135            "ListEventBuses",
136            "DescribeEventBus",
137            "PutRule",
138            "DeleteRule",
139            "ListRules",
140            "DescribeRule",
141            "EnableRule",
142            "DisableRule",
143            "PutTargets",
144            "RemoveTargets",
145            "ListTargetsByRule",
146            "ListRuleNamesByTarget",
147            "PutEvents",
148            "PutPermission",
149            "RemovePermission",
150            "TagResource",
151            "UntagResource",
152            "ListTagsForResource",
153            "CreateArchive",
154            "DescribeArchive",
155            "ListArchives",
156            "UpdateArchive",
157            "DeleteArchive",
158            "CreateConnection",
159            "DescribeConnection",
160            "ListConnections",
161            "UpdateConnection",
162            "DeleteConnection",
163            "CreateApiDestination",
164            "DescribeApiDestination",
165            "ListApiDestinations",
166            "UpdateApiDestination",
167            "DeleteApiDestination",
168            "StartReplay",
169            "DescribeReplay",
170            "ListReplays",
171            "CancelReplay",
172            "CreatePartnerEventSource",
173            "DeletePartnerEventSource",
174            "DescribePartnerEventSource",
175            "ListPartnerEventSources",
176            "ListPartnerEventSourceAccounts",
177            "ActivateEventSource",
178            "DeactivateEventSource",
179            "DescribeEventSource",
180            "ListEventSources",
181            "PutPartnerEvents",
182            "TestEventPattern",
183            "UpdateEventBus",
184            "CreateEndpoint",
185            "DeleteEndpoint",
186            "DescribeEndpoint",
187            "ListEndpoints",
188            "UpdateEndpoint",
189            "DeauthorizeConnection",
190        ]
191    }
192}
193
194fn parse_tags(body: &Value) -> HashMap<String, String> {
195    let mut tags = HashMap::new();
196    if let Some(arr) = body["Tags"].as_array() {
197        for tag in arr {
198            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
199                tags.insert(key.to_string(), val.to_string());
200            }
201        }
202    }
203    tags
204}
205
206fn parse_target(target: &Value) -> EventTarget {
207    EventTarget {
208        id: target["Id"].as_str().unwrap_or("").to_string(),
209        arn: target["Arn"].as_str().unwrap_or("").to_string(),
210        input: target["Input"].as_str().map(|s| s.to_string()),
211        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
212        input_transformer: target.get("InputTransformer").cloned(),
213        sqs_parameters: target.get("SqsParameters").cloned(),
214    }
215}
216
217fn target_to_json(t: &EventTarget) -> Value {
218    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
219    if let Some(ref input) = t.input {
220        obj["Input"] = json!(input);
221    }
222    if let Some(ref input_path) = t.input_path {
223        obj["InputPath"] = json!(input_path);
224    }
225    if let Some(ref it) = t.input_transformer {
226        obj["InputTransformer"] = it.clone();
227    }
228    if let Some(ref sp) = t.sqs_parameters {
229        obj["SqsParameters"] = sp.clone();
230    }
231    obj
232}
233
234// ─── Event Bus Operations ───────────────────────────────────────────
235impl EventBridgeService {
236    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
237        let body = req.json_body();
238        validate_required("Name", &body["Name"])?;
239        let name = body["Name"]
240            .as_str()
241            .ok_or_else(|| missing("Name"))?
242            .to_string();
243        validate_string_length("name", &name, 1, 256)?;
244        validate_optional_string_length(
245            "eventSourceName",
246            body["EventSourceName"].as_str(),
247            1,
248            256,
249        )?;
250        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
251        validate_optional_string_length(
252            "kmsKeyIdentifier",
253            body["KmsKeyIdentifier"].as_str(),
254            0,
255            2048,
256        )?;
257
258        // Validate name doesn't contain '/' (unless partner bus)
259        if name.contains('/') && !name.starts_with("aws.partner/") {
260            return Err(AwsServiceError::aws_error(
261                StatusCode::BAD_REQUEST,
262                "ValidationException",
263                "Event bus name must not contain '/'.",
264            ));
265        }
266
267        // Partner event bus validation
268        if name.starts_with("aws.partner/") {
269            let event_source = body["EventSourceName"].as_str().unwrap_or("");
270            let state_r = self.state.read();
271            let has_source = state_r.partner_event_sources.contains_key(event_source);
272            drop(state_r);
273            if !has_source {
274                return Err(AwsServiceError::aws_error(
275                    StatusCode::BAD_REQUEST,
276                    "ResourceNotFoundException",
277                    format!("Event source {event_source} does not exist."),
278                ));
279            }
280        }
281
282        let mut state = self.state.write();
283
284        if state.buses.contains_key(&name) {
285            return Err(AwsServiceError::aws_error(
286                StatusCode::BAD_REQUEST,
287                "ResourceAlreadyExistsException",
288                format!("Event bus {name} already exists."),
289            ));
290        }
291
292        let arn = format!(
293            "arn:aws:events:{}:{}:event-bus/{}",
294            req.region, state.account_id, name
295        );
296        let now = Utc::now();
297        let description = body["Description"].as_str().map(|s| s.to_string());
298        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
299        let dead_letter_config = body.get("DeadLetterConfig").cloned();
300
301        let tags = parse_tags(&body);
302
303        let bus = EventBus {
304            name: name.clone(),
305            arn: arn.clone(),
306            tags,
307            policy: None,
308            description,
309            kms_key_identifier,
310            dead_letter_config,
311            creation_time: now,
312            last_modified_time: now,
313        };
314        state.buses.insert(name, bus);
315
316        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
317    }
318
319    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        validate_required("Name", &body["Name"])?;
322        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
323        validate_string_length("name", name, 1, 256)?;
324
325        if name == "default" {
326            return Err(AwsServiceError::aws_error(
327                StatusCode::BAD_REQUEST,
328                "ValidationException",
329                format!("Cannot delete event bus {name}."),
330            ));
331        }
332
333        let mut state = self.state.write();
334        state.buses.remove(name);
335        state.rules.retain(|k, _| k.0 != name);
336
337        Ok(AwsResponse::ok_json(json!({})))
338    }
339
340    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
341        let body = req.json_body();
342        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
343        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
344        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
345        let name_prefix = body["NamePrefix"].as_str();
346        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
347        if let Some(t) = body["NextToken"].as_str() {
348            t.parse::<usize>().map_err(|_| {
349                AwsServiceError::aws_error(
350                    StatusCode::BAD_REQUEST,
351                    "InvalidNextTokenException",
352                    format!("Invalid NextToken value: '{t}'"),
353                )
354            })?;
355        }
356
357        let state = self.state.read();
358        let filtered: Vec<&_> = state
359            .buses
360            .values()
361            .filter(|b| match name_prefix {
362                Some(prefix) => b.name.starts_with(prefix),
363                None => true,
364            })
365            .collect();
366
367        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
368        let buses: Vec<Value> = page
369            .iter()
370            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
371            .collect();
372        let mut resp = json!({ "EventBuses": buses });
373        if let Some(token) = next_token {
374            resp["NextToken"] = json!(token);
375        }
376
377        Ok(AwsResponse::ok_json(resp))
378    }
379
380    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381        let body = req.json_body();
382        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
383        let name = body["Name"].as_str().unwrap_or("default");
384
385        let state = self.state.read();
386        let bus = state.buses.get(name).ok_or_else(|| {
387            AwsServiceError::aws_error(
388                StatusCode::BAD_REQUEST,
389                "ResourceNotFoundException",
390                format!("Event bus {name} does not exist."),
391            )
392        })?;
393
394        let mut resp = json!({
395            "Name": bus.name,
396            "Arn": bus.arn,
397            "CreationTime": bus.creation_time.timestamp() as f64,
398            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
399        });
400
401        if let Some(ref policy) = bus.policy {
402            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
403        }
404        if let Some(ref desc) = bus.description {
405            resp["Description"] = json!(desc);
406        }
407        if let Some(ref kms) = bus.kms_key_identifier {
408            resp["KmsKeyIdentifier"] = json!(kms);
409        }
410        if let Some(ref dlc) = bus.dead_letter_config {
411            resp["DeadLetterConfig"] = dlc.clone();
412        }
413
414        Ok(AwsResponse::ok_json(resp))
415    }
416
417    // ─── Permission Operations ──────────────────────────────────────────
418
419    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420        let body = req.json_body();
421        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
422        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
423        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
424        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
425        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
426
427        let mut state = self.state.write();
428
429        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
430            AwsServiceError::aws_error(
431                StatusCode::BAD_REQUEST,
432                "ResourceNotFoundException",
433                format!("Event bus {event_bus_name} does not exist."),
434            )
435        })?;
436
437        // Check if Policy is provided (new-style)
438        if let Some(policy_str) = body["Policy"].as_str() {
439            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
440                bus.policy = Some(policy);
441                return Ok(AwsResponse::ok_json(json!({})));
442            }
443        }
444
445        // Old-style: Action, Principal, StatementId
446        let action = body["Action"].as_str().unwrap_or("");
447        let principal = body["Principal"].as_str().unwrap_or("");
448        let statement_id = body["StatementId"].as_str().unwrap_or("");
449
450        // Validate action
451        if action != "events:PutEvents" {
452            return Err(AwsServiceError::aws_error(
453                StatusCode::BAD_REQUEST,
454                "ValidationException",
455                "Provided value in parameter 'action' is not supported.",
456            ));
457        }
458
459        let statement = json!({
460            "Sid": statement_id,
461            "Effect": "Allow",
462            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
463            "Action": action,
464            "Resource": bus.arn,
465        });
466
467        let policy = bus.policy.get_or_insert_with(|| {
468            json!({
469                "Version": "2012-10-17",
470                "Statement": [],
471            })
472        });
473
474        if let Some(stmts) = policy["Statement"].as_array_mut() {
475            stmts.push(statement);
476        }
477
478        Ok(AwsResponse::ok_json(json!({})))
479    }
480
481    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482        let body = req.json_body();
483        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
484        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
485        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
486        let statement_id = body["StatementId"].as_str().unwrap_or("");
487        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
488
489        let mut state = self.state.write();
490
491        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
492            AwsServiceError::aws_error(
493                StatusCode::BAD_REQUEST,
494                "ResourceNotFoundException",
495                format!("Event bus {event_bus_name} does not exist."),
496            )
497        })?;
498
499        if remove_all {
500            bus.policy = None;
501            return Ok(AwsResponse::ok_json(json!({})));
502        }
503
504        let policy = bus.policy.as_mut().ok_or_else(|| {
505            AwsServiceError::aws_error(
506                StatusCode::BAD_REQUEST,
507                "ResourceNotFoundException",
508                "EventBus does not have a policy.",
509            )
510        })?;
511
512        if let Some(stmts) = policy["Statement"].as_array_mut() {
513            let before = stmts.len();
514            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
515            if stmts.len() == before {
516                return Err(AwsServiceError::aws_error(
517                    StatusCode::BAD_REQUEST,
518                    "ResourceNotFoundException",
519                    "Statement with the provided id does not exist.",
520                ));
521            }
522            if stmts.is_empty() {
523                bus.policy = None;
524            }
525        }
526
527        Ok(AwsResponse::ok_json(json!({})))
528    }
529
530    // ─── Rule Operations ────────────────────────────────────────────────
531
532    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
533        let body = req.json_body();
534        validate_required("Name", &body["Name"])?;
535        let name = body["Name"]
536            .as_str()
537            .ok_or_else(|| missing("Name"))?
538            .to_string();
539        validate_string_length("name", &name, 1, 64)?;
540        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
541        validate_optional_string_length(
542            "scheduleExpression",
543            body["ScheduleExpression"].as_str(),
544            0,
545            256,
546        )?;
547        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
548        validate_optional_enum(
549            "state",
550            body["State"].as_str(),
551            &[
552                "ENABLED",
553                "DISABLED",
554                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
555            ],
556        )?;
557        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
558        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
559
560        let raw_bus = body["EventBusName"]
561            .as_str()
562            .unwrap_or("default")
563            .to_string();
564
565        let mut state = self.state.write();
566        let event_bus_name = state.resolve_bus_name(&raw_bus);
567
568        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
569            if s.is_empty() {
570                None
571            } else {
572                Some(s.to_string())
573            }
574        });
575        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
576            if s.is_empty() {
577                None
578            } else {
579                Some(s.to_string())
580            }
581        });
582        let description = body["Description"].as_str().map(|s| s.to_string());
583        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
584        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
585
586        // Validate: schedule expressions only on default bus
587        if schedule_expression.is_some() && event_bus_name != "default" {
588            return Err(AwsServiceError::aws_error(
589                StatusCode::BAD_REQUEST,
590                "ValidationException",
591                "ScheduleExpression is supported only on the default event bus.",
592            ));
593        }
594
595        if !state.buses.contains_key(&event_bus_name) {
596            return Err(AwsServiceError::aws_error(
597                StatusCode::BAD_REQUEST,
598                "ResourceNotFoundException",
599                format!("Event bus {event_bus_name} does not exist."),
600            ));
601        }
602
603        let arn = if event_bus_name == "default" {
604            format!(
605                "arn:aws:events:{}:{}:rule/{}",
606                req.region, state.account_id, name
607            )
608        } else {
609            format!(
610                "arn:aws:events:{}:{}:rule/{}/{}",
611                req.region, state.account_id, event_bus_name, name
612            )
613        };
614
615        let key = (event_bus_name.clone(), name.clone());
616        let targets = state
617            .rules
618            .get(&key)
619            .map(|r| r.targets.clone())
620            .unwrap_or_default();
621
622        let tags = parse_tags(&body);
623
624        let rule = EventRule {
625            name: name.clone(),
626            arn: arn.clone(),
627            event_bus_name,
628            event_pattern,
629            schedule_expression,
630            state: rule_state,
631            description,
632            role_arn,
633            managed_by: None,
634            created_by: None,
635            targets,
636            tags,
637            last_fired: None,
638        };
639
640        state.rules.insert(key, rule);
641        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
642    }
643
644    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645        let body = req.json_body();
646        validate_required("Name", &body["Name"])?;
647        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
648        validate_string_length("name", name, 1, 64)?;
649        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
650        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
651
652        let mut state = self.state.write();
653        let bus_name = state.resolve_bus_name(event_bus_name);
654        let key = (bus_name, name.to_string());
655
656        // Check if rule has targets
657        if let Some(rule) = state.rules.get(&key) {
658            if !rule.targets.is_empty() {
659                return Err(AwsServiceError::aws_error(
660                    StatusCode::BAD_REQUEST,
661                    "ValidationException",
662                    "Rule can't be deleted since it has targets.",
663                ));
664            }
665        }
666
667        state.rules.remove(&key);
668        Ok(AwsResponse::ok_json(json!({})))
669    }
670
671    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
672        let body = req.json_body();
673        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
674        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
675        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
676        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
677        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
678        let name_prefix = body["NamePrefix"].as_str();
679        let limit = body["Limit"].as_u64().map(|n| n as usize);
680        let next_token = body["NextToken"].as_str();
681
682        let state = self.state.read();
683        let bus_name = state.resolve_bus_name(event_bus_name);
684
685        let mut rules: Vec<&EventRule> = state
686            .rules
687            .values()
688            .filter(|r| r.event_bus_name == bus_name)
689            .filter(|r| match name_prefix {
690                Some(prefix) => r.name.starts_with(prefix),
691                None => true,
692            })
693            .collect();
694        rules.sort_by(|a, b| a.name.cmp(&b.name));
695
696        // Pagination
697        let start = next_token
698            .and_then(|t| t.parse::<usize>().ok())
699            .unwrap_or(0)
700            .min(rules.len());
701        let rules_slice = &rules[start..];
702
703        let (page, new_next_token) = if let Some(lim) = limit {
704            if rules_slice.len() > lim {
705                (&rules_slice[..lim], Some((start + lim).to_string()))
706            } else {
707                (rules_slice, None)
708            }
709        } else {
710            (rules_slice, None)
711        };
712
713        let rules_json: Vec<Value> = page
714            .iter()
715            .map(|r| {
716                let mut obj = json!({
717                    "Name": r.name,
718                    "Arn": r.arn,
719                    "EventBusName": r.event_bus_name,
720                    "State": r.state,
721                });
722                if let Some(ref desc) = r.description {
723                    obj["Description"] = json!(desc);
724                }
725                if let Some(ref ep) = r.event_pattern {
726                    obj["EventPattern"] = json!(ep);
727                }
728                if let Some(ref se) = r.schedule_expression {
729                    obj["ScheduleExpression"] = json!(se);
730                }
731                if let Some(ref mb) = r.managed_by {
732                    obj["ManagedBy"] = json!(mb);
733                }
734                obj
735            })
736            .collect();
737
738        let mut resp = json!({ "Rules": rules_json });
739        if let Some(token) = new_next_token {
740            resp["NextToken"] = json!(token);
741        }
742
743        Ok(AwsResponse::ok_json(resp))
744    }
745
746    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747        let body = req.json_body();
748        validate_required("Name", &body["Name"])?;
749        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
750        validate_string_length("name", name, 1, 64)?;
751        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
752        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
753
754        let state = self.state.read();
755        let bus_name = state.resolve_bus_name(event_bus_name);
756        let key = (bus_name.clone(), name.to_string());
757
758        let rule = state.rules.get(&key).ok_or_else(|| {
759            AwsServiceError::aws_error(
760                StatusCode::BAD_REQUEST,
761                "ResourceNotFoundException",
762                format!("Rule {name} does not exist."),
763            )
764        })?;
765
766        let mut resp = json!({
767            "Name": rule.name,
768            "Arn": rule.arn,
769            "EventBusName": rule.event_bus_name,
770            "State": rule.state,
771        });
772
773        if let Some(ref desc) = rule.description {
774            resp["Description"] = json!(desc);
775        }
776        if let Some(ref ep) = rule.event_pattern {
777            resp["EventPattern"] = json!(ep);
778        }
779        if let Some(ref se) = rule.schedule_expression {
780            resp["ScheduleExpression"] = json!(se);
781        }
782        if let Some(ref role) = rule.role_arn {
783            resp["RoleArn"] = json!(role);
784        }
785        if let Some(ref mb) = rule.managed_by {
786            resp["ManagedBy"] = json!(mb);
787        }
788        if let Some(ref cb) = rule.created_by {
789            resp["CreatedBy"] = json!(cb);
790        }
791        // If non-default bus, set CreatedBy to account_id
792        if rule.event_bus_name != "default" && rule.created_by.is_none() {
793            resp["CreatedBy"] = json!(state.account_id);
794        }
795
796        Ok(AwsResponse::ok_json(resp))
797    }
798
799    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
800        let body = req.json_body();
801        validate_required("Name", &body["Name"])?;
802        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
803        validate_string_length("name", name, 1, 64)?;
804        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
805        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
806
807        let mut state = self.state.write();
808        let bus_name = state.resolve_bus_name(event_bus_name);
809        let key = (bus_name, name.to_string());
810
811        let rule = state.rules.get_mut(&key).ok_or_else(|| {
812            AwsServiceError::aws_error(
813                StatusCode::BAD_REQUEST,
814                "ResourceNotFoundException",
815                format!("Rule {name} does not exist."),
816            )
817        })?;
818
819        rule.state = "ENABLED".to_string();
820        Ok(AwsResponse::ok_json(json!({})))
821    }
822
823    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824        let body = req.json_body();
825        validate_required("Name", &body["Name"])?;
826        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
827        validate_string_length("name", name, 1, 64)?;
828        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
829        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
830
831        let mut state = self.state.write();
832        let bus_name = state.resolve_bus_name(event_bus_name);
833        let key = (bus_name, name.to_string());
834
835        let rule = state.rules.get_mut(&key).ok_or_else(|| {
836            AwsServiceError::aws_error(
837                StatusCode::BAD_REQUEST,
838                "ResourceNotFoundException",
839                format!("Rule {name} does not exist."),
840            )
841        })?;
842
843        rule.state = "DISABLED".to_string();
844        Ok(AwsResponse::ok_json(json!({})))
845    }
846
847    // ─── Target Operations ──────────────────────────────────────────────
848
849    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850        let body = req.json_body();
851        validate_required("Rule", &body["Rule"])?;
852        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
853        validate_string_length("rule", rule_name, 1, 64)?;
854        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
855        validate_required("Targets", &body["Targets"])?;
856        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
857        let targets = body["Targets"]
858            .as_array()
859            .ok_or_else(|| missing("Targets"))?;
860
861        // Validate targets - check for FIFO SQS without SqsParameters
862        for target in targets {
863            let target_id = target["Id"].as_str().unwrap_or("");
864            let target_arn = target["Arn"].as_str().unwrap_or("");
865
866            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
867                return Err(AwsServiceError::aws_error(
868                    StatusCode::BAD_REQUEST,
869                    "ValidationException",
870                    format!(
871                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
872                    ),
873                ));
874            }
875
876            // Validate ARN format
877            if !target_arn.starts_with("arn:") {
878                return Err(AwsServiceError::aws_error(
879                    StatusCode::BAD_REQUEST,
880                    "ValidationException",
881                    format!(
882                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
883                    ),
884                ));
885            }
886        }
887
888        let mut state = self.state.write();
889        let bus_name = state.resolve_bus_name(event_bus_name);
890        let key = (bus_name.clone(), rule_name.to_string());
891
892        let rule = state.rules.get_mut(&key).ok_or_else(|| {
893            AwsServiceError::aws_error(
894                StatusCode::BAD_REQUEST,
895                "ResourceNotFoundException",
896                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
897            )
898        })?;
899
900        for target in targets {
901            let et = parse_target(target);
902            // Remove existing target with same ID
903            rule.targets.retain(|t| t.id != et.id);
904            rule.targets.push(et);
905        }
906
907        Ok(AwsResponse::ok_json(json!({
908            "FailedEntryCount": 0,
909            "FailedEntries": [],
910        })))
911    }
912
913    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
914        let body = req.json_body();
915        validate_required("Rule", &body["Rule"])?;
916        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
917        validate_string_length("rule", rule_name, 1, 64)?;
918        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
919        validate_required("Ids", &body["Ids"])?;
920        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
921        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
922
923        let target_ids: Vec<String> = ids
924            .iter()
925            .filter_map(|v| v.as_str().map(|s| s.to_string()))
926            .collect();
927
928        let mut state = self.state.write();
929        let bus_name = state.resolve_bus_name(event_bus_name);
930        let key = (bus_name.clone(), rule_name.to_string());
931
932        let rule = state.rules.get_mut(&key).ok_or_else(|| {
933            AwsServiceError::aws_error(
934                StatusCode::BAD_REQUEST,
935                "ResourceNotFoundException",
936                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
937            )
938        })?;
939
940        rule.targets.retain(|t| !target_ids.contains(&t.id));
941
942        Ok(AwsResponse::ok_json(json!({
943            "FailedEntryCount": 0,
944            "FailedEntries": [],
945        })))
946    }
947
948    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
949        let body = req.json_body();
950        validate_required("Rule", &body["Rule"])?;
951        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
952        validate_string_length("rule", rule_name, 1, 64)?;
953        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
954        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
955        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
956        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
957        let limit = body["Limit"].as_u64().map(|n| n as usize);
958        let next_token = body["NextToken"].as_str();
959
960        let state = self.state.read();
961        let bus_name = state.resolve_bus_name(event_bus_name);
962        let key = (bus_name, rule_name.to_string());
963
964        let rule = state.rules.get(&key).ok_or_else(|| {
965            AwsServiceError::aws_error(
966                StatusCode::BAD_REQUEST,
967                "ResourceNotFoundException",
968                format!("Rule {rule_name} does not exist."),
969            )
970        })?;
971
972        let all_targets = &rule.targets;
973        let start = next_token
974            .and_then(|t| t.parse::<usize>().ok())
975            .unwrap_or(0)
976            .min(all_targets.len());
977        let slice = &all_targets[start..];
978
979        let (page, new_next_token) = if let Some(lim) = limit {
980            if slice.len() > lim {
981                (&slice[..lim], Some((start + lim).to_string()))
982            } else {
983                (slice, None)
984            }
985        } else {
986            (slice, None)
987        };
988
989        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
990
991        let mut resp = json!({ "Targets": targets });
992        if let Some(token) = new_next_token {
993            resp["NextToken"] = json!(token);
994        }
995
996        Ok(AwsResponse::ok_json(resp))
997    }
998
999    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1000        let body = req.json_body();
1001        validate_required("TargetArn", &body["TargetArn"])?;
1002        let target_arn = body["TargetArn"]
1003            .as_str()
1004            .ok_or_else(|| missing("TargetArn"))?;
1005        validate_string_length("targetArn", target_arn, 1, 1600)?;
1006        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1007        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1008        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1009        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1010        let limit = body["Limit"].as_u64().map(|n| n as usize);
1011        let next_token = body["NextToken"].as_str();
1012
1013        let state = self.state.read();
1014        let bus_name = state.resolve_bus_name(event_bus_name);
1015
1016        // Deduplicate rule names
1017        let mut rule_names: Vec<String> = Vec::new();
1018        for rule in state.rules.values() {
1019            if rule.event_bus_name == bus_name
1020                && rule.targets.iter().any(|t| t.arn == target_arn)
1021                && !rule_names.contains(&rule.name)
1022            {
1023                rule_names.push(rule.name.clone());
1024            }
1025        }
1026        rule_names.sort();
1027
1028        let start = next_token
1029            .and_then(|t| t.parse::<usize>().ok())
1030            .unwrap_or(0)
1031            .min(rule_names.len());
1032        let slice = &rule_names[start..];
1033
1034        let (page, new_next_token) = if let Some(lim) = limit {
1035            if slice.len() > lim {
1036                (&slice[..lim], Some((start + lim).to_string()))
1037            } else {
1038                (slice, None)
1039            }
1040        } else {
1041            (slice, None)
1042        };
1043
1044        let mut resp = json!({ "RuleNames": page });
1045        if let Some(token) = new_next_token {
1046            resp["NextToken"] = json!(token);
1047        }
1048
1049        Ok(AwsResponse::ok_json(resp))
1050    }
1051
1052    // ─── Partner Event Sources ────────────���───────────────────────────
1053
1054    fn create_partner_event_source(
1055        &self,
1056        req: &AwsRequest,
1057    ) -> Result<AwsResponse, AwsServiceError> {
1058        let body = req.json_body();
1059        validate_required("Name", &body["Name"])?;
1060        let name = body["Name"]
1061            .as_str()
1062            .ok_or_else(|| missing("Name"))?
1063            .to_string();
1064        validate_string_length("name", &name, 1, 256)?;
1065        validate_required("Account", &body["Account"])?;
1066        let account = body["Account"]
1067            .as_str()
1068            .ok_or_else(|| missing("Account"))?
1069            .to_string();
1070        validate_string_length("account", &account, 12, 12)?;
1071
1072        let mut state = self.state.write();
1073        if state.partner_event_sources.contains_key(&name) {
1074            return Err(AwsServiceError::aws_error(
1075                StatusCode::CONFLICT,
1076                "ResourceAlreadyExistsException",
1077                format!("Partner event source {name} already exists."),
1078            ));
1079        }
1080        let arn = format!(
1081            "arn:aws:events:{}::event-source/aws.partner/{}",
1082            state.region, name
1083        );
1084        let now = Utc::now();
1085        let ps = PartnerEventSource {
1086            name: name.clone(),
1087            arn: arn.clone(),
1088            account,
1089            creation_time: now,
1090            expiration_time: None,
1091            state: "ACTIVE".to_string(),
1092        };
1093        state.partner_event_sources.insert(name.clone(), ps);
1094
1095        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1096    }
1097
1098    fn delete_partner_event_source(
1099        &self,
1100        req: &AwsRequest,
1101    ) -> Result<AwsResponse, AwsServiceError> {
1102        let body = req.json_body();
1103        validate_required("Name", &body["Name"])?;
1104        let name = body["Name"]
1105            .as_str()
1106            .ok_or_else(|| missing("Name"))?
1107            .to_string();
1108        validate_required("Account", &body["Account"])?;
1109        let account = body["Account"]
1110            .as_str()
1111            .ok_or_else(|| missing("Account"))?
1112            .to_string();
1113
1114        let mut state = self.state.write();
1115        match state.partner_event_sources.get(&name) {
1116            Some(ps) if ps.account == account => {
1117                state.partner_event_sources.remove(&name);
1118            }
1119            Some(_) => {
1120                return Err(AwsServiceError::aws_error(
1121                    StatusCode::NOT_FOUND,
1122                    "ResourceNotFoundException",
1123                    format!("Partner event source {name} does not exist for account {account}."),
1124                ));
1125            }
1126            None => {
1127                return Err(AwsServiceError::aws_error(
1128                    StatusCode::NOT_FOUND,
1129                    "ResourceNotFoundException",
1130                    format!("Partner event source {name} does not exist."),
1131                ));
1132            }
1133        }
1134
1135        Ok(AwsResponse::ok_json(json!({})))
1136    }
1137
1138    fn describe_partner_event_source(
1139        &self,
1140        req: &AwsRequest,
1141    ) -> Result<AwsResponse, AwsServiceError> {
1142        let body = req.json_body();
1143        validate_required("Name", &body["Name"])?;
1144        let name = body["Name"]
1145            .as_str()
1146            .ok_or_else(|| missing("Name"))?
1147            .to_string();
1148        validate_string_length("name", &name, 1, 256)?;
1149
1150        let state = self.state.read();
1151        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1152            AwsServiceError::aws_error(
1153                StatusCode::NOT_FOUND,
1154                "ResourceNotFoundException",
1155                format!("Partner event source {name} does not exist."),
1156            )
1157        })?;
1158
1159        Ok(AwsResponse::ok_json(json!({
1160            "Arn": ps.arn,
1161            "Name": ps.name,
1162        })))
1163    }
1164
1165    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166        let body = req.json_body();
1167        validate_required("namePrefix", &body["NamePrefix"])?;
1168        let name_prefix = body["NamePrefix"]
1169            .as_str()
1170            .ok_or_else(|| missing("NamePrefix"))?;
1171        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1172        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1173        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1174        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1175
1176        let state = self.state.read();
1177        let all: Vec<Value> = state
1178            .partner_event_sources
1179            .values()
1180            .filter(|ps| ps.name.starts_with(name_prefix))
1181            .map(|ps| {
1182                json!({
1183                    "Arn": ps.arn,
1184                    "Name": ps.name,
1185                })
1186            })
1187            .collect();
1188
1189        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1190        let mut resp = json!({ "PartnerEventSources": sources });
1191        if let Some(token) = next_token {
1192            resp["NextToken"] = json!(token);
1193        }
1194
1195        Ok(AwsResponse::ok_json(resp))
1196    }
1197
1198    fn list_partner_event_source_accounts(
1199        &self,
1200        req: &AwsRequest,
1201    ) -> Result<AwsResponse, AwsServiceError> {
1202        let body = req.json_body();
1203        validate_required("EventSourceName", &body["EventSourceName"])?;
1204        let event_source_name = body["EventSourceName"]
1205            .as_str()
1206            .ok_or_else(|| missing("EventSourceName"))?;
1207        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1208        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1209        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1210
1211        let state = self.state.read();
1212        let accounts: Vec<Value> = state
1213            .partner_event_sources
1214            .values()
1215            .filter(|ps| ps.name == event_source_name)
1216            .map(|ps| json!({ "Account": ps.account }))
1217            .collect();
1218
1219        Ok(AwsResponse::ok_json(json!({
1220            "PartnerEventSourceAccounts": accounts
1221        })))
1222    }
1223
1224    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1225        let body = req.json_body();
1226        validate_required("Name", &body["Name"])?;
1227        let name = body["Name"]
1228            .as_str()
1229            .ok_or_else(|| missing("Name"))?
1230            .to_string();
1231
1232        let mut state = self.state.write();
1233        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1234            AwsServiceError::aws_error(
1235                StatusCode::NOT_FOUND,
1236                "ResourceNotFoundException",
1237                format!("Event source {name} does not exist."),
1238            )
1239        })?;
1240        ps.state = "ACTIVE".to_string();
1241
1242        Ok(AwsResponse::ok_json(json!({})))
1243    }
1244
1245    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1246        let body = req.json_body();
1247        validate_required("Name", &body["Name"])?;
1248        let name = body["Name"]
1249            .as_str()
1250            .ok_or_else(|| missing("Name"))?
1251            .to_string();
1252
1253        let mut state = self.state.write();
1254        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1255            AwsServiceError::aws_error(
1256                StatusCode::NOT_FOUND,
1257                "ResourceNotFoundException",
1258                format!("Event source {name} does not exist."),
1259            )
1260        })?;
1261        ps.state = "INACTIVE".to_string();
1262
1263        Ok(AwsResponse::ok_json(json!({})))
1264    }
1265
1266    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1267        let body = req.json_body();
1268        validate_required("Name", &body["Name"])?;
1269        let name = body["Name"]
1270            .as_str()
1271            .ok_or_else(|| missing("Name"))?
1272            .to_string();
1273
1274        let state = self.state.read();
1275        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1276            AwsServiceError::aws_error(
1277                StatusCode::NOT_FOUND,
1278                "ResourceNotFoundException",
1279                format!("Event source {name} does not exist."),
1280            )
1281        })?;
1282
1283        Ok(AwsResponse::ok_json(json!({
1284            "Arn": ps.arn,
1285            "Name": ps.name,
1286            "CreatedBy": ps.account,
1287            "CreationTime": ps.creation_time.timestamp() as f64,
1288            "State": ps.state,
1289        })))
1290    }
1291
1292    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1293        let body = req.json_body();
1294        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1295        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1296        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1297        let name_prefix = body["NamePrefix"].as_str();
1298        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1299
1300        let state = self.state.read();
1301        let all: Vec<Value> = state
1302            .partner_event_sources
1303            .values()
1304            .filter(|ps| match name_prefix {
1305                Some(prefix) => ps.name.starts_with(prefix),
1306                None => true,
1307            })
1308            .map(|ps| {
1309                json!({
1310                    "Arn": ps.arn,
1311                    "Name": ps.name,
1312                    "CreatedBy": ps.account,
1313                    "CreationTime": ps.creation_time.timestamp() as f64,
1314                    "State": ps.state,
1315                })
1316            })
1317            .collect();
1318
1319        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1320        let mut resp = json!({ "EventSources": sources });
1321        if let Some(token) = next_token {
1322            resp["NextToken"] = json!(token);
1323        }
1324
1325        Ok(AwsResponse::ok_json(resp))
1326    }
1327
1328    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1329        let body = req.json_body();
1330        validate_required("Entries", &body["Entries"])?;
1331        let entries = body["Entries"]
1332            .as_array()
1333            .ok_or_else(|| missing("Entries"))?;
1334
1335        let mut result_entries = Vec::new();
1336        for _entry in entries {
1337            let event_id = uuid::Uuid::new_v4().to_string();
1338            result_entries.push(json!({ "EventId": event_id }));
1339        }
1340
1341        Ok(AwsResponse::ok_json(json!({
1342            "FailedEntryCount": 0,
1343            "Entries": result_entries,
1344        })))
1345    }
1346
1347    // ─── TestEventPattern ────────────────────────────────────────────────
1348
1349    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1350        let body = req.json_body();
1351        validate_required("EventPattern", &body["EventPattern"])?;
1352        validate_required("Event", &body["Event"])?;
1353        let event_pattern = body["EventPattern"]
1354            .as_str()
1355            .ok_or_else(|| missing("EventPattern"))?;
1356        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1357
1358        // Parse the event JSON
1359        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1360            AwsServiceError::aws_error(
1361                StatusCode::BAD_REQUEST,
1362                "InvalidEventPatternException",
1363                "Event is not valid JSON.",
1364            )
1365        })?;
1366
1367        // Parse the pattern JSON
1368        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1369            AwsServiceError::aws_error(
1370                StatusCode::BAD_REQUEST,
1371                "InvalidEventPatternException",
1372                "Event pattern is not valid JSON.",
1373            )
1374        })?;
1375
1376        let source = event["source"].as_str().unwrap_or("");
1377        let detail_type = event["detail-type"].as_str().unwrap_or("");
1378        let detail = event
1379            .get("detail")
1380            .map(|v| serde_json::to_string(v).unwrap_or_default())
1381            .unwrap_or_else(|| "{}".to_string());
1382        let account = event["account"].as_str().unwrap_or("");
1383        let region = event["region"].as_str().unwrap_or("");
1384        let resources: Vec<String> = event["resources"]
1385            .as_array()
1386            .map(|arr| {
1387                arr.iter()
1388                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1389                    .collect()
1390            })
1391            .unwrap_or_default();
1392
1393        let result = matches_pattern(
1394            Some(event_pattern),
1395            source,
1396            detail_type,
1397            &detail,
1398            account,
1399            region,
1400            &resources,
1401        );
1402
1403        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1404    }
1405
1406    // ─── UpdateEventBus ─────────────────────────────────────────────────
1407
1408    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1409        let body = req.json_body();
1410        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1411        validate_optional_string_length(
1412            "kmsKeyIdentifier",
1413            body["KmsKeyIdentifier"].as_str(),
1414            0,
1415            2048,
1416        )?;
1417        let name = body["Name"].as_str().unwrap_or("default");
1418
1419        let mut state = self.state.write();
1420        let bus = state.buses.get_mut(name).ok_or_else(|| {
1421            AwsServiceError::aws_error(
1422                StatusCode::BAD_REQUEST,
1423                "ResourceNotFoundException",
1424                format!("Event bus {name} does not exist."),
1425            )
1426        })?;
1427
1428        if let Some(desc) = body["Description"].as_str() {
1429            bus.description = Some(desc.to_string());
1430        }
1431        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1432            bus.kms_key_identifier = Some(kms.to_string());
1433        }
1434        if let Some(dlc) = body.get("DeadLetterConfig") {
1435            bus.dead_letter_config = Some(dlc.clone());
1436        }
1437        bus.last_modified_time = Utc::now();
1438
1439        let arn = bus.arn.clone();
1440        let bus_name = bus.name.clone();
1441
1442        Ok(AwsResponse::ok_json(json!({
1443            "Arn": arn,
1444            "Name": bus_name,
1445        })))
1446    }
1447
1448    // ─── Endpoint Operations ────────────────────────────────────────────
1449
1450    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451        let body = req.json_body();
1452        validate_required("Name", &body["Name"])?;
1453        let name = body["Name"]
1454            .as_str()
1455            .ok_or_else(|| missing("Name"))?
1456            .to_string();
1457        validate_string_length("name", &name, 1, 64)?;
1458        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1459        validate_required("EventBuses", &body["EventBuses"])?;
1460
1461        let description = body["Description"].as_str().map(|s| s.to_string());
1462        let routing_config = body["RoutingConfig"].clone();
1463        let replication_config = body.get("ReplicationConfig").cloned();
1464        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1465        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1466
1467        let mut state = self.state.write();
1468        if state.endpoints.contains_key(&name) {
1469            return Err(AwsServiceError::aws_error(
1470                StatusCode::CONFLICT,
1471                "ResourceAlreadyExistsException",
1472                format!("Endpoint {name} already exists."),
1473            ));
1474        }
1475
1476        let endpoint_id = format!("{}.abc123", name);
1477        let arn = format!(
1478            "arn:aws:events:{}:{}:endpoint/{}",
1479            req.region, state.account_id, name
1480        );
1481        let endpoint_url = format!(
1482            "https://{}.endpoint.events.{}.amazonaws.com",
1483            endpoint_id, req.region
1484        );
1485        let now = Utc::now();
1486
1487        let endpoint = Endpoint {
1488            name: name.clone(),
1489            arn: arn.clone(),
1490            endpoint_id: endpoint_id.clone(),
1491            endpoint_url: Some(endpoint_url),
1492            description,
1493            routing_config: routing_config.clone(),
1494            replication_config: replication_config.clone(),
1495            event_buses: event_buses.clone(),
1496            role_arn: role_arn.clone(),
1497            state: "ACTIVE".to_string(),
1498            creation_time: now,
1499            last_modified_time: now,
1500        };
1501        state.endpoints.insert(name.clone(), endpoint);
1502
1503        let mut resp = json!({
1504            "Name": name,
1505            "Arn": arn,
1506            "State": "ACTIVE",
1507            "RoutingConfig": routing_config,
1508            "EventBuses": event_buses,
1509        });
1510        if let Some(ref rc) = replication_config {
1511            resp["ReplicationConfig"] = rc.clone();
1512        }
1513        if let Some(ref ra) = role_arn {
1514            resp["RoleArn"] = json!(ra);
1515        }
1516
1517        Ok(AwsResponse::ok_json(resp))
1518    }
1519
1520    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1521        let body = req.json_body();
1522        validate_required("Name", &body["Name"])?;
1523        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1524
1525        let mut state = self.state.write();
1526        state.endpoints.remove(name).ok_or_else(|| {
1527            AwsServiceError::aws_error(
1528                StatusCode::BAD_REQUEST,
1529                "ResourceNotFoundException",
1530                format!("Endpoint '{name}' does not exist."),
1531            )
1532        })?;
1533
1534        Ok(AwsResponse::ok_json(json!({})))
1535    }
1536
1537    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1538        let body = req.json_body();
1539        validate_required("Name", &body["Name"])?;
1540        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1541
1542        let state = self.state.read();
1543        let ep = state.endpoints.get(name).ok_or_else(|| {
1544            AwsServiceError::aws_error(
1545                StatusCode::BAD_REQUEST,
1546                "ResourceNotFoundException",
1547                format!("Endpoint '{name}' does not exist."),
1548            )
1549        })?;
1550
1551        let mut resp = json!({
1552            "Name": ep.name,
1553            "Arn": ep.arn,
1554            "EndpointId": ep.endpoint_id,
1555            "State": ep.state,
1556            "RoutingConfig": ep.routing_config,
1557            "EventBuses": ep.event_buses,
1558            "CreationTime": ep.creation_time.timestamp() as f64,
1559            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1560        });
1561        if let Some(ref url) = ep.endpoint_url {
1562            resp["EndpointUrl"] = json!(url);
1563        }
1564        if let Some(ref desc) = ep.description {
1565            resp["Description"] = json!(desc);
1566        }
1567        if let Some(ref rc) = ep.replication_config {
1568            resp["ReplicationConfig"] = rc.clone();
1569        }
1570        if let Some(ref ra) = ep.role_arn {
1571            resp["RoleArn"] = json!(ra);
1572        }
1573
1574        Ok(AwsResponse::ok_json(resp))
1575    }
1576
1577    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1578        let body = req.json_body();
1579        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1580        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1581        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1582        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1583        let name_prefix = body["NamePrefix"].as_str();
1584        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1585
1586        let state = self.state.read();
1587        let all: Vec<Value> = state
1588            .endpoints
1589            .values()
1590            .filter(|ep| match name_prefix {
1591                Some(prefix) => ep.name.starts_with(prefix),
1592                None => true,
1593            })
1594            .map(|ep| {
1595                let mut obj = json!({
1596                    "Name": ep.name,
1597                    "Arn": ep.arn,
1598                    "EndpointId": ep.endpoint_id,
1599                    "State": ep.state,
1600                    "RoutingConfig": ep.routing_config,
1601                    "EventBuses": ep.event_buses,
1602                    "CreationTime": ep.creation_time.timestamp() as f64,
1603                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604                });
1605                if let Some(ref url) = ep.endpoint_url {
1606                    obj["EndpointUrl"] = json!(url);
1607                }
1608                obj
1609            })
1610            .collect();
1611
1612        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1613        let mut resp = json!({ "Endpoints": endpoints });
1614        if let Some(token) = next_token {
1615            resp["NextToken"] = json!(token);
1616        }
1617
1618        Ok(AwsResponse::ok_json(resp))
1619    }
1620
1621    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1622        let body = req.json_body();
1623        validate_required("Name", &body["Name"])?;
1624        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1625
1626        let mut state = self.state.write();
1627        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1628            AwsServiceError::aws_error(
1629                StatusCode::BAD_REQUEST,
1630                "ResourceNotFoundException",
1631                format!("Endpoint '{name}' does not exist."),
1632            )
1633        })?;
1634
1635        if let Some(desc) = body["Description"].as_str() {
1636            ep.description = Some(desc.to_string());
1637        }
1638        if !body["RoutingConfig"].is_null() {
1639            ep.routing_config = body["RoutingConfig"].clone();
1640        }
1641        if let Some(rc) = body.get("ReplicationConfig") {
1642            ep.replication_config = Some(rc.clone());
1643        }
1644        if let Some(buses) = body["EventBuses"].as_array() {
1645            ep.event_buses = buses.clone();
1646        }
1647        if let Some(ra) = body["RoleArn"].as_str() {
1648            ep.role_arn = Some(ra.to_string());
1649        }
1650        ep.last_modified_time = Utc::now();
1651
1652        let resp = json!({
1653            "Name": ep.name,
1654            "Arn": ep.arn,
1655            "EndpointId": ep.endpoint_id,
1656            "State": ep.state,
1657            "RoutingConfig": ep.routing_config,
1658            "EventBuses": ep.event_buses,
1659        });
1660
1661        Ok(AwsResponse::ok_json(resp))
1662    }
1663
1664    // ─── DeauthorizeConnection ──────────────────────────────────────────
1665
1666    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1667        let body = req.json_body();
1668        validate_required("Name", &body["Name"])?;
1669        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1670        validate_string_length("name", name, 1, 64)?;
1671
1672        let mut state = self.state.write();
1673        let conn = state.connections.get_mut(name).ok_or_else(|| {
1674            AwsServiceError::aws_error(
1675                StatusCode::BAD_REQUEST,
1676                "ResourceNotFoundException",
1677                format!("Connection '{name}' does not exist."),
1678            )
1679        })?;
1680
1681        conn.connection_state = "DEAUTHORIZING".to_string();
1682        conn.last_modified_time = Utc::now();
1683
1684        let resp = json!({
1685            "ConnectionArn": conn.arn,
1686            "ConnectionState": conn.connection_state,
1687            "CreationTime": conn.creation_time.timestamp() as f64,
1688            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1689            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1690        });
1691
1692        Ok(AwsResponse::ok_json(resp))
1693    }
1694
1695    // ─── PutEvents ──────────────────────────────────────────────────────
1696
1697    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698        let body = req.json_body();
1699        validate_required("Entries", &body["Entries"])?;
1700        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1701        let entries = body["Entries"]
1702            .as_array()
1703            .ok_or_else(|| missing("Entries"))?;
1704
1705        // Validate entries count
1706        if entries.is_empty() {
1707            return Err(AwsServiceError::aws_error(
1708                StatusCode::BAD_REQUEST,
1709                "ValidationException",
1710                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1711            ));
1712        }
1713        if entries.len() > 10 {
1714            return Err(AwsServiceError::aws_error(
1715                StatusCode::BAD_REQUEST,
1716                "ValidationException",
1717                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1718            ));
1719        }
1720
1721        let mut state = self.state.write();
1722        let mut result_entries = Vec::new();
1723        let mut events_to_deliver = Vec::new();
1724        let mut failed_count = 0;
1725
1726        for entry in entries {
1727            let source = entry["Source"].as_str().unwrap_or("").to_string();
1728            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1729            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1730
1731            // Validate required fields
1732            if source.is_empty() {
1733                failed_count += 1;
1734                result_entries.push(json!({
1735                    "ErrorCode": "InvalidArgument",
1736                    "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
1737                }));
1738                continue;
1739            }
1740            if detail_type.is_empty() {
1741                failed_count += 1;
1742                result_entries.push(json!({
1743                    "ErrorCode": "InvalidArgument",
1744                    "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
1745                }));
1746                continue;
1747            }
1748            if detail.is_empty() {
1749                failed_count += 1;
1750                result_entries.push(json!({
1751                    "ErrorCode": "InvalidArgument",
1752                    "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
1753                }));
1754                continue;
1755            }
1756
1757            // Validate Detail is valid JSON
1758            if serde_json::from_str::<Value>(&detail).is_err() {
1759                failed_count += 1;
1760                result_entries.push(json!({
1761                    "ErrorCode": "MalformedDetail",
1762                    "ErrorMessage": "Detail is malformed.",
1763                }));
1764                continue;
1765            }
1766
1767            let event_id = uuid::Uuid::new_v4().to_string();
1768            let raw_bus = entry["EventBusName"]
1769                .as_str()
1770                .unwrap_or("default")
1771                .to_string();
1772            let event_bus_name = state.resolve_bus_name(&raw_bus);
1773            let time = if let Some(s) = entry["Time"].as_str() {
1774                DateTime::parse_from_rfc3339(s)
1775                    .map(|dt| dt.with_timezone(&Utc))
1776                    .unwrap_or_else(|_| Utc::now())
1777            } else if let Some(ts) = entry["Time"].as_f64() {
1778                DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
1779                    .unwrap_or_else(Utc::now)
1780            } else if let Some(ts) = entry["Time"].as_i64() {
1781                DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
1782            } else {
1783                Utc::now()
1784            };
1785            let resources: Vec<String> = entry["Resources"]
1786                .as_array()
1787                .map(|arr| {
1788                    arr.iter()
1789                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1790                        .collect()
1791                })
1792                .unwrap_or_default();
1793
1794            let event = PutEvent {
1795                event_id: event_id.clone(),
1796                source: source.clone(),
1797                detail_type: detail_type.clone(),
1798                detail: detail.clone(),
1799                event_bus_name: event_bus_name.clone(),
1800                time,
1801                resources: resources.clone(),
1802            };
1803
1804            // Archive matching events
1805            let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
1806            for akey in archive_keys {
1807                let (archive_bus, archive_pattern, archive_enabled) = {
1808                    let a = &state.archives[&akey];
1809                    (
1810                        state.resolve_bus_name(&a.event_source_arn),
1811                        a.event_pattern.clone(),
1812                        a.state == "ENABLED",
1813                    )
1814                };
1815                if archive_bus == event_bus_name && archive_enabled {
1816                    // Check if event matches archive's event pattern
1817                    let pattern_matches = matches_pattern(
1818                        archive_pattern.as_deref(),
1819                        &source,
1820                        &detail_type,
1821                        &detail,
1822                        &req.account_id,
1823                        &req.region,
1824                        &resources,
1825                    );
1826                    if pattern_matches {
1827                        if let Some(archive) = state.archives.get_mut(&akey) {
1828                            archive.event_count += 1;
1829                            archive.size_bytes += detail.len() as i64;
1830                            archive.events.push(event.clone());
1831                        }
1832                    }
1833                }
1834            }
1835
1836            state.events.push(event);
1837
1838            // Find matching rules and their targets
1839            let matching_targets: Vec<EventTarget> = state
1840                .rules
1841                .values()
1842                .filter(|r| {
1843                    r.event_bus_name == event_bus_name
1844                        && r.state == "ENABLED"
1845                        && matches_pattern(
1846                            r.event_pattern.as_deref(),
1847                            &source,
1848                            &detail_type,
1849                            &detail,
1850                            &req.account_id,
1851                            &req.region,
1852                            &resources,
1853                        )
1854                })
1855                .flat_map(|r| r.targets.clone())
1856                .collect();
1857
1858            if !matching_targets.is_empty() {
1859                events_to_deliver.push((
1860                    event_id.clone(),
1861                    source,
1862                    detail_type,
1863                    detail,
1864                    time,
1865                    resources,
1866                    matching_targets,
1867                ));
1868            }
1869
1870            result_entries.push(json!({ "EventId": event_id }));
1871        }
1872
1873        // Drop the lock before delivering
1874        drop(state);
1875
1876        // Deliver to targets
1877        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1878            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1879            let event_json = json!({
1880                "version": "0",
1881                "id": event_id,
1882                "source": source,
1883                "account": req.account_id,
1884                "detail-type": detail_type,
1885                "detail": detail_value,
1886                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1887                "region": req.region,
1888                "resources": resources,
1889            });
1890            let event_str = event_json.to_string();
1891
1892            for target in targets {
1893                let arn = &target.arn;
1894                // Compute the message body, applying InputTransformer if present
1895                let body_str = if let Some(ref transformer) = target.input_transformer {
1896                    apply_input_transformer(transformer, &event_json)
1897                } else if let Some(ref input) = target.input {
1898                    input.clone()
1899                } else if let Some(ref input_path) = target.input_path {
1900                    resolve_json_path(&event_json, input_path)
1901                        .map(|v| v.to_string())
1902                        .unwrap_or_else(|| event_str.clone())
1903                } else {
1904                    event_str.clone()
1905                };
1906
1907                if arn.contains(":sqs:") {
1908                    // Extract FIFO parameters (MessageGroupId)
1909                    let group_id = target
1910                        .sqs_parameters
1911                        .as_ref()
1912                        .and_then(|p| p["MessageGroupId"].as_str())
1913                        .map(|s| s.to_string());
1914                    if group_id.is_some() {
1915                        // FIFO queue: send with group ID but no dedup ID.
1916                        // Queues with content-based dedup will auto-generate one;
1917                        // queues without it will reject the message.
1918                        self.delivery.send_to_sqs_with_attrs(
1919                            arn,
1920                            &body_str,
1921                            &HashMap::new(),
1922                            group_id.as_deref(),
1923                            None,
1924                        );
1925                    } else {
1926                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1927                    }
1928                } else if arn.contains(":sns:") {
1929                    self.delivery
1930                        .publish_to_sns(arn, &body_str, Some(&detail_type));
1931                } else if arn.contains(":lambda:") {
1932                    tracing::info!(
1933                        function_arn = %arn,
1934                        payload = %body_str,
1935                        "EventBridge delivering to Lambda function"
1936                    );
1937                    let now = Utc::now();
1938                    let mut state = self.state.write();
1939                    state
1940                        .lambda_invocations
1941                        .push(crate::state::LambdaInvocation {
1942                            function_arn: arn.clone(),
1943                            payload: body_str.clone(),
1944                            timestamp: now,
1945                        });
1946                    drop(state);
1947                    // Record in Lambda state for cross-service visibility
1948                    if let Some(ref ls) = self.lambda_state {
1949                        ls.write().invocations.push(LambdaInvocation {
1950                            function_arn: arn.clone(),
1951                            payload: body_str.clone(),
1952                            timestamp: now,
1953                            source: "aws:events".to_string(),
1954                        });
1955                    }
1956                    // Actually invoke the Lambda function if a container runtime is available
1957                    invoke_lambda_async(
1958                        &self.container_runtime,
1959                        &self.lambda_state,
1960                        arn,
1961                        &body_str,
1962                    );
1963                } else if arn.contains(":logs:") {
1964                    tracing::info!(
1965                        log_group_arn = %arn,
1966                        payload = %body_str,
1967                        "EventBridge delivering to CloudWatch Logs"
1968                    );
1969                    let now = Utc::now();
1970                    let mut state = self.state.write();
1971                    state.log_deliveries.push(crate::state::LogDelivery {
1972                        log_group_arn: arn.clone(),
1973                        payload: body_str.clone(),
1974                        timestamp: now,
1975                    });
1976                    drop(state);
1977                    // Write event to CloudWatch Logs state
1978                    if let Some(ref log_state) = self.logs_state {
1979                        deliver_to_logs(log_state, arn, &body_str, now);
1980                    }
1981                } else if arn.contains(":states:") {
1982                    tracing::info!(
1983                        state_machine_arn = %arn,
1984                        payload = %body_str,
1985                        "EventBridge delivering to Step Functions (stub)"
1986                    );
1987                    let mut state = self.state.write();
1988                    state
1989                        .step_function_executions
1990                        .push(crate::state::StepFunctionExecution {
1991                            state_machine_arn: arn.clone(),
1992                            payload: body_str.clone(),
1993                            timestamp: Utc::now(),
1994                        });
1995                } else if arn.contains(":api-destination/") {
1996                    // ApiDestination target: look up destination + connection, then POST
1997                    let state = self.state.read();
1998                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
1999                    if let Some(dest) = dest {
2000                        let url = dest.invocation_endpoint.clone();
2001                        let method = dest.http_method.clone();
2002                        let conn = state
2003                            .connections
2004                            .values()
2005                            .find(|c| c.arn == dest.connection_arn)
2006                            .cloned();
2007                        drop(state);
2008
2009                        let payload = body_str.clone();
2010                        tokio::spawn(async move {
2011                            let client = reqwest::Client::new();
2012                            let mut req_builder = match method.as_str() {
2013                                "GET" => client.get(&url),
2014                                "PUT" => client.put(&url),
2015                                "DELETE" => client.delete(&url),
2016                                "PATCH" => client.patch(&url),
2017                                "HEAD" => client.head(&url),
2018                                _ => client.post(&url),
2019                            };
2020                            req_builder = req_builder.header("Content-Type", "application/json");
2021
2022                            // Apply auth from connection
2023                            if let Some(conn) = conn {
2024                                req_builder = apply_connection_auth(req_builder, &conn);
2025                            }
2026
2027                            let result = req_builder.body(payload).send().await;
2028                            if let Err(e) = result {
2029                                tracing::warn!(
2030                                    endpoint = %url,
2031                                    error = %e,
2032                                    "EventBridge ApiDestination delivery failed"
2033                                );
2034                            }
2035                        });
2036                    }
2037                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2038                    // HTTP target — fire-and-forget POST
2039                    let url = arn.clone();
2040                    let payload = body_str.clone();
2041                    tokio::spawn(async move {
2042                        let client = reqwest::Client::new();
2043                        let result = client
2044                            .post(&url)
2045                            .header("Content-Type", "application/json")
2046                            .body(payload)
2047                            .send()
2048                            .await;
2049                        if let Err(e) = result {
2050                            tracing::warn!(
2051                                endpoint = %url,
2052                                error = %e,
2053                                "EventBridge HTTP target delivery failed"
2054                            );
2055                        }
2056                    });
2057                }
2058            }
2059        }
2060
2061        let resp = json!({
2062            "FailedEntryCount": failed_count,
2063            "Entries": result_entries,
2064        });
2065
2066        Ok(AwsResponse::ok_json(resp))
2067    }
2068
2069    // ─── Tagging ────────────────────────────────────────────────────────
2070
2071    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2072        let body = req.json_body();
2073        validate_required("ResourceARN", &body["ResourceARN"])?;
2074        let arn = body["ResourceARN"]
2075            .as_str()
2076            .ok_or_else(|| missing("ResourceARN"))?;
2077        validate_string_length("resourceARN", arn, 1, 1600)?;
2078        validate_required("Tags", &body["Tags"])?;
2079
2080        let mut state = self.state.write();
2081        let tag_map = find_tags_mut(&mut state, arn)?;
2082
2083        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2084            AwsServiceError::aws_error(
2085                StatusCode::BAD_REQUEST,
2086                "ValidationException",
2087                format!("{f} must be a list"),
2088            )
2089        })?;
2090
2091        Ok(AwsResponse::ok_json(json!({})))
2092    }
2093
2094    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2095        let body = req.json_body();
2096        validate_required("ResourceARN", &body["ResourceARN"])?;
2097        let arn = body["ResourceARN"]
2098            .as_str()
2099            .ok_or_else(|| missing("ResourceARN"))?;
2100        validate_string_length("resourceARN", arn, 1, 1600)?;
2101        validate_required("TagKeys", &body["TagKeys"])?;
2102
2103        let mut state = self.state.write();
2104        let tag_map = find_tags_mut(&mut state, arn)?;
2105
2106        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2107            AwsServiceError::aws_error(
2108                StatusCode::BAD_REQUEST,
2109                "ValidationException",
2110                format!("{f} must be a list"),
2111            )
2112        })?;
2113
2114        Ok(AwsResponse::ok_json(json!({})))
2115    }
2116
2117    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2118        let body = req.json_body();
2119        validate_required("ResourceARN", &body["ResourceARN"])?;
2120        let arn = body["ResourceARN"]
2121            .as_str()
2122            .ok_or_else(|| missing("ResourceARN"))?;
2123        validate_string_length("resourceARN", arn, 1, 1600)?;
2124
2125        let state = self.state.read();
2126        let tag_map = find_tags(&state, arn)?;
2127
2128        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2129
2130        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2131    }
2132
2133    // ─── Archive Operations ─────────────────────────────────────────────
2134
2135    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2136        let body = req.json_body();
2137        validate_required("ArchiveName", &body["ArchiveName"])?;
2138        let name = body["ArchiveName"]
2139            .as_str()
2140            .ok_or_else(|| missing("ArchiveName"))?
2141            .to_string();
2142        validate_string_length("archiveName", &name, 1, 48)?;
2143        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2144        let event_source_arn = body["EventSourceArn"]
2145            .as_str()
2146            .ok_or_else(|| missing("EventSourceArn"))?
2147            .to_string();
2148        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2149        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2150        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2151        if let Some(rd) = body["RetentionDays"].as_i64() {
2152            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2153        }
2154        let description = body["Description"].as_str().map(|s| s.to_string());
2155        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2156        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2157
2158        // Validate event pattern if provided
2159        if let Some(ref pattern) = event_pattern {
2160            validate_event_pattern(pattern)?;
2161        }
2162
2163        let mut state = self.state.write();
2164
2165        // Validate event bus exists
2166        let bus_name = state.resolve_bus_name(&event_source_arn);
2167        if !state.buses.contains_key(&bus_name) {
2168            return Err(AwsServiceError::aws_error(
2169                StatusCode::BAD_REQUEST,
2170                "ResourceNotFoundException",
2171                format!("Event bus {bus_name} does not exist."),
2172            ));
2173        }
2174
2175        // Check duplicate
2176        if state.archives.contains_key(&name) {
2177            return Err(AwsServiceError::aws_error(
2178                StatusCode::BAD_REQUEST,
2179                "ResourceAlreadyExistsException",
2180                format!("Archive {name} already exists."),
2181            ));
2182        }
2183
2184        let now = Utc::now();
2185        let arn = format!(
2186            "arn:aws:events:{}:{}:archive/{}",
2187            req.region, state.account_id, name
2188        );
2189
2190        let archive = Archive {
2191            name: name.clone(),
2192            arn: arn.clone(),
2193            event_source_arn: event_source_arn.clone(),
2194            description,
2195            event_pattern: event_pattern.clone(),
2196            retention_days,
2197            state: "ENABLED".to_string(),
2198            creation_time: now,
2199            event_count: 0,
2200            size_bytes: 0,
2201            events: Vec::new(),
2202        };
2203        state.archives.insert(name.clone(), archive);
2204
2205        // Create the archive rule
2206        let rule_name = format!("Events-Archive-{name}");
2207        let rule_arn = format!(
2208            "arn:aws:events:{}:{}:rule/{}",
2209            req.region, state.account_id, rule_name
2210        );
2211        // Merge archive event pattern with replay-name filter
2212        let rule_event_pattern = {
2213            let mut merged = if let Some(ref ep) = event_pattern {
2214                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2215            } else {
2216                json!({})
2217            };
2218            if let Some(obj) = merged.as_object_mut() {
2219                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2220            }
2221            serde_json::to_string(&merged).unwrap_or_default()
2222        };
2223
2224        // Build the archive target with InputTransformer
2225        let archive_target = EventTarget {
2226            id: name.clone(),
2227            arn: format!("arn:aws:events:{}:::", req.region),
2228            input: None,
2229            input_path: None,
2230            input_transformer: Some(json!({
2231                "InputPathsMap": {},
2232                "InputTemplate": format!(
2233                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2234                    arn
2235                )
2236            })),
2237            sqs_parameters: None,
2238        };
2239
2240        let archive_rule = EventRule {
2241            name: rule_name.clone(),
2242            arn: rule_arn,
2243            event_bus_name: bus_name.clone(),
2244            event_pattern: Some(rule_event_pattern),
2245            schedule_expression: None,
2246            state: "ENABLED".to_string(),
2247            description: None,
2248            role_arn: None,
2249            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2250            created_by: Some(state.account_id.clone()),
2251            targets: vec![archive_target],
2252            tags: HashMap::new(),
2253            last_fired: None,
2254        };
2255        let key = (bus_name, rule_name);
2256        state.rules.insert(key, archive_rule);
2257
2258        Ok(AwsResponse::ok_json(json!({
2259            "ArchiveArn": arn,
2260            "CreationTime": now.timestamp() as f64,
2261            "State": "ENABLED",
2262        })))
2263    }
2264
2265    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2266        let body = req.json_body();
2267        validate_required("ArchiveName", &body["ArchiveName"])?;
2268        let name = body["ArchiveName"]
2269            .as_str()
2270            .ok_or_else(|| missing("ArchiveName"))?;
2271        validate_string_length("archiveName", name, 1, 48)?;
2272
2273        let state = self.state.read();
2274        let archive = state.archives.get(name).ok_or_else(|| {
2275            AwsServiceError::aws_error(
2276                StatusCode::BAD_REQUEST,
2277                "ResourceNotFoundException",
2278                format!("Archive {name} does not exist."),
2279            )
2280        })?;
2281
2282        let mut resp = json!({
2283            "ArchiveArn": archive.arn,
2284            "ArchiveName": archive.name,
2285            "CreationTime": archive.creation_time.timestamp() as f64,
2286            "EventCount": archive.event_count,
2287            "EventSourceArn": archive.event_source_arn,
2288            "RetentionDays": archive.retention_days,
2289            "SizeBytes": archive.size_bytes,
2290            "State": archive.state,
2291        });
2292        if let Some(ref desc) = archive.description {
2293            resp["Description"] = json!(desc);
2294        }
2295        if let Some(ref ep) = archive.event_pattern {
2296            resp["EventPattern"] = json!(ep);
2297        }
2298
2299        Ok(AwsResponse::ok_json(resp))
2300    }
2301
2302    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2303        let body = req.json_body();
2304        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2305        validate_optional_string_length(
2306            "eventSourceArn",
2307            body["EventSourceArn"].as_str(),
2308            1,
2309            1600,
2310        )?;
2311        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2312        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2313        let name_prefix = body["NamePrefix"].as_str();
2314        let source_arn = body["EventSourceArn"].as_str();
2315        let archive_state = body["State"].as_str();
2316
2317        // Validate at most one filter
2318        let filter_count = [
2319            name_prefix.is_some(),
2320            source_arn.is_some(),
2321            archive_state.is_some(),
2322        ]
2323        .iter()
2324        .filter(|&&x| x)
2325        .count();
2326        if filter_count > 1 {
2327            return Err(AwsServiceError::aws_error(
2328                StatusCode::BAD_REQUEST,
2329                "ValidationException",
2330                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2331            ));
2332        }
2333
2334        // Validate state
2335        if let Some(s) = archive_state {
2336            let valid = [
2337                "ENABLED",
2338                "DISABLED",
2339                "CREATING",
2340                "UPDATING",
2341                "CREATE_FAILED",
2342                "UPDATE_FAILED",
2343            ];
2344            if !valid.contains(&s) {
2345                return Err(AwsServiceError::aws_error(
2346                    StatusCode::BAD_REQUEST,
2347                    "ValidationException",
2348                    format!(
2349                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2350                        s
2351                    ),
2352                ));
2353            }
2354        }
2355
2356        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2357
2358        let state = self.state.read();
2359        let all: Vec<Value> = state
2360            .archives
2361            .values()
2362            .filter(|a| {
2363                if let Some(prefix) = name_prefix {
2364                    a.name.starts_with(prefix)
2365                } else if let Some(arn) = source_arn {
2366                    a.event_source_arn == arn
2367                } else if let Some(s) = archive_state {
2368                    a.state == s
2369                } else {
2370                    true
2371                }
2372            })
2373            .map(|a| {
2374                json!({
2375                    "ArchiveName": a.name,
2376                    "CreationTime": a.creation_time.timestamp() as f64,
2377                    "EventCount": a.event_count,
2378                    "EventSourceArn": a.event_source_arn,
2379                    "RetentionDays": a.retention_days,
2380                    "SizeBytes": a.size_bytes,
2381                    "State": a.state,
2382                })
2383            })
2384            .collect();
2385
2386        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2387        let mut resp = json!({ "Archives": archives });
2388        if let Some(token) = next_token {
2389            resp["NextToken"] = json!(token);
2390        }
2391
2392        Ok(AwsResponse::ok_json(resp))
2393    }
2394
2395    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396        let body = req.json_body();
2397        validate_required("ArchiveName", &body["ArchiveName"])?;
2398        let name = body["ArchiveName"]
2399            .as_str()
2400            .ok_or_else(|| missing("ArchiveName"))?;
2401        validate_string_length("archiveName", name, 1, 48)?;
2402        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2403        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2404        if let Some(rd) = body["RetentionDays"].as_i64() {
2405            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2406        }
2407
2408        // Validate event pattern if provided
2409        if let Some(pattern) = body["EventPattern"].as_str() {
2410            validate_event_pattern(pattern)?;
2411        }
2412
2413        let mut state = self.state.write();
2414        let archive = state.archives.get_mut(name).ok_or_else(|| {
2415            AwsServiceError::aws_error(
2416                StatusCode::BAD_REQUEST,
2417                "ResourceNotFoundException",
2418                format!("Archive {name} does not exist."),
2419            )
2420        })?;
2421
2422        if let Some(desc) = body["Description"].as_str() {
2423            archive.description = Some(desc.to_string());
2424        }
2425        if let Some(pattern) = body["EventPattern"].as_str() {
2426            archive.event_pattern = Some(pattern.to_string());
2427        }
2428        if let Some(days) = body["RetentionDays"].as_i64() {
2429            archive.retention_days = days;
2430        }
2431
2432        Ok(AwsResponse::ok_json(json!({
2433            "ArchiveArn": archive.arn,
2434            "CreationTime": archive.creation_time.timestamp() as f64,
2435            "State": archive.state,
2436        })))
2437    }
2438
2439    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2440        let body = req.json_body();
2441        validate_required("ArchiveName", &body["ArchiveName"])?;
2442        let name = body["ArchiveName"]
2443            .as_str()
2444            .ok_or_else(|| missing("ArchiveName"))?;
2445        validate_string_length("archiveName", name, 1, 48)?;
2446
2447        let mut state = self.state.write();
2448        if !state.archives.contains_key(name) {
2449            return Err(AwsServiceError::aws_error(
2450                StatusCode::BAD_REQUEST,
2451                "ResourceNotFoundException",
2452                format!("Archive {name} does not exist."),
2453            ));
2454        }
2455
2456        state.archives.remove(name);
2457
2458        // Remove the archive rule
2459        let rule_name = format!("Events-Archive-{name}");
2460        state.rules.retain(|k, _| k.1 != rule_name);
2461
2462        Ok(AwsResponse::ok_json(json!({})))
2463    }
2464
2465    // ─── Connection Operations ──────────────────────────────────────────
2466
2467    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2468        let body = req.json_body();
2469        validate_required("Name", &body["Name"])?;
2470        let name = body["Name"]
2471            .as_str()
2472            .ok_or_else(|| missing("Name"))?
2473            .to_string();
2474        validate_string_length("name", &name, 1, 64)?;
2475        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2476        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2477        let description = body["Description"].as_str().map(|s| s.to_string());
2478        let auth_type = body["AuthorizationType"]
2479            .as_str()
2480            .ok_or_else(|| missing("AuthorizationType"))?
2481            .to_string();
2482        validate_enum(
2483            "authorizationType",
2484            &auth_type,
2485            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2486        )?;
2487        validate_optional_string_length(
2488            "kmsKeyIdentifier",
2489            body["KmsKeyIdentifier"].as_str(),
2490            0,
2491            2048,
2492        )?;
2493        validate_required("AuthParameters", &body["AuthParameters"])?;
2494        let auth_params = body["AuthParameters"].clone();
2495
2496        let mut state = self.state.write();
2497        let now = Utc::now();
2498        let conn_uuid = uuid::Uuid::new_v4();
2499        let arn = format!(
2500            "arn:aws:events:{}:{}:connection/{}/{}",
2501            req.region, state.account_id, name, conn_uuid
2502        );
2503        let secret_arn = format!(
2504            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2505            req.region, state.account_id, name, conn_uuid
2506        );
2507
2508        let conn = Connection {
2509            name: name.clone(),
2510            arn: arn.clone(),
2511            description,
2512            authorization_type: auth_type.clone(),
2513            auth_parameters: auth_params,
2514            connection_state: "AUTHORIZED".to_string(),
2515            secret_arn: secret_arn.clone(),
2516            creation_time: now,
2517            last_modified_time: now,
2518            last_authorized_time: now,
2519        };
2520        state.connections.insert(name, conn);
2521
2522        Ok(AwsResponse::ok_json(json!({
2523            "ConnectionArn": arn,
2524            "ConnectionState": "AUTHORIZED",
2525            "CreationTime": now.timestamp() as f64,
2526            "LastModifiedTime": now.timestamp() as f64,
2527        })))
2528    }
2529
2530    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531        let body = req.json_body();
2532        validate_required("Name", &body["Name"])?;
2533        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2534        validate_string_length("name", name, 1, 64)?;
2535
2536        let state = self.state.read();
2537        let conn = state.connections.get(name).ok_or_else(|| {
2538            AwsServiceError::aws_error(
2539                StatusCode::BAD_REQUEST,
2540                "ResourceNotFoundException",
2541                format!("Connection '{name}' does not exist."),
2542            )
2543        })?;
2544
2545        // Build auth parameters response - strip secrets
2546        let auth_params_response =
2547            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2548
2549        let mut resp = json!({
2550            "ConnectionArn": conn.arn,
2551            "Name": conn.name,
2552            "AuthorizationType": conn.authorization_type,
2553            "AuthParameters": auth_params_response,
2554            "ConnectionState": conn.connection_state,
2555            "SecretArn": conn.secret_arn,
2556            "CreationTime": conn.creation_time.timestamp() as f64,
2557            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2558            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2559        });
2560        if let Some(ref desc) = conn.description {
2561            resp["Description"] = json!(desc);
2562        }
2563
2564        Ok(AwsResponse::ok_json(resp))
2565    }
2566
2567    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2568        let body = req.json_body();
2569        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2570        validate_optional_enum(
2571            "connectionState",
2572            body["ConnectionState"].as_str(),
2573            &[
2574                "CREATING",
2575                "UPDATING",
2576                "DELETING",
2577                "AUTHORIZED",
2578                "DEAUTHORIZED",
2579                "AUTHORIZING",
2580                "DEAUTHORIZING",
2581                "ACTIVE",
2582                "FAILED_CONNECTIVITY",
2583            ],
2584        )?;
2585        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2586        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2587
2588        let name_prefix = body["NamePrefix"].as_str();
2589        let connection_state = body["ConnectionState"].as_str();
2590        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2591
2592        let state = self.state.read();
2593        let all: Vec<Value> = state
2594            .connections
2595            .values()
2596            .filter(|c| {
2597                if let Some(prefix) = name_prefix {
2598                    if !c.name.starts_with(prefix) {
2599                        return false;
2600                    }
2601                }
2602                if let Some(cs) = connection_state {
2603                    if c.connection_state != cs {
2604                        return false;
2605                    }
2606                }
2607                true
2608            })
2609            .map(|c| {
2610                json!({
2611                    "ConnectionArn": c.arn,
2612                    "Name": c.name,
2613                    "AuthorizationType": c.authorization_type,
2614                    "ConnectionState": c.connection_state,
2615                    "CreationTime": c.creation_time.timestamp() as f64,
2616                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2617                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2618                })
2619            })
2620            .collect();
2621
2622        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2623        let mut resp = json!({ "Connections": conns });
2624        if let Some(token) = next_token {
2625            resp["NextToken"] = json!(token);
2626        }
2627
2628        Ok(AwsResponse::ok_json(resp))
2629    }
2630
2631    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2632        let body = req.json_body();
2633        validate_required("Name", &body["Name"])?;
2634        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2635        validate_string_length("name", name, 1, 64)?;
2636        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2637        validate_optional_enum(
2638            "authorizationType",
2639            body["AuthorizationType"].as_str(),
2640            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2641        )?;
2642
2643        let mut state = self.state.write();
2644        let conn = state.connections.get_mut(name).ok_or_else(|| {
2645            AwsServiceError::aws_error(
2646                StatusCode::BAD_REQUEST,
2647                "ResourceNotFoundException",
2648                format!("Connection '{name}' does not exist."),
2649            )
2650        })?;
2651
2652        if let Some(desc) = body["Description"].as_str() {
2653            conn.description = Some(desc.to_string());
2654        }
2655        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2656            conn.authorization_type = auth_type.to_string();
2657        }
2658        if body.get("AuthParameters").is_some() {
2659            conn.auth_parameters = body["AuthParameters"].clone();
2660        }
2661        conn.last_modified_time = Utc::now();
2662
2663        Ok(AwsResponse::ok_json(json!({
2664            "ConnectionArn": conn.arn,
2665            "ConnectionState": conn.connection_state,
2666            "CreationTime": conn.creation_time.timestamp() as f64,
2667            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2668            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2669        })))
2670    }
2671
2672    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2673        let body = req.json_body();
2674        validate_required("Name", &body["Name"])?;
2675        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2676        validate_string_length("name", name, 1, 64)?;
2677
2678        let mut state = self.state.write();
2679        let conn = state.connections.remove(name).ok_or_else(|| {
2680            AwsServiceError::aws_error(
2681                StatusCode::BAD_REQUEST,
2682                "ResourceNotFoundException",
2683                format!("Connection '{name}' does not exist."),
2684            )
2685        })?;
2686
2687        Ok(AwsResponse::ok_json(json!({
2688            "ConnectionArn": conn.arn,
2689            "ConnectionState": conn.connection_state,
2690            "CreationTime": conn.creation_time.timestamp() as f64,
2691            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2692            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2693        })))
2694    }
2695
2696    // ─── API Destination Operations ─────────────────────────────────────
2697
2698    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2699        let body = req.json_body();
2700        validate_required("Name", &body["Name"])?;
2701        let name = body["Name"]
2702            .as_str()
2703            .ok_or_else(|| missing("Name"))?
2704            .to_string();
2705        validate_string_length("name", &name, 1, 64)?;
2706        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2707        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2708        let description = body["Description"].as_str().map(|s| s.to_string());
2709        let connection_arn = body["ConnectionArn"]
2710            .as_str()
2711            .ok_or_else(|| missing("ConnectionArn"))?
2712            .to_string();
2713        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2714        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2715        let endpoint = body["InvocationEndpoint"]
2716            .as_str()
2717            .ok_or_else(|| missing("InvocationEndpoint"))?
2718            .to_string();
2719        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2720        validate_required("HttpMethod", &body["HttpMethod"])?;
2721        let http_method = body["HttpMethod"]
2722            .as_str()
2723            .ok_or_else(|| missing("HttpMethod"))?
2724            .to_string();
2725        validate_enum(
2726            "httpMethod",
2727            &http_method,
2728            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2729        )?;
2730        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2731        if let Some(r) = rate_limit {
2732            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2733        }
2734
2735        let mut state = self.state.write();
2736        let now = Utc::now();
2737        let dest_uuid = uuid::Uuid::new_v4();
2738        let arn = format!(
2739            "arn:aws:events:{}:{}:api-destination/{}/{}",
2740            req.region, state.account_id, name, dest_uuid
2741        );
2742
2743        let dest = ApiDestination {
2744            name: name.clone(),
2745            arn: arn.clone(),
2746            description,
2747            connection_arn,
2748            invocation_endpoint: endpoint,
2749            http_method,
2750            invocation_rate_limit_per_second: rate_limit,
2751            state: "ACTIVE".to_string(),
2752            creation_time: now,
2753            last_modified_time: now,
2754        };
2755        state.api_destinations.insert(name, dest);
2756
2757        Ok(AwsResponse::ok_json(json!({
2758            "ApiDestinationArn": arn,
2759            "ApiDestinationState": "ACTIVE",
2760            "CreationTime": now.timestamp() as f64,
2761            "LastModifiedTime": now.timestamp() as f64,
2762        })))
2763    }
2764
2765    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2766        let body = req.json_body();
2767        validate_required("Name", &body["Name"])?;
2768        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2769        validate_string_length("name", name, 1, 64)?;
2770
2771        let state = self.state.read();
2772        let dest = state.api_destinations.get(name).ok_or_else(|| {
2773            AwsServiceError::aws_error(
2774                StatusCode::BAD_REQUEST,
2775                "ResourceNotFoundException",
2776                format!("An api-destination '{name}' does not exist."),
2777            )
2778        })?;
2779
2780        let mut resp = json!({
2781            "ApiDestinationArn": dest.arn,
2782            "Name": dest.name,
2783            "ConnectionArn": dest.connection_arn,
2784            "InvocationEndpoint": dest.invocation_endpoint,
2785            "HttpMethod": dest.http_method,
2786            "ApiDestinationState": dest.state,
2787            "CreationTime": dest.creation_time.timestamp() as f64,
2788            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2789        });
2790        if let Some(ref desc) = dest.description {
2791            resp["Description"] = json!(desc);
2792        }
2793        if let Some(rate) = dest.invocation_rate_limit_per_second {
2794            resp["InvocationRateLimitPerSecond"] = json!(rate);
2795        }
2796
2797        Ok(AwsResponse::ok_json(resp))
2798    }
2799
2800    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2801        let body = req.json_body();
2802        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2803        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2804        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2805        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2806
2807        let name_prefix = body["NamePrefix"].as_str();
2808        let connection_arn = body["ConnectionArn"].as_str();
2809        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2810
2811        let state = self.state.read();
2812        let all: Vec<Value> = state
2813            .api_destinations
2814            .values()
2815            .filter(|d| {
2816                if let Some(prefix) = name_prefix {
2817                    if !d.name.starts_with(prefix) {
2818                        return false;
2819                    }
2820                }
2821                if let Some(arn) = connection_arn {
2822                    if d.connection_arn != arn {
2823                        return false;
2824                    }
2825                }
2826                true
2827            })
2828            .map(|d| {
2829                let mut obj = json!({
2830                    "ApiDestinationArn": d.arn,
2831                    "Name": d.name,
2832                    "ConnectionArn": d.connection_arn,
2833                    "InvocationEndpoint": d.invocation_endpoint,
2834                    "HttpMethod": d.http_method,
2835                    "ApiDestinationState": d.state,
2836                    "CreationTime": d.creation_time.timestamp() as f64,
2837                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2838                });
2839                if let Some(rate) = d.invocation_rate_limit_per_second {
2840                    obj["InvocationRateLimitPerSecond"] = json!(rate);
2841                }
2842                obj
2843            })
2844            .collect();
2845
2846        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2847        let mut resp = json!({ "ApiDestinations": dests });
2848        if let Some(token) = next_token {
2849            resp["NextToken"] = json!(token);
2850        }
2851
2852        Ok(AwsResponse::ok_json(resp))
2853    }
2854
2855    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2856        let body = req.json_body();
2857        validate_required("Name", &body["Name"])?;
2858        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2859        validate_string_length("name", name, 1, 64)?;
2860        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2861        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2862        validate_optional_string_length(
2863            "invocationEndpoint",
2864            body["InvocationEndpoint"].as_str(),
2865            1,
2866            2048,
2867        )?;
2868        validate_optional_enum(
2869            "httpMethod",
2870            body["HttpMethod"].as_str(),
2871            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2872        )?;
2873        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2874            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2875        }
2876
2877        let mut state = self.state.write();
2878        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2879            AwsServiceError::aws_error(
2880                StatusCode::BAD_REQUEST,
2881                "ResourceNotFoundException",
2882                format!("An api-destination '{name}' does not exist."),
2883            )
2884        })?;
2885
2886        if let Some(desc) = body["Description"].as_str() {
2887            dest.description = Some(desc.to_string());
2888        }
2889        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2890            dest.invocation_endpoint = endpoint.to_string();
2891        }
2892        if let Some(method) = body["HttpMethod"].as_str() {
2893            dest.http_method = method.to_string();
2894        }
2895        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2896            dest.invocation_rate_limit_per_second = Some(rate);
2897        }
2898        if let Some(conn) = body["ConnectionArn"].as_str() {
2899            dest.connection_arn = conn.to_string();
2900        }
2901        dest.last_modified_time = Utc::now();
2902
2903        Ok(AwsResponse::ok_json(json!({
2904            "ApiDestinationArn": dest.arn,
2905            "ApiDestinationState": dest.state,
2906            "CreationTime": dest.creation_time.timestamp() as f64,
2907            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2908        })))
2909    }
2910
2911    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2912        let body = req.json_body();
2913        validate_required("Name", &body["Name"])?;
2914        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2915        validate_string_length("name", name, 1, 64)?;
2916
2917        let mut state = self.state.write();
2918        if !state.api_destinations.contains_key(name) {
2919            return Err(AwsServiceError::aws_error(
2920                StatusCode::BAD_REQUEST,
2921                "ResourceNotFoundException",
2922                format!("An api-destination '{name}' does not exist."),
2923            ));
2924        }
2925        state.api_destinations.remove(name);
2926
2927        Ok(AwsResponse::ok_json(json!({})))
2928    }
2929
2930    // ─── Replay Operations ──────────────────────────────────────────────
2931
2932    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2933        let body = req.json_body();
2934        validate_required("ReplayName", &body["ReplayName"])?;
2935        let name = body["ReplayName"]
2936            .as_str()
2937            .ok_or_else(|| missing("ReplayName"))?
2938            .to_string();
2939        validate_string_length("replayName", &name, 1, 64)?;
2940        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2941        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2942        let description = body["Description"].as_str().map(|s| s.to_string());
2943        let event_source_arn = body["EventSourceArn"]
2944            .as_str()
2945            .ok_or_else(|| missing("EventSourceArn"))?
2946            .to_string();
2947        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2948        validate_required("EventStartTime", &body["EventStartTime"])?;
2949        validate_required("EventEndTime", &body["EventEndTime"])?;
2950        validate_required("Destination", &body["Destination"])?;
2951        let destination = body["Destination"].clone();
2952        let event_start_time_f = body["EventStartTime"].as_f64();
2953        let event_end_time_f = body["EventEndTime"].as_f64();
2954
2955        let event_start_time = event_start_time_f
2956            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2957            .unwrap_or_else(Utc::now);
2958        let event_end_time = event_end_time_f
2959            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2960            .unwrap_or_else(Utc::now);
2961
2962        // Validate destination ARN
2963        let dest_arn = destination["Arn"].as_str().unwrap_or("");
2964        if !dest_arn.contains(":event-bus/") {
2965            return Err(AwsServiceError::aws_error(
2966                StatusCode::BAD_REQUEST,
2967                "ValidationException",
2968                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2969            ));
2970        }
2971
2972        let mut state = self.state.write();
2973
2974        // Validate event bus exists
2975        let bus_name = state.resolve_bus_name(dest_arn);
2976        if !state.buses.contains_key(&bus_name) {
2977            return Err(AwsServiceError::aws_error(
2978                StatusCode::BAD_REQUEST,
2979                "ResourceNotFoundException",
2980                format!("Event bus {bus_name} does not exist."),
2981            ));
2982        }
2983
2984        // Validate archive exists
2985        let archive_name = event_source_arn
2986            .rsplit_once("archive/")
2987            .map(|(_, n)| n.to_string())
2988            .unwrap_or_default();
2989        if !state.archives.contains_key(&archive_name) {
2990            return Err(AwsServiceError::aws_error(
2991                StatusCode::BAD_REQUEST,
2992                "ValidationException",
2993                format!(
2994                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2995                ),
2996            ));
2997        }
2998
2999        // Validate archive bus matches destination bus
3000        let archive = state.archives.get(&archive_name).unwrap();
3001        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3002        if archive_bus != bus_name {
3003            return Err(AwsServiceError::aws_error(
3004                StatusCode::BAD_REQUEST,
3005                "ValidationException",
3006                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3007            ));
3008        }
3009
3010        // Validate end time after start time
3011        if event_end_time <= event_start_time {
3012            return Err(AwsServiceError::aws_error(
3013                StatusCode::BAD_REQUEST,
3014                "ValidationException",
3015                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3016            ));
3017        }
3018
3019        // Check duplicate
3020        if state.replays.contains_key(&name) {
3021            return Err(AwsServiceError::aws_error(
3022                StatusCode::BAD_REQUEST,
3023                "ResourceAlreadyExistsException",
3024                format!("Replay {name} already exists."),
3025            ));
3026        }
3027
3028        let now = Utc::now();
3029        let arn = format!(
3030            "arn:aws:events:{}:{}:replay/{}",
3031            req.region, state.account_id, name
3032        );
3033
3034        // Collect archived events within the replay time range
3035        let archive = state.archives.get(&archive_name).unwrap();
3036        let replay_events: Vec<PutEvent> = archive
3037            .events
3038            .iter()
3039            .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3040            .cloned()
3041            .collect();
3042
3043        // Find matching rules and their targets for each replayed event
3044        let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3045
3046        for event in &replay_events {
3047            let matching_targets: Vec<EventTarget> = state
3048                .rules
3049                .values()
3050                .filter(|r| {
3051                    r.event_bus_name == bus_name
3052                        && r.state == "ENABLED"
3053                        && matches_pattern(
3054                            r.event_pattern.as_deref(),
3055                            &event.source,
3056                            &event.detail_type,
3057                            &event.detail,
3058                            &req.account_id,
3059                            &req.region,
3060                            &event.resources,
3061                        )
3062                })
3063                .flat_map(|r| r.targets.clone())
3064                .collect();
3065
3066            if !matching_targets.is_empty() {
3067                events_to_deliver.push((event.clone(), matching_targets));
3068            }
3069        }
3070
3071        let replay = Replay {
3072            name: name.clone(),
3073            arn: arn.clone(),
3074            description,
3075            event_source_arn,
3076            destination,
3077            event_start_time,
3078            event_end_time,
3079            state: "COMPLETED".to_string(),
3080            replay_start_time: now,
3081            replay_end_time: Some(now),
3082        };
3083        state.replays.insert(name, replay);
3084
3085        // Drop the lock before delivering
3086        drop(state);
3087
3088        // Deliver replayed events to targets (same logic as PutEvents)
3089        for (event, targets) in events_to_deliver {
3090            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3091            let event_json = json!({
3092                "version": "0",
3093                "id": event.event_id,
3094                "source": event.source,
3095                "account": req.account_id,
3096                "detail-type": event.detail_type,
3097                "detail": detail_value,
3098                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3099                "region": req.region,
3100                "resources": event.resources,
3101                "replay-name": arn,
3102            });
3103            let event_str = event_json.to_string();
3104
3105            for target in targets {
3106                let target_arn = &target.arn;
3107                let body_str = if let Some(ref transformer) = target.input_transformer {
3108                    apply_input_transformer(transformer, &event_json)
3109                } else if let Some(ref input) = target.input {
3110                    input.clone()
3111                } else if let Some(ref input_path) = target.input_path {
3112                    resolve_json_path(&event_json, input_path)
3113                        .map(|v| v.to_string())
3114                        .unwrap_or_else(|| event_str.clone())
3115                } else {
3116                    event_str.clone()
3117                };
3118
3119                if target_arn.contains(":sqs:") {
3120                    let group_id = target
3121                        .sqs_parameters
3122                        .as_ref()
3123                        .and_then(|p| p["MessageGroupId"].as_str())
3124                        .map(|s| s.to_string());
3125                    if group_id.is_some() {
3126                        self.delivery.send_to_sqs_with_attrs(
3127                            target_arn,
3128                            &body_str,
3129                            &HashMap::new(),
3130                            group_id.as_deref(),
3131                            None,
3132                        );
3133                    } else {
3134                        self.delivery
3135                            .send_to_sqs(target_arn, &body_str, &HashMap::new());
3136                    }
3137                } else if target_arn.contains(":sns:") {
3138                    self.delivery
3139                        .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3140                } else if target_arn.contains(":lambda:") {
3141                    let mut state = self.state.write();
3142                    state
3143                        .lambda_invocations
3144                        .push(crate::state::LambdaInvocation {
3145                            function_arn: target_arn.clone(),
3146                            payload: body_str.clone(),
3147                            timestamp: Utc::now(),
3148                        });
3149                    drop(state);
3150                    if let Some(ref ls) = self.lambda_state {
3151                        ls.write().invocations.push(LambdaInvocation {
3152                            function_arn: target_arn.clone(),
3153                            payload: body_str.clone(),
3154                            timestamp: Utc::now(),
3155                            source: "aws:events".to_string(),
3156                        });
3157                    }
3158                    invoke_lambda_async(
3159                        &self.container_runtime,
3160                        &self.lambda_state,
3161                        target_arn,
3162                        &body_str,
3163                    );
3164                } else if target_arn.contains(":logs:") {
3165                    let mut state = self.state.write();
3166                    state.log_deliveries.push(crate::state::LogDelivery {
3167                        log_group_arn: target_arn.clone(),
3168                        payload: body_str.clone(),
3169                        timestamp: Utc::now(),
3170                    });
3171                    drop(state);
3172                    if let Some(ref log_state) = self.logs_state {
3173                        deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3174                    }
3175                } else if target_arn.contains(":states:") {
3176                    let mut state = self.state.write();
3177                    state
3178                        .step_function_executions
3179                        .push(crate::state::StepFunctionExecution {
3180                            state_machine_arn: target_arn.clone(),
3181                            payload: body_str.clone(),
3182                            timestamp: Utc::now(),
3183                        });
3184                } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3185                    let url = target_arn.clone();
3186                    let payload = body_str.clone();
3187                    tokio::spawn(async move {
3188                        let client = reqwest::Client::new();
3189                        let _ = client
3190                            .post(&url)
3191                            .header("Content-Type", "application/json")
3192                            .body(payload)
3193                            .send()
3194                            .await;
3195                    });
3196                }
3197            }
3198        }
3199
3200        Ok(AwsResponse::ok_json(json!({
3201            "ReplayArn": arn,
3202            "ReplayStartTime": now.timestamp() as f64,
3203            "State": "STARTING",
3204        })))
3205    }
3206
3207    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3208        let body = req.json_body();
3209        validate_required("ReplayName", &body["ReplayName"])?;
3210        let name = body["ReplayName"]
3211            .as_str()
3212            .ok_or_else(|| missing("ReplayName"))?;
3213        validate_string_length("replayName", name, 1, 64)?;
3214
3215        let state = self.state.read();
3216        let replay = state.replays.get(name).ok_or_else(|| {
3217            AwsServiceError::aws_error(
3218                StatusCode::BAD_REQUEST,
3219                "ResourceNotFoundException",
3220                format!("Replay {name} does not exist."),
3221            )
3222        })?;
3223
3224        let mut resp = json!({
3225            "Destination": replay.destination,
3226            "EventSourceArn": replay.event_source_arn,
3227            "EventStartTime": replay.event_start_time.timestamp() as f64,
3228            "EventEndTime": replay.event_end_time.timestamp() as f64,
3229            "ReplayArn": replay.arn,
3230            "ReplayName": replay.name,
3231            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3232            "State": replay.state,
3233        });
3234        if let Some(ref desc) = replay.description {
3235            resp["Description"] = json!(desc);
3236        }
3237        if let Some(ref end) = replay.replay_end_time {
3238            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3239        }
3240
3241        Ok(AwsResponse::ok_json(resp))
3242    }
3243
3244    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3245        let body = req.json_body();
3246        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3247        validate_optional_string_length(
3248            "eventSourceArn",
3249            body["EventSourceArn"].as_str(),
3250            1,
3251            1600,
3252        )?;
3253        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3254        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3255        let name_prefix = body["NamePrefix"].as_str();
3256        let source_arn = body["EventSourceArn"].as_str();
3257        let replay_state = body["State"].as_str();
3258
3259        // Validate at most one filter
3260        let filter_count = [
3261            name_prefix.is_some(),
3262            source_arn.is_some(),
3263            replay_state.is_some(),
3264        ]
3265        .iter()
3266        .filter(|&&x| x)
3267        .count();
3268        if filter_count > 1 {
3269            return Err(AwsServiceError::aws_error(
3270                StatusCode::BAD_REQUEST,
3271                "ValidationException",
3272                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3273            ));
3274        }
3275
3276        // Validate state
3277        if let Some(s) = replay_state {
3278            let valid = [
3279                "CANCELLED",
3280                "CANCELLING",
3281                "COMPLETED",
3282                "FAILED",
3283                "RUNNING",
3284                "STARTING",
3285            ];
3286            if !valid.contains(&s) {
3287                return Err(AwsServiceError::aws_error(
3288                    StatusCode::BAD_REQUEST,
3289                    "ValidationException",
3290                    format!(
3291                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3292                        s
3293                    ),
3294                ));
3295            }
3296        }
3297
3298        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3299
3300        let state = self.state.read();
3301        let all: Vec<Value> = state
3302            .replays
3303            .values()
3304            .filter(|r| {
3305                if let Some(prefix) = name_prefix {
3306                    r.name.starts_with(prefix)
3307                } else if let Some(arn) = source_arn {
3308                    r.event_source_arn == arn
3309                } else if let Some(s) = replay_state {
3310                    r.state == s
3311                } else {
3312                    true
3313                }
3314            })
3315            .map(|r| {
3316                let mut obj = json!({
3317                    "EventSourceArn": r.event_source_arn,
3318                    "EventStartTime": r.event_start_time.timestamp() as f64,
3319                    "EventEndTime": r.event_end_time.timestamp() as f64,
3320                    "ReplayName": r.name,
3321                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3322                    "State": r.state,
3323                });
3324                if let Some(ref end) = r.replay_end_time {
3325                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3326                }
3327                obj
3328            })
3329            .collect();
3330
3331        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3332        let mut resp = json!({ "Replays": replays });
3333        if let Some(token) = next_token {
3334            resp["NextToken"] = json!(token);
3335        }
3336
3337        Ok(AwsResponse::ok_json(resp))
3338    }
3339
3340    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3341        let body = req.json_body();
3342        validate_required("ReplayName", &body["ReplayName"])?;
3343        let name = body["ReplayName"]
3344            .as_str()
3345            .ok_or_else(|| missing("ReplayName"))?;
3346        validate_string_length("replayName", name, 1, 64)?;
3347
3348        let mut state = self.state.write();
3349        let replay = state.replays.get_mut(name).ok_or_else(|| {
3350            AwsServiceError::aws_error(
3351                StatusCode::BAD_REQUEST,
3352                "ResourceNotFoundException",
3353                format!("Replay {name} does not exist."),
3354            )
3355        })?;
3356
3357        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3358        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3359            return Err(AwsServiceError::aws_error(
3360                StatusCode::BAD_REQUEST,
3361                "IllegalStatusException",
3362                format!("Replay {name} is not in a valid state for this operation."),
3363            ));
3364        }
3365
3366        let arn = replay.arn.clone();
3367        replay.state = "CANCELLED".to_string();
3368
3369        Ok(AwsResponse::ok_json(json!({
3370            "ReplayArn": arn,
3371            "State": "CANCELLING",
3372        })))
3373    }
3374}
3375
3376// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3377
3378fn find_tags_mut<'a>(
3379    state: &'a mut crate::state::EventBridgeState,
3380    arn: &str,
3381) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3382    // Check buses
3383    for bus in state.buses.values_mut() {
3384        if bus.arn == arn {
3385            return Ok(&mut bus.tags);
3386        }
3387    }
3388    // Check rules
3389    for rule in state.rules.values_mut() {
3390        if rule.arn == arn {
3391            return Ok(&mut rule.tags);
3392        }
3393    }
3394
3395    // Parse ARN to give better error messages
3396    let error_msg = if arn.contains(":rule/") {
3397        // Extract rule name and bus from ARN
3398        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3399        if let Some(rule_path) = parts.first() {
3400            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3401                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3402            } else {
3403                format!("Rule {} does not exist on EventBus default.", rule_path)
3404            }
3405        } else {
3406            format!("Resource {arn} not found.")
3407        }
3408    } else {
3409        format!("Resource {arn} not found.")
3410    };
3411
3412    Err(AwsServiceError::aws_error(
3413        StatusCode::BAD_REQUEST,
3414        "ResourceNotFoundException",
3415        error_msg,
3416    ))
3417}
3418
3419fn find_tags<'a>(
3420    state: &'a crate::state::EventBridgeState,
3421    arn: &str,
3422) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3423    for bus in state.buses.values() {
3424        if bus.arn == arn {
3425            return Ok(&bus.tags);
3426        }
3427    }
3428    for rule in state.rules.values() {
3429        if rule.arn == arn {
3430            return Ok(&rule.tags);
3431        }
3432    }
3433
3434    let error_msg = if arn.contains(":rule/") {
3435        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3436        if let Some(rule_path) = parts.first() {
3437            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3438                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3439            } else {
3440                format!("Rule {} does not exist on EventBus default.", rule_path)
3441            }
3442        } else {
3443            format!("Resource {arn} not found.")
3444        }
3445    } else {
3446        format!("Resource {arn} not found.")
3447    };
3448
3449    Err(AwsServiceError::aws_error(
3450        StatusCode::BAD_REQUEST,
3451        "ResourceNotFoundException",
3452        error_msg,
3453    ))
3454}
3455
3456// ─── Event Pattern Validation ────────────────────────────────────────
3457
3458fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3459    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3460        AwsServiceError::aws_error(
3461            StatusCode::BAD_REQUEST,
3462            "InvalidEventPatternException",
3463            "Event pattern is not valid. Reason: Invalid JSON",
3464        )
3465    })?;
3466
3467    validate_pattern_values(&parsed, "")?;
3468    Ok(())
3469}
3470
3471fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3472    match value {
3473        Value::Object(obj) => {
3474            for (key, val) in obj {
3475                let new_path = if path.is_empty() {
3476                    key.clone()
3477                } else {
3478                    format!("{path}.{key}")
3479                };
3480                match val {
3481                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3482                    Value::Array(_) => {} // Arrays are fine at leaf level
3483                    _ => {
3484                        return Err(AwsServiceError::aws_error(
3485                            StatusCode::BAD_REQUEST,
3486                            "InvalidEventPatternException",
3487                            format!(
3488                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3489                                key
3490                            ),
3491                        ));
3492                    }
3493                }
3494            }
3495            Ok(())
3496        }
3497        _ => Ok(()),
3498    }
3499}
3500
3501// ─── Connection Auth Params Response Builder ────────────────────────
3502
3503fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3504    match auth_type {
3505        "API_KEY" => {
3506            let mut resp = json!({});
3507            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3508                resp["ApiKeyAuthParameters"] = json!({
3509                    "ApiKeyName": api_key["ApiKeyName"],
3510                });
3511            }
3512            resp
3513        }
3514        "BASIC" => {
3515            let mut resp = json!({});
3516            if let Some(basic) = params.get("BasicAuthParameters") {
3517                resp["BasicAuthParameters"] = json!({
3518                    "Username": basic["Username"],
3519                });
3520            }
3521            resp
3522        }
3523        "OAUTH_CLIENT_CREDENTIALS" => {
3524            let mut resp = json!({});
3525            if let Some(oauth) = params.get("OAuthParameters") {
3526                resp["OAuthParameters"] = json!({
3527                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3528                    "HttpMethod": oauth["HttpMethod"],
3529                    "ClientParameters": {
3530                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3531                    },
3532                });
3533            }
3534            resp
3535        }
3536        _ => params.clone(),
3537    }
3538}
3539
3540// ─── Event Pattern Matching ─────────────────────────────────────────
3541
3542/// Match an event against an EventBridge event pattern.
3543pub fn matches_pattern(
3544    pattern_json: Option<&str>,
3545    source: &str,
3546    detail_type: &str,
3547    detail: &str,
3548    account: &str,
3549    region: &str,
3550    resources: &[String],
3551) -> bool {
3552    let pattern_json = match pattern_json {
3553        Some(p) => p,
3554        None => return true,
3555    };
3556
3557    let pattern: Value = match serde_json::from_str(pattern_json) {
3558        Ok(v) => v,
3559        Err(_) => return false,
3560    };
3561
3562    let pattern_obj = match pattern.as_object() {
3563        Some(o) => o,
3564        None => return false,
3565    };
3566
3567    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3568    let event = json!({
3569        "source": source,
3570        "detail-type": detail_type,
3571        "detail": detail_value,
3572        "account": account,
3573        "region": region,
3574        "resources": resources,
3575    });
3576
3577    for (key, pattern_value) in pattern_obj {
3578        let event_value = &event[key];
3579        if !matches_value(pattern_value, event_value) {
3580            return false;
3581        }
3582    }
3583
3584    true
3585}
3586
3587fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3588    match pattern {
3589        Value::Object(obj) => {
3590            for (key, sub_pattern) in obj {
3591                let sub_value = &event_value[key];
3592                if !matches_value(sub_pattern, sub_value) {
3593                    return false;
3594                }
3595            }
3596            true
3597        }
3598        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3599        _ => false,
3600    }
3601}
3602
3603fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3604    match pattern_elem {
3605        Value::Object(obj) => {
3606            if let Some(prefix_val) = obj.get("prefix") {
3607                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3608                    return actual.starts_with(prefix);
3609                }
3610                return false;
3611            }
3612            if let Some(exists_val) = obj.get("exists") {
3613                let should_exist = exists_val.as_bool().unwrap_or(true);
3614                let does_exist = !event_value.is_null();
3615                return should_exist == does_exist;
3616            }
3617            if let Some(anything_but_val) = obj.get("anything-but") {
3618                return match anything_but_val {
3619                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3620                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3621                    Value::Number(_) => event_value != anything_but_val,
3622                    _ => true,
3623                };
3624            }
3625            if let Some(numeric_val) = obj.get("numeric") {
3626                return matches_numeric(numeric_val, event_value);
3627            }
3628            false
3629        }
3630        _ => values_equal(pattern_elem, event_value),
3631    }
3632}
3633
3634fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3635    let arr = match numeric_arr.as_array() {
3636        Some(a) => a,
3637        None => return false,
3638    };
3639    let actual = match event_value.as_f64() {
3640        Some(n) => n,
3641        None => return false,
3642    };
3643    let mut i = 0;
3644    while i + 1 < arr.len() {
3645        let op = match arr[i].as_str() {
3646            Some(s) => s,
3647            None => return false,
3648        };
3649        let threshold = match arr[i + 1].as_f64() {
3650            Some(n) => n,
3651            None => return false,
3652        };
3653        let ok = match op {
3654            ">" => actual > threshold,
3655            ">=" => actual >= threshold,
3656            "<" => actual < threshold,
3657            "<=" => actual <= threshold,
3658            "=" => (actual - threshold).abs() < f64::EPSILON,
3659            _ => return false,
3660        };
3661        if !ok {
3662            return false;
3663        }
3664        i += 2;
3665    }
3666    true
3667}
3668
3669fn values_equal(a: &Value, b: &Value) -> bool {
3670    a == b
3671}
3672
3673/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3674fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3675    let path = path.strip_prefix('$').unwrap_or(path);
3676    let mut current = event;
3677    for segment in path.split('.') {
3678        if segment.is_empty() {
3679            continue;
3680        }
3681        current = current.get(segment)?;
3682    }
3683    Some(current.clone())
3684}
3685
3686/// Apply an EventBridge InputTransformer to an event.
3687fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3688    let input_paths_map = transformer
3689        .get("InputPathsMap")
3690        .and_then(|v| v.as_object())
3691        .cloned()
3692        .unwrap_or_default();
3693    let template = transformer
3694        .get("InputTemplate")
3695        .and_then(|v| v.as_str())
3696        .unwrap_or("")
3697        .to_string();
3698
3699    // Resolve all input paths
3700    let mut resolved: HashMap<String, Value> = HashMap::new();
3701    for (var_name, path_val) in &input_paths_map {
3702        if let Some(path_str) = path_val.as_str() {
3703            if let Some(val) = resolve_json_path(event, path_str) {
3704                resolved.insert(var_name.clone(), val);
3705            }
3706        }
3707    }
3708
3709    // Replace <varName> placeholders in template
3710    let mut result = template;
3711    for (var_name, val) in &resolved {
3712        let placeholder = format!("<{var_name}>");
3713        let replacement = match val {
3714            Value::String(s) => s.clone(),
3715            other => other.to_string(),
3716        };
3717        result = result.replace(&placeholder, &replacement);
3718    }
3719
3720    result
3721}
3722
3723fn missing(name: &str) -> AwsServiceError {
3724    AwsServiceError::aws_error(
3725        StatusCode::BAD_REQUEST,
3726        "ValidationException",
3727        format!("The request must contain the parameter {name}"),
3728    )
3729}
3730
3731/// Extract a Lambda function name from its ARN.
3732///
3733/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
3734/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
3735fn function_name_from_arn(arn: &str) -> &str {
3736    let parts: Vec<&str> = arn.split(':').collect();
3737    if parts.len() >= 7 && parts[5] == "function" {
3738        parts[6]
3739    } else {
3740        arn
3741    }
3742}
3743
3744/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
3745/// This is fire-and-forget: EventBridge delivery is asynchronous.
3746pub fn invoke_lambda_async(
3747    container_runtime: &Option<Arc<ContainerRuntime>>,
3748    lambda_state: &Option<SharedLambdaState>,
3749    function_arn: &str,
3750    payload: &str,
3751) {
3752    let runtime = match container_runtime {
3753        Some(rt) => rt.clone(),
3754        None => return,
3755    };
3756    let lambda_state = match lambda_state {
3757        Some(ls) => ls.clone(),
3758        None => return,
3759    };
3760    let func_name = function_name_from_arn(function_arn).to_string();
3761    let payload = payload.as_bytes().to_vec();
3762
3763    tokio::spawn(async move {
3764        let func = {
3765            let state = lambda_state.read();
3766            state.functions.get(&func_name).cloned()
3767        };
3768        let func = match func {
3769            Some(f) => f,
3770            None => {
3771                tracing::warn!(
3772                    function = %func_name,
3773                    "EventBridge Lambda target not found, skipping invocation"
3774                );
3775                return;
3776            }
3777        };
3778        match runtime.invoke(&func, &payload).await {
3779            Ok(_) => {
3780                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3781            }
3782            Err(e) => {
3783                tracing::warn!(
3784                    function = %func_name,
3785                    error = %e,
3786                    "EventBridge Lambda invocation failed"
3787                );
3788            }
3789        }
3790    });
3791}
3792
3793/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
3794/// to the appropriate log group and stream.
3795pub fn deliver_to_logs(
3796    logs_state: &SharedLogsState,
3797    log_group_arn: &str,
3798    payload: &str,
3799    timestamp: chrono::DateTime<chrono::Utc>,
3800) {
3801    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
3802    // or just the name if it's not an ARN
3803    let group_name = if log_group_arn.contains(":log-group:") {
3804        log_group_arn
3805            .split(":log-group:")
3806            .nth(1)
3807            .unwrap_or(log_group_arn)
3808            .trim_end_matches(":*")
3809    } else {
3810        log_group_arn
3811    };
3812
3813    let stream_name = "events".to_string();
3814    let ts_millis = timestamp.timestamp_millis();
3815
3816    let mut state = logs_state.write();
3817    let region = state.region.clone();
3818    let account_id = state.account_id.clone();
3819
3820    // Auto-create log group and stream if they don't exist
3821    let group = state
3822        .log_groups
3823        .entry(group_name.to_string())
3824        .or_insert_with(|| fakecloud_logs::state::LogGroup {
3825            name: group_name.to_string(),
3826            arn: Arn::new(
3827                "logs",
3828                &region,
3829                &account_id,
3830                &format!("log-group:{group_name}"),
3831            )
3832            .to_string(),
3833            creation_time: ts_millis,
3834            retention_in_days: None,
3835            kms_key_id: None,
3836            tags: HashMap::new(),
3837            log_streams: HashMap::new(),
3838            stored_bytes: 0,
3839            subscription_filters: Vec::new(),
3840            data_protection_policy: None,
3841            index_policies: Vec::new(),
3842            transformer: None,
3843            deletion_protection: false,
3844        });
3845
3846    let stream = group
3847        .log_streams
3848        .entry(stream_name.clone())
3849        .or_insert_with(|| fakecloud_logs::state::LogStream {
3850            name: stream_name,
3851            arn: format!("{}:log-stream:events", group.arn),
3852            creation_time: ts_millis,
3853            first_event_timestamp: None,
3854            last_event_timestamp: None,
3855            last_ingestion_time: None,
3856            upload_sequence_token: "1".to_string(),
3857            events: Vec::new(),
3858        });
3859
3860    stream.events.push(fakecloud_logs::state::LogEvent {
3861        timestamp: ts_millis,
3862        message: payload.to_string(),
3863        ingestion_time: ts_millis,
3864    });
3865    stream.last_event_timestamp = Some(ts_millis);
3866    stream.last_ingestion_time = Some(ts_millis);
3867    if stream.first_event_timestamp.is_none() {
3868        stream.first_event_timestamp = Some(ts_millis);
3869    }
3870}
3871
3872/// Apply connection auth parameters to an outgoing HTTP request.
3873fn apply_connection_auth(
3874    mut builder: reqwest::RequestBuilder,
3875    conn: &Connection,
3876) -> reqwest::RequestBuilder {
3877    match conn.authorization_type.as_str() {
3878        "API_KEY" => {
3879            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3880                if let (Some(name), Some(value)) = (
3881                    params["ApiKeyName"].as_str(),
3882                    params["ApiKeyValue"].as_str(),
3883                ) {
3884                    builder = builder.header(name, value);
3885                }
3886            }
3887        }
3888        "BASIC" => {
3889            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3890                if let (Some(user), Some(pass)) =
3891                    (params["Username"].as_str(), params["Password"].as_str())
3892                {
3893                    builder = builder.basic_auth(user, Some(pass));
3894                }
3895            }
3896        }
3897        "OAUTH_CLIENT_CREDENTIALS" => {
3898            // For OAuth, in a real implementation we'd exchange credentials for a token.
3899            // Here we pass client credentials as basic auth as a reasonable approximation.
3900            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
3901                if let (Some(client_id), Some(client_secret)) = (
3902                    params["ClientParameters"]["ClientID"].as_str(),
3903                    params["ClientParameters"]["ClientSecret"].as_str(),
3904                ) {
3905                    builder = builder.basic_auth(client_id, Some(client_secret));
3906                }
3907            }
3908        }
3909        _ => {}
3910    }
3911    builder
3912}
3913
3914#[cfg(test)]
3915mod tests {
3916    use super::*;
3917
3918    /// Test helper that calls matches_pattern with default account/region/resources
3919    fn test_matches(
3920        pattern_json: Option<&str>,
3921        source: &str,
3922        detail_type: &str,
3923        detail: &str,
3924    ) -> bool {
3925        matches_pattern(
3926            pattern_json,
3927            source,
3928            detail_type,
3929            detail,
3930            "123456789012",
3931            "us-east-1",
3932            &[],
3933        )
3934    }
3935
3936    #[test]
3937    fn pattern_matches_source() {
3938        assert!(test_matches(
3939            Some(r#"{"source": ["my.app"]}"#),
3940            "my.app",
3941            "OrderPlaced",
3942            "{}"
3943        ));
3944        assert!(!test_matches(
3945            Some(r#"{"source": ["other.app"]}"#),
3946            "my.app",
3947            "OrderPlaced",
3948            "{}"
3949        ));
3950    }
3951
3952    #[test]
3953    fn pattern_matches_detail_type() {
3954        assert!(test_matches(
3955            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
3956            "my.app",
3957            "OrderPlaced",
3958            "{}"
3959        ));
3960        assert!(!test_matches(
3961            Some(r#"{"detail-type": ["OrderShipped"]}"#),
3962            "my.app",
3963            "OrderPlaced",
3964            "{}"
3965        ));
3966    }
3967
3968    #[test]
3969    fn pattern_matches_detail_field() {
3970        assert!(test_matches(
3971            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3972            "my.app",
3973            "StatusChange",
3974            r#"{"status": "ACTIVE"}"#
3975        ));
3976        assert!(!test_matches(
3977            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3978            "my.app",
3979            "StatusChange",
3980            r#"{"status": "INACTIVE"}"#
3981        ));
3982    }
3983
3984    #[test]
3985    fn no_pattern_matches_everything() {
3986        assert!(test_matches(None, "any", "any", "{}"));
3987    }
3988
3989    #[test]
3990    fn combined_pattern() {
3991        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
3992        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
3993        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
3994        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
3995    }
3996
3997    #[test]
3998    fn nested_detail_pattern() {
3999        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4000        assert!(test_matches(
4001            Some(pattern),
4002            "my.app",
4003            "OrderEvent",
4004            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4005        ));
4006        assert!(!test_matches(
4007            Some(pattern),
4008            "my.app",
4009            "OrderEvent",
4010            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4011        ));
4012        assert!(!test_matches(
4013            Some(pattern),
4014            "my.app",
4015            "OrderEvent",
4016            r#"{"order": {"id": "123"}}"#
4017        ));
4018    }
4019
4020    #[test]
4021    fn deeply_nested_detail_pattern() {
4022        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4023        assert!(test_matches(
4024            Some(pattern),
4025            "src",
4026            "type",
4027            r#"{"a": {"b": {"c": "deep"}}}"#
4028        ));
4029        assert!(!test_matches(
4030            Some(pattern),
4031            "src",
4032            "type",
4033            r#"{"a": {"b": {"c": "shallow"}}}"#
4034        ));
4035    }
4036
4037    #[test]
4038    fn prefix_matcher() {
4039        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4040        assert!(test_matches(
4041            Some(pattern),
4042            "com.myapp.orders",
4043            "OrderPlaced",
4044            "{}"
4045        ));
4046        assert!(test_matches(
4047            Some(pattern),
4048            "com.myapp",
4049            "OrderPlaced",
4050            "{}"
4051        ));
4052        assert!(!test_matches(
4053            Some(pattern),
4054            "com.other",
4055            "OrderPlaced",
4056            "{}"
4057        ));
4058    }
4059
4060    #[test]
4061    fn prefix_matcher_in_detail() {
4062        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4063        assert!(test_matches(
4064            Some(pattern),
4065            "src",
4066            "type",
4067            r#"{"region": "us-east-1"}"#
4068        ));
4069        assert!(!test_matches(
4070            Some(pattern),
4071            "src",
4072            "type",
4073            r#"{"region": "eu-west-1"}"#
4074        ));
4075    }
4076
4077    #[test]
4078    fn exists_matcher() {
4079        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4080        assert!(test_matches(
4081            Some(pattern),
4082            "src",
4083            "type",
4084            r#"{"error": "something broke"}"#
4085        ));
4086        assert!(!test_matches(
4087            Some(pattern),
4088            "src",
4089            "type",
4090            r#"{"status": "ok"}"#
4091        ));
4092
4093        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4094        assert!(test_matches(
4095            Some(pattern),
4096            "src",
4097            "type",
4098            r#"{"status": "ok"}"#
4099        ));
4100        assert!(!test_matches(
4101            Some(pattern),
4102            "src",
4103            "type",
4104            r#"{"error": "something broke"}"#
4105        ));
4106    }
4107
4108    #[test]
4109    fn anything_but_matcher() {
4110        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4111        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4112        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4113
4114        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4115        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4116        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4117        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4118    }
4119
4120    #[test]
4121    fn anything_but_in_detail() {
4122        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4123        assert!(test_matches(
4124            Some(pattern),
4125            "src",
4126            "type",
4127            r#"{"env": "staging"}"#
4128        ));
4129        assert!(!test_matches(
4130            Some(pattern),
4131            "src",
4132            "type",
4133            r#"{"env": "prod"}"#
4134        ));
4135    }
4136
4137    #[test]
4138    fn numeric_greater_than() {
4139        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4140        assert!(test_matches(
4141            Some(pattern),
4142            "src",
4143            "type",
4144            r#"{"count": 150}"#
4145        ));
4146        assert!(!test_matches(
4147            Some(pattern),
4148            "src",
4149            "type",
4150            r#"{"count": 100}"#
4151        ));
4152        assert!(!test_matches(
4153            Some(pattern),
4154            "src",
4155            "type",
4156            r#"{"count": 50}"#
4157        ));
4158    }
4159
4160    #[test]
4161    fn numeric_less_than() {
4162        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4163        assert!(test_matches(
4164            Some(pattern),
4165            "src",
4166            "type",
4167            r#"{"count": 5}"#
4168        ));
4169        assert!(!test_matches(
4170            Some(pattern),
4171            "src",
4172            "type",
4173            r#"{"count": 10}"#
4174        ));
4175        assert!(!test_matches(
4176            Some(pattern),
4177            "src",
4178            "type",
4179            r#"{"count": 15}"#
4180        ));
4181    }
4182
4183    #[test]
4184    fn numeric_range() {
4185        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4186        assert!(test_matches(
4187            Some(pattern),
4188            "src",
4189            "type",
4190            r#"{"count": 50}"#
4191        ));
4192        assert!(test_matches(
4193            Some(pattern),
4194            "src",
4195            "type",
4196            r#"{"count": 100}"#
4197        ));
4198        assert!(!test_matches(
4199            Some(pattern),
4200            "src",
4201            "type",
4202            r#"{"count": 200}"#
4203        ));
4204        assert!(!test_matches(
4205            Some(pattern),
4206            "src",
4207            "type",
4208            r#"{"count": 49}"#
4209        ));
4210    }
4211
4212    #[test]
4213    fn mixed_matchers_and_literals() {
4214        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4215        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4216        assert!(test_matches(
4217            Some(pattern),
4218            "com.myapp.orders",
4219            "Event",
4220            "{}"
4221        ));
4222        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4223    }
4224
4225    // ---- list_connections / list_api_destinations filtering & pagination ----
4226
4227    use crate::state::EventBridgeState;
4228    use fakecloud_core::delivery::DeliveryBus;
4229    use parking_lot::RwLock;
4230
4231    fn make_service() -> EventBridgeService {
4232        let state = Arc::new(RwLock::new(EventBridgeState::new(
4233            "123456789012",
4234            "us-east-1",
4235        )));
4236        let delivery = Arc::new(DeliveryBus::new());
4237        EventBridgeService::new(state, delivery)
4238    }
4239
4240    fn make_request(action: &str, body: Value) -> AwsRequest {
4241        AwsRequest {
4242            service: "events".to_string(),
4243            action: action.to_string(),
4244            region: "us-east-1".to_string(),
4245            account_id: "123456789012".to_string(),
4246            request_id: "test-id".to_string(),
4247            headers: http::HeaderMap::new(),
4248            query_params: HashMap::new(),
4249            body: serde_json::to_vec(&body).unwrap().into(),
4250            path_segments: vec![],
4251            raw_path: "/".to_string(),
4252            raw_query: String::new(),
4253            method: http::Method::POST,
4254            is_query_protocol: false,
4255            access_key_id: None,
4256        }
4257    }
4258
4259    fn create_connection(svc: &EventBridgeService, name: &str) {
4260        let req = make_request(
4261            "CreateConnection",
4262            json!({
4263                "Name": name,
4264                "AuthorizationType": "API_KEY",
4265                "AuthParameters": {
4266                    "ApiKeyAuthParameters": {
4267                        "ApiKeyName": "x-api-key",
4268                        "ApiKeyValue": "secret"
4269                    }
4270                }
4271            }),
4272        );
4273        svc.create_connection(&req).unwrap();
4274    }
4275
4276    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4277        let conn_arn_field = {
4278            let state = svc.state.read();
4279            state.connections.get(conn_name).unwrap().arn.clone()
4280        };
4281        let req = make_request(
4282            "CreateApiDestination",
4283            json!({
4284                "Name": name,
4285                "ConnectionArn": conn_arn_field,
4286                "InvocationEndpoint": "https://example.com",
4287                "HttpMethod": "POST"
4288            }),
4289        );
4290        svc.create_api_destination(&req).unwrap();
4291    }
4292
4293    // -- ListConnections tests --
4294
4295    #[test]
4296    fn list_connections_returns_all_by_default() {
4297        let svc = make_service();
4298        create_connection(&svc, "conn-alpha");
4299        create_connection(&svc, "conn-beta");
4300        create_connection(&svc, "conn-gamma");
4301
4302        let req = make_request("ListConnections", json!({}));
4303        let resp = svc.list_connections(&req).unwrap();
4304        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4305        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4306        assert!(body["NextToken"].is_null());
4307    }
4308
4309    #[test]
4310    fn list_connections_name_prefix_filter() {
4311        let svc = make_service();
4312        create_connection(&svc, "prod-conn-1");
4313        create_connection(&svc, "prod-conn-2");
4314        create_connection(&svc, "dev-conn-1");
4315
4316        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4317        let resp = svc.list_connections(&req).unwrap();
4318        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4319        let names: Vec<&str> = body["Connections"]
4320            .as_array()
4321            .unwrap()
4322            .iter()
4323            .map(|c| c["Name"].as_str().unwrap())
4324            .collect();
4325        assert_eq!(names.len(), 2);
4326        assert!(names.iter().all(|n| n.starts_with("prod-")));
4327    }
4328
4329    #[test]
4330    fn list_connections_state_filter() {
4331        let svc = make_service();
4332        create_connection(&svc, "conn-a");
4333        create_connection(&svc, "conn-b");
4334
4335        // All connections start as AUTHORIZED; change one
4336        {
4337            let mut state = svc.state.write();
4338            state
4339                .connections
4340                .get_mut("conn-b")
4341                .unwrap()
4342                .connection_state = "DEAUTHORIZED".to_string();
4343        }
4344
4345        let req = make_request(
4346            "ListConnections",
4347            json!({ "ConnectionState": "AUTHORIZED" }),
4348        );
4349        let resp = svc.list_connections(&req).unwrap();
4350        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4351        let conns = body["Connections"].as_array().unwrap();
4352        assert_eq!(conns.len(), 1);
4353        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4354    }
4355
4356    #[test]
4357    fn list_connections_pagination() {
4358        let svc = make_service();
4359        for i in 0..5 {
4360            create_connection(&svc, &format!("conn-{i:02}"));
4361        }
4362
4363        // First page: limit 2
4364        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4365        let resp = svc.list_connections(&req).unwrap();
4366        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4367        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4368        let token = body["NextToken"].as_str().unwrap();
4369        assert_eq!(token, "2");
4370
4371        // Second page
4372        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4373        let resp = svc.list_connections(&req).unwrap();
4374        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4375        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4376        let token = body["NextToken"].as_str().unwrap();
4377        assert_eq!(token, "4");
4378
4379        // Third page (only 1 remaining)
4380        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4381        let resp = svc.list_connections(&req).unwrap();
4382        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4383        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4384        assert!(body["NextToken"].is_null());
4385    }
4386
4387    #[test]
4388    fn list_connections_pagination_with_filter() {
4389        let svc = make_service();
4390        for i in 0..4 {
4391            create_connection(&svc, &format!("prod-{i:02}"));
4392        }
4393        create_connection(&svc, "dev-00");
4394
4395        let req = make_request(
4396            "ListConnections",
4397            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4398        );
4399        let resp = svc.list_connections(&req).unwrap();
4400        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4401        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4402        assert!(body["NextToken"].as_str().is_some());
4403    }
4404
4405    // -- ListApiDestinations tests --
4406
4407    #[test]
4408    fn list_api_destinations_returns_all_by_default() {
4409        let svc = make_service();
4410        create_connection(&svc, "my-conn");
4411        create_api_destination(&svc, "dest-alpha", "my-conn");
4412        create_api_destination(&svc, "dest-beta", "my-conn");
4413
4414        let req = make_request("ListApiDestinations", json!({}));
4415        let resp = svc.list_api_destinations(&req).unwrap();
4416        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4417        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4418        assert!(body["NextToken"].is_null());
4419    }
4420
4421    #[test]
4422    fn list_api_destinations_name_prefix_filter() {
4423        let svc = make_service();
4424        create_connection(&svc, "my-conn");
4425        create_api_destination(&svc, "prod-dest-1", "my-conn");
4426        create_api_destination(&svc, "prod-dest-2", "my-conn");
4427        create_api_destination(&svc, "dev-dest-1", "my-conn");
4428
4429        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4430        let resp = svc.list_api_destinations(&req).unwrap();
4431        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4432        let names: Vec<&str> = body["ApiDestinations"]
4433            .as_array()
4434            .unwrap()
4435            .iter()
4436            .map(|d| d["Name"].as_str().unwrap())
4437            .collect();
4438        assert_eq!(names.len(), 2);
4439        assert!(names.iter().all(|n| n.starts_with("prod-")));
4440    }
4441
4442    #[test]
4443    fn list_api_destinations_connection_arn_filter() {
4444        let svc = make_service();
4445        create_connection(&svc, "conn-a");
4446        create_connection(&svc, "conn-b");
4447        create_api_destination(&svc, "dest-1", "conn-a");
4448        create_api_destination(&svc, "dest-2", "conn-b");
4449        create_api_destination(&svc, "dest-3", "conn-a");
4450
4451        let conn_a_arn = {
4452            let state = svc.state.read();
4453            state.connections.get("conn-a").unwrap().arn.clone()
4454        };
4455
4456        let req = make_request(
4457            "ListApiDestinations",
4458            json!({ "ConnectionArn": conn_a_arn }),
4459        );
4460        let resp = svc.list_api_destinations(&req).unwrap();
4461        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4462        let names: Vec<&str> = body["ApiDestinations"]
4463            .as_array()
4464            .unwrap()
4465            .iter()
4466            .map(|d| d["Name"].as_str().unwrap())
4467            .collect();
4468        assert_eq!(names.len(), 2);
4469        assert!(names.contains(&"dest-1"));
4470        assert!(names.contains(&"dest-3"));
4471    }
4472
4473    #[test]
4474    fn list_api_destinations_pagination() {
4475        let svc = make_service();
4476        create_connection(&svc, "my-conn");
4477        for i in 0..5 {
4478            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4479        }
4480
4481        // First page
4482        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4483        let resp = svc.list_api_destinations(&req).unwrap();
4484        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4485        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4486        let token = body["NextToken"].as_str().unwrap();
4487        assert_eq!(token, "2");
4488
4489        // Second page
4490        let req = make_request(
4491            "ListApiDestinations",
4492            json!({ "Limit": 2, "NextToken": token }),
4493        );
4494        let resp = svc.list_api_destinations(&req).unwrap();
4495        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4496        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4497        let token = body["NextToken"].as_str().unwrap();
4498        assert_eq!(token, "4");
4499
4500        // Last page
4501        let req = make_request(
4502            "ListApiDestinations",
4503            json!({ "Limit": 2, "NextToken": token }),
4504        );
4505        let resp = svc.list_api_destinations(&req).unwrap();
4506        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4507        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4508        assert!(body["NextToken"].is_null());
4509    }
4510
4511    // -- ListEventBuses pagination tests --
4512
4513    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4514        let req = make_request("CreateEventBus", json!({ "Name": name }));
4515        svc.create_event_bus(&req).unwrap();
4516    }
4517
4518    #[test]
4519    fn list_event_buses_pagination() {
4520        let svc = make_service();
4521        // "default" bus already exists, create 4 more
4522        for i in 0..4 {
4523            create_event_bus(&svc, &format!("bus-{i:02}"));
4524        }
4525
4526        // First page: limit 2
4527        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4528        let resp = svc.list_event_buses(&req).unwrap();
4529        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4530        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4531        let token = body["NextToken"].as_str().unwrap();
4532        assert_eq!(token, "2");
4533
4534        // Second page
4535        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4536        let resp = svc.list_event_buses(&req).unwrap();
4537        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4538        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4539        let token = body["NextToken"].as_str().unwrap();
4540        assert_eq!(token, "4");
4541
4542        // Third page (only 1 remaining)
4543        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4544        let resp = svc.list_event_buses(&req).unwrap();
4545        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4546        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4547        assert!(body["NextToken"].is_null());
4548    }
4549
4550    #[test]
4551    fn list_event_buses_no_pagination_returns_all() {
4552        let svc = make_service();
4553        create_event_bus(&svc, "bus-alpha");
4554        create_event_bus(&svc, "bus-beta");
4555
4556        let req = make_request("ListEventBuses", json!({}));
4557        let resp = svc.list_event_buses(&req).unwrap();
4558        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4559        // default + 2 custom = 3
4560        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4561        assert!(body["NextToken"].is_null());
4562    }
4563
4564    // -- PutEvents EndpointId tests --
4565
4566    #[test]
4567    fn put_events_never_includes_endpoint_id_in_response() {
4568        let svc = make_service();
4569        // Even when EndpointId is provided in the request, it must not appear in the response
4570        let req = make_request(
4571            "PutEvents",
4572            json!({
4573                "EndpointId": "my-endpoint.abc123",
4574                "Entries": [{
4575                    "Source": "my.source",
4576                    "DetailType": "MyType",
4577                    "Detail": "{}",
4578                    "EventBusName": "default"
4579                }]
4580            }),
4581        );
4582        let resp = svc.put_events(&req).unwrap();
4583        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4584        assert!(
4585            !body.as_object().unwrap().contains_key("EndpointId"),
4586            "EndpointId should never be in the PutEvents response"
4587        );
4588        assert_eq!(body["FailedEntryCount"], 0);
4589    }
4590
4591    // -- ListArchives pagination tests --
4592
4593    fn create_archive(svc: &EventBridgeService, name: &str) {
4594        let req = make_request(
4595            "CreateArchive",
4596            json!({
4597                "ArchiveName": name,
4598                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4599            }),
4600        );
4601        svc.create_archive(&req).unwrap();
4602    }
4603
4604    #[test]
4605    fn list_archives_pagination() {
4606        let svc = make_service();
4607        for i in 0..5 {
4608            create_archive(&svc, &format!("archive-{i:02}"));
4609        }
4610
4611        // First page: limit 2
4612        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4613        let resp = svc.list_archives(&req).unwrap();
4614        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4615        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4616        let token = body["NextToken"].as_str().unwrap();
4617        assert_eq!(token, "2");
4618
4619        // Second page
4620        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4621        let resp = svc.list_archives(&req).unwrap();
4622        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4623        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4624        let token = body["NextToken"].as_str().unwrap();
4625        assert_eq!(token, "4");
4626
4627        // Third page (only 1 remaining)
4628        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4629        let resp = svc.list_archives(&req).unwrap();
4630        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4631        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4632        assert!(body["NextToken"].is_null());
4633    }
4634
4635    // -- ListReplays pagination tests --
4636
4637    fn create_replay(svc: &EventBridgeService, name: &str) {
4638        // Need an archive first for the replay's event source
4639        let archive_arn = {
4640            let state = svc.state.read();
4641            if state.archives.contains_key("replay-archive") {
4642                state.archives["replay-archive"].arn.clone()
4643            } else {
4644                drop(state);
4645                create_archive(svc, "replay-archive");
4646                svc.state.read().archives["replay-archive"].arn.clone()
4647            }
4648        };
4649        let req = make_request(
4650            "StartReplay",
4651            json!({
4652                "ReplayName": name,
4653                "EventSourceArn": archive_arn,
4654                "EventStartTime": 1000000.0,
4655                "EventEndTime": 2000000.0,
4656                "Destination": {
4657                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4658                }
4659            }),
4660        );
4661        svc.start_replay(&req).unwrap();
4662    }
4663
4664    #[test]
4665    fn list_replays_pagination() {
4666        let svc = make_service();
4667        for i in 0..5 {
4668            create_replay(&svc, &format!("replay-{i:02}"));
4669        }
4670
4671        // First page: limit 2
4672        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4673        let resp = svc.list_replays(&req).unwrap();
4674        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4675        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4676        let token = body["NextToken"].as_str().unwrap();
4677        assert_eq!(token, "2");
4678
4679        // Second page
4680        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4681        let resp = svc.list_replays(&req).unwrap();
4682        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4683        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4684        let token = body["NextToken"].as_str().unwrap();
4685        assert_eq!(token, "4");
4686
4687        // Third page (only 1 remaining)
4688        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4689        let resp = svc.list_replays(&req).unwrap();
4690        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4691        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4692        assert!(body["NextToken"].is_null());
4693    }
4694
4695    #[test]
4696    fn list_event_buses_invalid_next_token_returns_error() {
4697        let svc = make_service();
4698
4699        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4700        let result = svc.list_event_buses(&req);
4701        assert!(
4702            result.is_err(),
4703            "non-numeric NextToken should return an error"
4704        );
4705    }
4706
4707    // ---- TestEventPattern tests ----
4708
4709    #[test]
4710    fn test_event_pattern_match() {
4711        let svc = make_service();
4712        let req = make_request(
4713            "TestEventPattern",
4714            json!({
4715                "EventPattern": r#"{"source": ["my.app"]}"#,
4716                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4717            }),
4718        );
4719        let resp = svc.test_event_pattern(&req).unwrap();
4720        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4721        assert_eq!(body["Result"], true);
4722    }
4723
4724    #[test]
4725    fn test_event_pattern_no_match() {
4726        let svc = make_service();
4727        let req = make_request(
4728            "TestEventPattern",
4729            json!({
4730                "EventPattern": r#"{"source": ["other.app"]}"#,
4731                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4732            }),
4733        );
4734        let resp = svc.test_event_pattern(&req).unwrap();
4735        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4736        assert_eq!(body["Result"], false);
4737    }
4738
4739    #[test]
4740    fn test_event_pattern_detail_match() {
4741        let svc = make_service();
4742        let req = make_request(
4743            "TestEventPattern",
4744            json!({
4745                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4746                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4747            }),
4748        );
4749        let resp = svc.test_event_pattern(&req).unwrap();
4750        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4751        assert_eq!(body["Result"], true);
4752    }
4753
4754    // ---- UpdateEventBus tests ----
4755
4756    #[test]
4757    fn update_event_bus_description() {
4758        let svc = make_service();
4759        create_event_bus(&svc, "my-bus");
4760
4761        let req = make_request(
4762            "UpdateEventBus",
4763            json!({ "Name": "my-bus", "Description": "Updated desc" }),
4764        );
4765        let resp = svc.update_event_bus(&req).unwrap();
4766        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4767        assert_eq!(body["Name"], "my-bus");
4768
4769        // Verify via describe
4770        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4771        let resp = svc.describe_event_bus(&req).unwrap();
4772        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4773        assert_eq!(body["Description"], "Updated desc");
4774    }
4775
4776    #[test]
4777    fn update_event_bus_not_found() {
4778        let svc = make_service();
4779        let req = make_request(
4780            "UpdateEventBus",
4781            json!({ "Name": "ghost-bus", "Description": "nope" }),
4782        );
4783        assert!(svc.update_event_bus(&req).is_err());
4784    }
4785
4786    // ---- Endpoint CRUD tests ----
4787
4788    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4789        let req = make_request(
4790            "CreateEndpoint",
4791            json!({
4792                "Name": name,
4793                "RoutingConfig": {
4794                    "FailoverConfig": {
4795                        "Primary": { "HealthCheck": "" },
4796                        "Secondary": { "Route": "us-west-2" }
4797                    }
4798                },
4799                "EventBuses": [
4800                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4801                ]
4802            }),
4803        );
4804        svc.create_endpoint(&req).unwrap();
4805    }
4806
4807    #[test]
4808    fn endpoint_create_describe_delete() {
4809        let svc = make_service();
4810        create_endpoint_helper(&svc, "my-endpoint");
4811
4812        // Describe
4813        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4814        let resp = svc.describe_endpoint(&req).unwrap();
4815        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4816        assert_eq!(body["Name"], "my-endpoint");
4817        assert_eq!(body["State"], "ACTIVE");
4818        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4819
4820        // Delete
4821        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4822        svc.delete_endpoint(&req).unwrap();
4823
4824        // Verify gone
4825        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4826        assert!(svc.describe_endpoint(&req).is_err());
4827    }
4828
4829    #[test]
4830    fn endpoint_list_and_update() {
4831        let svc = make_service();
4832        create_endpoint_helper(&svc, "ep-alpha");
4833        create_endpoint_helper(&svc, "ep-beta");
4834
4835        // List all
4836        let req = make_request("ListEndpoints", json!({}));
4837        let resp = svc.list_endpoints(&req).unwrap();
4838        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4839        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4840
4841        // Update
4842        let req = make_request(
4843            "UpdateEndpoint",
4844            json!({ "Name": "ep-alpha", "Description": "updated" }),
4845        );
4846        let resp = svc.update_endpoint(&req).unwrap();
4847        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4848        assert_eq!(body["Name"], "ep-alpha");
4849
4850        // Verify description
4851        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4852        let resp = svc.describe_endpoint(&req).unwrap();
4853        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4854        assert_eq!(body["Description"], "updated");
4855    }
4856
4857    #[test]
4858    fn endpoint_duplicate_fails() {
4859        let svc = make_service();
4860        create_endpoint_helper(&svc, "dup-ep");
4861        let req = make_request(
4862            "CreateEndpoint",
4863            json!({
4864                "Name": "dup-ep",
4865                "RoutingConfig": {},
4866                "EventBuses": []
4867            }),
4868        );
4869        assert!(svc.create_endpoint(&req).is_err());
4870    }
4871
4872    // ---- DeauthorizeConnection tests ----
4873
4874    #[test]
4875    fn deauthorize_connection_sets_state() {
4876        let svc = make_service();
4877        create_connection(&svc, "deauth-conn");
4878
4879        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4880        let resp = svc.deauthorize_connection(&req).unwrap();
4881        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4882        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4883        assert!(body["ConnectionArn"]
4884            .as_str()
4885            .unwrap()
4886            .contains("deauth-conn"));
4887
4888        // Verify via describe
4889        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4890        let resp = svc.describe_connection(&req).unwrap();
4891        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4892        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4893    }
4894
4895    #[test]
4896    fn deauthorize_connection_not_found() {
4897        let svc = make_service();
4898        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
4899        assert!(svc.deauthorize_connection(&req).is_err());
4900    }
4901
4902    // ---- Partner event source tests ----
4903
4904    #[test]
4905    fn partner_event_source_crud() {
4906        let svc = make_service();
4907
4908        // Create
4909        let req = make_request(
4910            "CreatePartnerEventSource",
4911            json!({ "Name": "partner/test", "Account": "123456789012" }),
4912        );
4913        svc.create_partner_event_source(&req).unwrap();
4914
4915        // Describe
4916        let req = make_request(
4917            "DescribePartnerEventSource",
4918            json!({ "Name": "partner/test" }),
4919        );
4920        let resp = svc.describe_partner_event_source(&req).unwrap();
4921        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4922        assert_eq!(body["Name"], "partner/test");
4923
4924        // List
4925        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
4926        let resp = svc.list_partner_event_sources(&req).unwrap();
4927        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4928        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
4929
4930        // ListPartnerEventSourceAccounts
4931        let req = make_request(
4932            "ListPartnerEventSourceAccounts",
4933            json!({ "EventSourceName": "partner/test" }),
4934        );
4935        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
4936        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4937        assert_eq!(
4938            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
4939            1
4940        );
4941
4942        // DescribeEventSource
4943        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
4944        let resp = svc.describe_event_source(&req).unwrap();
4945        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4946        assert_eq!(body["Name"], "partner/test");
4947        assert_eq!(body["State"], "ACTIVE");
4948
4949        // ListEventSources
4950        let req = make_request("ListEventSources", json!({}));
4951        let resp = svc.list_event_sources(&req).unwrap();
4952        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4953        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
4954
4955        // Delete
4956        let req = make_request(
4957            "DeletePartnerEventSource",
4958            json!({ "Name": "partner/test", "Account": "123456789012" }),
4959        );
4960        svc.delete_partner_event_source(&req).unwrap();
4961
4962        // Verify gone
4963        let req = make_request(
4964            "DescribePartnerEventSource",
4965            json!({ "Name": "partner/test" }),
4966        );
4967        assert!(svc.describe_partner_event_source(&req).is_err());
4968    }
4969
4970    #[test]
4971    fn activate_deactivate_event_source() {
4972        let svc = make_service();
4973
4974        // Create a partner event source first
4975        let req = make_request(
4976            "CreatePartnerEventSource",
4977            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4978        );
4979        svc.create_partner_event_source(&req).unwrap();
4980
4981        // Deactivate it
4982        let req = make_request(
4983            "DeactivateEventSource",
4984            json!({ "Name": "aws.partner/test" }),
4985        );
4986        svc.deactivate_event_source(&req).unwrap();
4987        {
4988            let state = svc.state.read();
4989            assert_eq!(
4990                state.partner_event_sources["aws.partner/test"].state,
4991                "INACTIVE"
4992            );
4993        }
4994
4995        // Activate it
4996        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
4997        svc.activate_event_source(&req).unwrap();
4998        {
4999            let state = svc.state.read();
5000            assert_eq!(
5001                state.partner_event_sources["aws.partner/test"].state,
5002                "ACTIVE"
5003            );
5004        }
5005
5006        // Not-found returns error
5007        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5008        assert!(svc.activate_event_source(&req).is_err());
5009
5010        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5011        assert!(svc.deactivate_event_source(&req).is_err());
5012    }
5013
5014    #[test]
5015    fn delete_partner_event_source_verifies_account() {
5016        let svc = make_service();
5017
5018        // Create a partner event source
5019        let req = make_request(
5020            "CreatePartnerEventSource",
5021            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5022        );
5023        svc.create_partner_event_source(&req).unwrap();
5024
5025        // Deleting with wrong account fails
5026        let req = make_request(
5027            "DeletePartnerEventSource",
5028            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5029        );
5030        assert!(svc.delete_partner_event_source(&req).is_err());
5031        // Source still exists
5032        assert!(svc
5033            .state
5034            .read()
5035            .partner_event_sources
5036            .contains_key("aws.partner/test"));
5037
5038        // Deleting with correct account succeeds
5039        let req = make_request(
5040            "DeletePartnerEventSource",
5041            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5042        );
5043        svc.delete_partner_event_source(&req).unwrap();
5044        assert!(!svc
5045            .state
5046            .read()
5047            .partner_event_sources
5048            .contains_key("aws.partner/test"));
5049
5050        // Deleting non-existent source returns error
5051        let req = make_request(
5052            "DeletePartnerEventSource",
5053            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5054        );
5055        assert!(svc.delete_partner_event_source(&req).is_err());
5056    }
5057
5058    #[test]
5059    fn put_partner_events() {
5060        let svc = make_service();
5061        let req = make_request(
5062            "PutPartnerEvents",
5063            json!({
5064                "Entries": [
5065                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5066                ]
5067            }),
5068        );
5069        let resp = svc.put_partner_events(&req).unwrap();
5070        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5071        assert_eq!(body["FailedEntryCount"], 0);
5072        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5073        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5074    }
5075
5076    // ---- Archive + Replay delivery tests ----
5077
5078    /// Helper: create a service with a mock SQS delivery that records messages.
5079    #[allow(clippy::type_complexity)]
5080    fn make_service_with_sqs_recorder() -> (
5081        EventBridgeService,
5082        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5083    ) {
5084        use fakecloud_core::delivery::SqsDelivery;
5085
5086        struct RecordingSqsDelivery {
5087            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5088        }
5089
5090        impl SqsDelivery for RecordingSqsDelivery {
5091            fn deliver_to_queue(
5092                &self,
5093                queue_arn: &str,
5094                message_body: &str,
5095                _attributes: &HashMap<String, String>,
5096            ) {
5097                self.messages
5098                    .lock()
5099                    .push((queue_arn.to_string(), message_body.to_string()));
5100            }
5101        }
5102
5103        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5104            Arc::new(parking_lot::Mutex::new(Vec::new()));
5105        let state = Arc::new(RwLock::new(EventBridgeState::new(
5106            "123456789012",
5107            "us-east-1",
5108        )));
5109        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5110            messages: messages.clone(),
5111        })));
5112        let svc = EventBridgeService::new(state, delivery);
5113        (svc, messages)
5114    }
5115
5116    #[test]
5117    fn start_replay_delivers_archived_events_to_sqs_target() {
5118        let (svc, messages) = make_service_with_sqs_recorder();
5119        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5120
5121        // Create a rule with an SQS target
5122        let req = make_request(
5123            "PutRule",
5124            json!({
5125                "Name": "replay-test-rule",
5126                "EventPattern": r#"{"source": ["my.app"]}"#,
5127                "State": "ENABLED"
5128            }),
5129        );
5130        svc.put_rule(&req).unwrap();
5131
5132        let req = make_request(
5133            "PutTargets",
5134            json!({
5135                "Rule": "replay-test-rule",
5136                "Targets": [{
5137                    "Id": "sqs-target",
5138                    "Arn": queue_arn
5139                }]
5140            }),
5141        );
5142        svc.put_targets(&req).unwrap();
5143
5144        // Create an archive on the default bus
5145        let req = make_request(
5146            "CreateArchive",
5147            json!({
5148                "ArchiveName": "test-archive",
5149                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5150            }),
5151        );
5152        svc.create_archive(&req).unwrap();
5153
5154        // PutEvents: these should get archived and delivered
5155        let req = make_request(
5156            "PutEvents",
5157            json!({
5158                "Entries": [
5159                    {
5160                        "Source": "my.app",
5161                        "DetailType": "OrderCreated",
5162                        "Detail": "{\"orderId\": \"1\"}",
5163                        "EventBusName": "default"
5164                    },
5165                    {
5166                        "Source": "my.app",
5167                        "DetailType": "OrderShipped",
5168                        "Detail": "{\"orderId\": \"2\"}",
5169                        "EventBusName": "default"
5170                    }
5171                ]
5172            }),
5173        );
5174        let resp = svc.put_events(&req).unwrap();
5175        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5176        assert_eq!(body["FailedEntryCount"], 0);
5177
5178        // Verify archive has 2 events
5179        {
5180            let state = svc.state.read();
5181            let archive = state.archives.get("test-archive").unwrap();
5182            assert_eq!(archive.events.len(), 2);
5183            assert_eq!(archive.event_count, 2);
5184        }
5185
5186        // Clear recorded messages from PutEvents delivery
5187        messages.lock().clear();
5188
5189        // StartReplay: should re-deliver the archived events
5190        let archive_arn = {
5191            let state = svc.state.read();
5192            state.archives.get("test-archive").unwrap().arn.clone()
5193        };
5194
5195        // Use a wide time range to capture all events
5196        let start_ts = 0.0_f64;
5197        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5198
5199        let req = make_request(
5200            "StartReplay",
5201            json!({
5202                "ReplayName": "my-replay",
5203                "EventSourceArn": archive_arn,
5204                "Destination": {
5205                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5206                },
5207                "EventStartTime": start_ts,
5208                "EventEndTime": end_ts
5209            }),
5210        );
5211        let resp = svc.start_replay(&req).unwrap();
5212        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5213        assert_eq!(body["State"], "STARTING");
5214
5215        // Verify the replay delivered events to SQS
5216        let delivered = messages.lock();
5217        assert_eq!(
5218            delivered.len(),
5219            2,
5220            "expected 2 replayed events delivered to SQS"
5221        );
5222        for (arn, msg) in delivered.iter() {
5223            assert_eq!(arn, queue_arn);
5224            let event: Value = serde_json::from_str(msg).unwrap();
5225            assert_eq!(event["source"], "my.app");
5226            // Replayed events should include replay-name
5227            assert!(event["replay-name"].as_str().is_some());
5228        }
5229
5230        // Verify replay is marked as COMPLETED
5231        let state = svc.state.read();
5232        let replay = state.replays.get("my-replay").unwrap();
5233        assert_eq!(replay.state, "COMPLETED");
5234    }
5235
5236    #[test]
5237    fn apply_connection_auth_api_key() {
5238        let conn = Connection {
5239            name: "test-conn".to_string(),
5240            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5241            description: None,
5242            authorization_type: "API_KEY".to_string(),
5243            auth_parameters: json!({
5244                "ApiKeyAuthParameters": {
5245                    "ApiKeyName": "x-api-key",
5246                    "ApiKeyValue": "my-secret"
5247                }
5248            }),
5249            connection_state: "AUTHORIZED".to_string(),
5250            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5251            creation_time: Utc::now(),
5252            last_modified_time: Utc::now(),
5253            last_authorized_time: Utc::now(),
5254        };
5255
5256        let client = reqwest::Client::new();
5257        let builder = client
5258            .post("http://localhost:12345/test")
5259            .header("Content-Type", "application/json");
5260        let builder = apply_connection_auth(builder, &conn);
5261
5262        // Build and verify the header was applied
5263        let request = builder.body("{}").build().unwrap();
5264        assert_eq!(
5265            request
5266                .headers()
5267                .get("x-api-key")
5268                .unwrap()
5269                .to_str()
5270                .unwrap(),
5271            "my-secret"
5272        );
5273    }
5274
5275    #[test]
5276    fn apply_connection_auth_basic() {
5277        let conn = Connection {
5278            name: "basic-conn".to_string(),
5279            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5280            description: None,
5281            authorization_type: "BASIC".to_string(),
5282            auth_parameters: json!({
5283                "BasicAuthParameters": {
5284                    "Username": "user",
5285                    "Password": "pass"
5286                }
5287            }),
5288            connection_state: "AUTHORIZED".to_string(),
5289            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5290            creation_time: Utc::now(),
5291            last_modified_time: Utc::now(),
5292            last_authorized_time: Utc::now(),
5293        };
5294
5295        let client = reqwest::Client::new();
5296        let builder = client.post("http://localhost:12345/test");
5297        let builder = apply_connection_auth(builder, &conn);
5298
5299        let request = builder.body("{}").build().unwrap();
5300        let auth_header = request
5301            .headers()
5302            .get("authorization")
5303            .unwrap()
5304            .to_str()
5305            .unwrap();
5306        assert!(
5307            auth_header.starts_with("Basic "),
5308            "Expected Basic auth header, got: {auth_header}"
5309        );
5310    }
5311
5312    #[tokio::test]
5313    async fn put_events_with_api_destination_target_resolves_destination() {
5314        // This test verifies that the PutEvents code path correctly identifies
5315        // api-destination ARN targets and resolves the destination metadata.
5316        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5317        let state = Arc::new(RwLock::new(EventBridgeState::new(
5318            "123456789012",
5319            "us-east-1",
5320        )));
5321        let delivery = Arc::new(DeliveryBus::new());
5322        let svc = EventBridgeService::new(state, delivery);
5323
5324        // Create connection and api destination
5325        create_connection(&svc, "my-conn");
5326        let conn_arn = {
5327            let state = svc.state.read();
5328            state.connections.get("my-conn").unwrap().arn.clone()
5329        };
5330        let req = make_request(
5331            "CreateApiDestination",
5332            json!({
5333                "Name": "my-dest",
5334                "ConnectionArn": conn_arn,
5335                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5336                "HttpMethod": "POST"
5337            }),
5338        );
5339        svc.create_api_destination(&req).unwrap();
5340
5341        let dest_arn = {
5342            let state = svc.state.read();
5343            state.api_destinations.get("my-dest").unwrap().arn.clone()
5344        };
5345
5346        // Create a rule that targets the api-destination
5347        let req = make_request(
5348            "PutRule",
5349            json!({
5350                "Name": "api-dest-rule",
5351                "EventPattern": r#"{"source":["test.app"]}"#,
5352                "State": "ENABLED"
5353            }),
5354        );
5355        svc.put_rule(&req).unwrap();
5356
5357        let req = make_request(
5358            "PutTargets",
5359            json!({
5360                "Rule": "api-dest-rule",
5361                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5362            }),
5363        );
5364        svc.put_targets(&req).unwrap();
5365
5366        // PutEvents - should match the rule and attempt delivery to ApiDestination
5367        let req = make_request(
5368            "PutEvents",
5369            json!({
5370                "Entries": [{
5371                    "Source": "test.app",
5372                    "DetailType": "TestEvent",
5373                    "Detail": r#"{"key":"value"}"#
5374                }]
5375            }),
5376        );
5377        let resp = svc.put_events(&req).unwrap();
5378        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5379        assert_eq!(body["FailedEntryCount"], 0);
5380        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5381        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5382    }
5383
5384    #[test]
5385    fn test_function_name_from_arn() {
5386        // Unqualified ARN
5387        assert_eq!(
5388            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5389            "my-func"
5390        );
5391        // Qualified ARN with alias
5392        assert_eq!(
5393            super::function_name_from_arn(
5394                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5395            ),
5396            "my-func"
5397        );
5398        // Qualified ARN with version
5399        assert_eq!(
5400            super::function_name_from_arn(
5401                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5402            ),
5403            "my-func"
5404        );
5405        // Plain function name (not an ARN)
5406        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5407    }
5408}