Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20    ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21    PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24pub struct EventBridgeService {
25    state: SharedEventBridgeState,
26    delivery: Arc<DeliveryBus>,
27    lambda_state: Option<SharedLambdaState>,
28    logs_state: Option<SharedLogsState>,
29    container_runtime: Option<Arc<ContainerRuntime>>,
30}
31
32impl EventBridgeService {
33    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
34        Self {
35            state,
36            delivery,
37            lambda_state: None,
38            logs_state: None,
39            container_runtime: None,
40        }
41    }
42
43    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
44        self.lambda_state = Some(lambda_state);
45        self
46    }
47
48    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
49        self.logs_state = Some(logs_state);
50        self
51    }
52
53    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
54        self.container_runtime = Some(runtime);
55        self
56    }
57}
58
59#[async_trait]
60impl AwsService for EventBridgeService {
61    fn service_name(&self) -> &str {
62        "events"
63    }
64
65    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
66        match req.action.as_str() {
67            "CreateEventBus" => self.create_event_bus(&req),
68            "DeleteEventBus" => self.delete_event_bus(&req),
69            "ListEventBuses" => self.list_event_buses(&req),
70            "DescribeEventBus" => self.describe_event_bus(&req),
71            "PutRule" => self.put_rule(&req),
72            "DeleteRule" => self.delete_rule(&req),
73            "ListRules" => self.list_rules(&req),
74            "DescribeRule" => self.describe_rule(&req),
75            "EnableRule" => self.enable_rule(&req),
76            "DisableRule" => self.disable_rule(&req),
77            "PutTargets" => self.put_targets(&req),
78            "RemoveTargets" => self.remove_targets(&req),
79            "ListTargetsByRule" => self.list_targets_by_rule(&req),
80            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
81            "PutEvents" => self.put_events(&req),
82            "PutPermission" => self.put_permission(&req),
83            "RemovePermission" => self.remove_permission(&req),
84            "TagResource" => self.tag_resource(&req),
85            "UntagResource" => self.untag_resource(&req),
86            "ListTagsForResource" => self.list_tags_for_resource(&req),
87            "CreateArchive" => self.create_archive(&req),
88            "DescribeArchive" => self.describe_archive(&req),
89            "ListArchives" => self.list_archives(&req),
90            "UpdateArchive" => self.update_archive(&req),
91            "DeleteArchive" => self.delete_archive(&req),
92            "CreateConnection" => self.create_connection(&req),
93            "DescribeConnection" => self.describe_connection(&req),
94            "ListConnections" => self.list_connections(&req),
95            "UpdateConnection" => self.update_connection(&req),
96            "DeleteConnection" => self.delete_connection(&req),
97            "CreateApiDestination" => self.create_api_destination(&req),
98            "DescribeApiDestination" => self.describe_api_destination(&req),
99            "ListApiDestinations" => self.list_api_destinations(&req),
100            "UpdateApiDestination" => self.update_api_destination(&req),
101            "DeleteApiDestination" => self.delete_api_destination(&req),
102            "StartReplay" => self.start_replay(&req),
103            "DescribeReplay" => self.describe_replay(&req),
104            "ListReplays" => self.list_replays(&req),
105            "CancelReplay" => self.cancel_replay(&req),
106            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
107            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
108            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
109            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
110            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
111            "ActivateEventSource" => self.activate_event_source(&req),
112            "DeactivateEventSource" => self.deactivate_event_source(&req),
113            "DescribeEventSource" => self.describe_event_source(&req),
114            "ListEventSources" => self.list_event_sources(&req),
115            "PutPartnerEvents" => self.put_partner_events(&req),
116            "TestEventPattern" => self.test_event_pattern(&req),
117            "UpdateEventBus" => self.update_event_bus(&req),
118            "CreateEndpoint" => self.create_endpoint(&req),
119            "DeleteEndpoint" => self.delete_endpoint(&req),
120            "DescribeEndpoint" => self.describe_endpoint(&req),
121            "ListEndpoints" => self.list_endpoints(&req),
122            "UpdateEndpoint" => self.update_endpoint(&req),
123            "DeauthorizeConnection" => self.deauthorize_connection(&req),
124            _ => Err(AwsServiceError::action_not_implemented(
125                "events",
126                &req.action,
127            )),
128        }
129    }
130
131    fn supported_actions(&self) -> &[&str] {
132        &[
133            "CreateEventBus",
134            "DeleteEventBus",
135            "ListEventBuses",
136            "DescribeEventBus",
137            "PutRule",
138            "DeleteRule",
139            "ListRules",
140            "DescribeRule",
141            "EnableRule",
142            "DisableRule",
143            "PutTargets",
144            "RemoveTargets",
145            "ListTargetsByRule",
146            "ListRuleNamesByTarget",
147            "PutEvents",
148            "PutPermission",
149            "RemovePermission",
150            "TagResource",
151            "UntagResource",
152            "ListTagsForResource",
153            "CreateArchive",
154            "DescribeArchive",
155            "ListArchives",
156            "UpdateArchive",
157            "DeleteArchive",
158            "CreateConnection",
159            "DescribeConnection",
160            "ListConnections",
161            "UpdateConnection",
162            "DeleteConnection",
163            "CreateApiDestination",
164            "DescribeApiDestination",
165            "ListApiDestinations",
166            "UpdateApiDestination",
167            "DeleteApiDestination",
168            "StartReplay",
169            "DescribeReplay",
170            "ListReplays",
171            "CancelReplay",
172            "CreatePartnerEventSource",
173            "DeletePartnerEventSource",
174            "DescribePartnerEventSource",
175            "ListPartnerEventSources",
176            "ListPartnerEventSourceAccounts",
177            "ActivateEventSource",
178            "DeactivateEventSource",
179            "DescribeEventSource",
180            "ListEventSources",
181            "PutPartnerEvents",
182            "TestEventPattern",
183            "UpdateEventBus",
184            "CreateEndpoint",
185            "DeleteEndpoint",
186            "DescribeEndpoint",
187            "ListEndpoints",
188            "UpdateEndpoint",
189            "DeauthorizeConnection",
190        ]
191    }
192}
193
194fn parse_tags(body: &Value) -> HashMap<String, String> {
195    let mut tags = HashMap::new();
196    if let Some(arr) = body["Tags"].as_array() {
197        for tag in arr {
198            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
199                tags.insert(key.to_string(), val.to_string());
200            }
201        }
202    }
203    tags
204}
205
206fn parse_target(target: &Value) -> EventTarget {
207    EventTarget {
208        id: target["Id"].as_str().unwrap_or("").to_string(),
209        arn: target["Arn"].as_str().unwrap_or("").to_string(),
210        input: target["Input"].as_str().map(|s| s.to_string()),
211        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
212        input_transformer: target.get("InputTransformer").cloned(),
213        sqs_parameters: target.get("SqsParameters").cloned(),
214    }
215}
216
217fn target_to_json(t: &EventTarget) -> Value {
218    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
219    if let Some(ref input) = t.input {
220        obj["Input"] = json!(input);
221    }
222    if let Some(ref input_path) = t.input_path {
223        obj["InputPath"] = json!(input_path);
224    }
225    if let Some(ref it) = t.input_transformer {
226        obj["InputTransformer"] = it.clone();
227    }
228    if let Some(ref sp) = t.sqs_parameters {
229        obj["SqsParameters"] = sp.clone();
230    }
231    obj
232}
233
234// ─── Event Bus Operations ───────────────────────────────────────────
235impl EventBridgeService {
236    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
237        let body = req.json_body();
238        validate_required("Name", &body["Name"])?;
239        let name = body["Name"]
240            .as_str()
241            .ok_or_else(|| missing("Name"))?
242            .to_string();
243        validate_string_length("name", &name, 1, 256)?;
244        validate_optional_string_length(
245            "eventSourceName",
246            body["EventSourceName"].as_str(),
247            1,
248            256,
249        )?;
250        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
251        validate_optional_string_length(
252            "kmsKeyIdentifier",
253            body["KmsKeyIdentifier"].as_str(),
254            0,
255            2048,
256        )?;
257
258        // Validate name doesn't contain '/' (unless partner bus)
259        if name.contains('/') && !name.starts_with("aws.partner/") {
260            return Err(AwsServiceError::aws_error(
261                StatusCode::BAD_REQUEST,
262                "ValidationException",
263                "Event bus name must not contain '/'.",
264            ));
265        }
266
267        // Partner event bus validation
268        if name.starts_with("aws.partner/") {
269            let event_source = body["EventSourceName"].as_str().unwrap_or("");
270            let state_r = self.state.read();
271            let has_source = state_r.partner_event_sources.contains_key(event_source);
272            drop(state_r);
273            if !has_source {
274                return Err(AwsServiceError::aws_error(
275                    StatusCode::BAD_REQUEST,
276                    "ResourceNotFoundException",
277                    format!("Event source {event_source} does not exist."),
278                ));
279            }
280        }
281
282        let mut state = self.state.write();
283
284        if state.buses.contains_key(&name) {
285            return Err(AwsServiceError::aws_error(
286                StatusCode::BAD_REQUEST,
287                "ResourceAlreadyExistsException",
288                format!("Event bus {name} already exists."),
289            ));
290        }
291
292        let arn = format!(
293            "arn:aws:events:{}:{}:event-bus/{}",
294            req.region, state.account_id, name
295        );
296        let now = Utc::now();
297        let description = body["Description"].as_str().map(|s| s.to_string());
298        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
299        let dead_letter_config = body.get("DeadLetterConfig").cloned();
300
301        let tags = parse_tags(&body);
302
303        let bus = EventBus {
304            name: name.clone(),
305            arn: arn.clone(),
306            tags,
307            policy: None,
308            description,
309            kms_key_identifier,
310            dead_letter_config,
311            creation_time: now,
312            last_modified_time: now,
313        };
314        state.buses.insert(name, bus);
315
316        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
317    }
318
319    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
320        let body = req.json_body();
321        validate_required("Name", &body["Name"])?;
322        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
323        validate_string_length("name", name, 1, 256)?;
324
325        if name == "default" {
326            return Err(AwsServiceError::aws_error(
327                StatusCode::BAD_REQUEST,
328                "ValidationException",
329                format!("Cannot delete event bus {name}."),
330            ));
331        }
332
333        let mut state = self.state.write();
334        state.buses.remove(name);
335        state.rules.retain(|k, _| k.0 != name);
336
337        Ok(AwsResponse::ok_json(json!({})))
338    }
339
340    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
341        let body = req.json_body();
342        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
343        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
344        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
345        let name_prefix = body["NamePrefix"].as_str();
346        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
347        if let Some(t) = body["NextToken"].as_str() {
348            t.parse::<usize>().map_err(|_| {
349                AwsServiceError::aws_error(
350                    StatusCode::BAD_REQUEST,
351                    "InvalidNextTokenException",
352                    format!("Invalid NextToken value: '{t}'"),
353                )
354            })?;
355        }
356
357        let state = self.state.read();
358        let filtered: Vec<&_> = state
359            .buses
360            .values()
361            .filter(|b| match name_prefix {
362                Some(prefix) => b.name.starts_with(prefix),
363                None => true,
364            })
365            .collect();
366
367        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
368        let buses: Vec<Value> = page
369            .iter()
370            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
371            .collect();
372        let mut resp = json!({ "EventBuses": buses });
373        if let Some(token) = next_token {
374            resp["NextToken"] = json!(token);
375        }
376
377        Ok(AwsResponse::ok_json(resp))
378    }
379
380    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381        let body = req.json_body();
382        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
383        let name = body["Name"].as_str().unwrap_or("default");
384
385        let state = self.state.read();
386        let bus = state.buses.get(name).ok_or_else(|| {
387            AwsServiceError::aws_error(
388                StatusCode::BAD_REQUEST,
389                "ResourceNotFoundException",
390                format!("Event bus {name} does not exist."),
391            )
392        })?;
393
394        let mut resp = json!({
395            "Name": bus.name,
396            "Arn": bus.arn,
397            "CreationTime": bus.creation_time.timestamp() as f64,
398            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
399        });
400
401        if let Some(ref policy) = bus.policy {
402            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
403        }
404        if let Some(ref desc) = bus.description {
405            resp["Description"] = json!(desc);
406        }
407        if let Some(ref kms) = bus.kms_key_identifier {
408            resp["KmsKeyIdentifier"] = json!(kms);
409        }
410        if let Some(ref dlc) = bus.dead_letter_config {
411            resp["DeadLetterConfig"] = dlc.clone();
412        }
413
414        Ok(AwsResponse::ok_json(resp))
415    }
416
417    // ─── Permission Operations ──────────────────────────────────────────
418
419    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
420        let body = req.json_body();
421        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
422        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
423        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
424        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
425        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
426
427        let mut state = self.state.write();
428
429        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
430            AwsServiceError::aws_error(
431                StatusCode::BAD_REQUEST,
432                "ResourceNotFoundException",
433                format!("Event bus {event_bus_name} does not exist."),
434            )
435        })?;
436
437        // Check if Policy is provided (new-style)
438        if let Some(policy_str) = body["Policy"].as_str() {
439            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
440                bus.policy = Some(policy);
441                return Ok(AwsResponse::ok_json(json!({})));
442            }
443        }
444
445        // Old-style: Action, Principal, StatementId
446        let action = body["Action"].as_str().unwrap_or("");
447        let principal = body["Principal"].as_str().unwrap_or("");
448        let statement_id = body["StatementId"].as_str().unwrap_or("");
449
450        // Validate action
451        if action != "events:PutEvents" {
452            return Err(AwsServiceError::aws_error(
453                StatusCode::BAD_REQUEST,
454                "ValidationException",
455                "Provided value in parameter 'action' is not supported.",
456            ));
457        }
458
459        let statement = json!({
460            "Sid": statement_id,
461            "Effect": "Allow",
462            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
463            "Action": action,
464            "Resource": bus.arn,
465        });
466
467        let policy = bus.policy.get_or_insert_with(|| {
468            json!({
469                "Version": "2012-10-17",
470                "Statement": [],
471            })
472        });
473
474        if let Some(stmts) = policy["Statement"].as_array_mut() {
475            stmts.push(statement);
476        }
477
478        Ok(AwsResponse::ok_json(json!({})))
479    }
480
481    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
482        let body = req.json_body();
483        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
484        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
485        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
486        let statement_id = body["StatementId"].as_str().unwrap_or("");
487        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
488
489        let mut state = self.state.write();
490
491        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
492            AwsServiceError::aws_error(
493                StatusCode::BAD_REQUEST,
494                "ResourceNotFoundException",
495                format!("Event bus {event_bus_name} does not exist."),
496            )
497        })?;
498
499        if remove_all {
500            bus.policy = None;
501            return Ok(AwsResponse::ok_json(json!({})));
502        }
503
504        let policy = bus.policy.as_mut().ok_or_else(|| {
505            AwsServiceError::aws_error(
506                StatusCode::BAD_REQUEST,
507                "ResourceNotFoundException",
508                "EventBus does not have a policy.",
509            )
510        })?;
511
512        if let Some(stmts) = policy["Statement"].as_array_mut() {
513            let before = stmts.len();
514            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
515            if stmts.len() == before {
516                return Err(AwsServiceError::aws_error(
517                    StatusCode::BAD_REQUEST,
518                    "ResourceNotFoundException",
519                    "Statement with the provided id does not exist.",
520                ));
521            }
522            if stmts.is_empty() {
523                bus.policy = None;
524            }
525        }
526
527        Ok(AwsResponse::ok_json(json!({})))
528    }
529
530    // ─── Rule Operations ────────────────────────────────────────────────
531
532    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
533        let body = req.json_body();
534        validate_required("Name", &body["Name"])?;
535        let name = body["Name"]
536            .as_str()
537            .ok_or_else(|| missing("Name"))?
538            .to_string();
539        validate_string_length("name", &name, 1, 64)?;
540        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
541        validate_optional_string_length(
542            "scheduleExpression",
543            body["ScheduleExpression"].as_str(),
544            0,
545            256,
546        )?;
547        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
548        validate_optional_enum(
549            "state",
550            body["State"].as_str(),
551            &[
552                "ENABLED",
553                "DISABLED",
554                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
555            ],
556        )?;
557        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
558        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
559
560        let raw_bus = body["EventBusName"]
561            .as_str()
562            .unwrap_or("default")
563            .to_string();
564
565        let mut state = self.state.write();
566        let event_bus_name = state.resolve_bus_name(&raw_bus);
567
568        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
569            if s.is_empty() {
570                None
571            } else {
572                Some(s.to_string())
573            }
574        });
575        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
576            if s.is_empty() {
577                None
578            } else {
579                Some(s.to_string())
580            }
581        });
582        let description = body["Description"].as_str().map(|s| s.to_string());
583        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
584        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
585
586        // Validate: schedule expressions only on default bus
587        if schedule_expression.is_some() && event_bus_name != "default" {
588            return Err(AwsServiceError::aws_error(
589                StatusCode::BAD_REQUEST,
590                "ValidationException",
591                "ScheduleExpression is supported only on the default event bus.",
592            ));
593        }
594
595        if !state.buses.contains_key(&event_bus_name) {
596            return Err(AwsServiceError::aws_error(
597                StatusCode::BAD_REQUEST,
598                "ResourceNotFoundException",
599                format!("Event bus {event_bus_name} does not exist."),
600            ));
601        }
602
603        let arn = if event_bus_name == "default" {
604            format!(
605                "arn:aws:events:{}:{}:rule/{}",
606                req.region, state.account_id, name
607            )
608        } else {
609            format!(
610                "arn:aws:events:{}:{}:rule/{}/{}",
611                req.region, state.account_id, event_bus_name, name
612            )
613        };
614
615        let key = (event_bus_name.clone(), name.clone());
616        let targets = state
617            .rules
618            .get(&key)
619            .map(|r| r.targets.clone())
620            .unwrap_or_default();
621
622        let tags = parse_tags(&body);
623
624        let rule = EventRule {
625            name: name.clone(),
626            arn: arn.clone(),
627            event_bus_name,
628            event_pattern,
629            schedule_expression,
630            state: rule_state,
631            description,
632            role_arn,
633            managed_by: None,
634            created_by: None,
635            targets,
636            tags,
637            last_fired: None,
638        };
639
640        state.rules.insert(key, rule);
641        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
642    }
643
644    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645        let body = req.json_body();
646        validate_required("Name", &body["Name"])?;
647        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
648        validate_string_length("name", name, 1, 64)?;
649        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
650        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
651
652        let mut state = self.state.write();
653        let bus_name = state.resolve_bus_name(event_bus_name);
654        let key = (bus_name, name.to_string());
655
656        // Check if rule has targets
657        if let Some(rule) = state.rules.get(&key) {
658            if !rule.targets.is_empty() {
659                return Err(AwsServiceError::aws_error(
660                    StatusCode::BAD_REQUEST,
661                    "ValidationException",
662                    "Rule can't be deleted since it has targets.",
663                ));
664            }
665        }
666
667        state.rules.remove(&key);
668        Ok(AwsResponse::ok_json(json!({})))
669    }
670
671    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
672        let body = req.json_body();
673        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
674        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
675        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
676        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
677        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
678        let name_prefix = body["NamePrefix"].as_str();
679        let limit = body["Limit"].as_u64().map(|n| n as usize);
680        let next_token = body["NextToken"].as_str();
681
682        let state = self.state.read();
683        let bus_name = state.resolve_bus_name(event_bus_name);
684
685        let mut rules: Vec<&EventRule> = state
686            .rules
687            .values()
688            .filter(|r| r.event_bus_name == bus_name)
689            .filter(|r| match name_prefix {
690                Some(prefix) => r.name.starts_with(prefix),
691                None => true,
692            })
693            .collect();
694        rules.sort_by(|a, b| a.name.cmp(&b.name));
695
696        // Pagination
697        let start = next_token
698            .and_then(|t| t.parse::<usize>().ok())
699            .unwrap_or(0)
700            .min(rules.len());
701        let rules_slice = &rules[start..];
702
703        let (page, new_next_token) = if let Some(lim) = limit {
704            if rules_slice.len() > lim {
705                (&rules_slice[..lim], Some((start + lim).to_string()))
706            } else {
707                (rules_slice, None)
708            }
709        } else {
710            (rules_slice, None)
711        };
712
713        let rules_json: Vec<Value> = page
714            .iter()
715            .map(|r| {
716                let mut obj = json!({
717                    "Name": r.name,
718                    "Arn": r.arn,
719                    "EventBusName": r.event_bus_name,
720                    "State": r.state,
721                });
722                if let Some(ref desc) = r.description {
723                    obj["Description"] = json!(desc);
724                }
725                if let Some(ref ep) = r.event_pattern {
726                    obj["EventPattern"] = json!(ep);
727                }
728                if let Some(ref se) = r.schedule_expression {
729                    obj["ScheduleExpression"] = json!(se);
730                }
731                if let Some(ref mb) = r.managed_by {
732                    obj["ManagedBy"] = json!(mb);
733                }
734                obj
735            })
736            .collect();
737
738        let mut resp = json!({ "Rules": rules_json });
739        if let Some(token) = new_next_token {
740            resp["NextToken"] = json!(token);
741        }
742
743        Ok(AwsResponse::ok_json(resp))
744    }
745
746    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747        let body = req.json_body();
748        validate_required("Name", &body["Name"])?;
749        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
750        validate_string_length("name", name, 1, 64)?;
751        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
752        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
753
754        let state = self.state.read();
755        let bus_name = state.resolve_bus_name(event_bus_name);
756        let key = (bus_name.clone(), name.to_string());
757
758        let rule = state.rules.get(&key).ok_or_else(|| {
759            AwsServiceError::aws_error(
760                StatusCode::BAD_REQUEST,
761                "ResourceNotFoundException",
762                format!("Rule {name} does not exist."),
763            )
764        })?;
765
766        let mut resp = json!({
767            "Name": rule.name,
768            "Arn": rule.arn,
769            "EventBusName": rule.event_bus_name,
770            "State": rule.state,
771        });
772
773        if let Some(ref desc) = rule.description {
774            resp["Description"] = json!(desc);
775        }
776        if let Some(ref ep) = rule.event_pattern {
777            resp["EventPattern"] = json!(ep);
778        }
779        if let Some(ref se) = rule.schedule_expression {
780            resp["ScheduleExpression"] = json!(se);
781        }
782        if let Some(ref role) = rule.role_arn {
783            resp["RoleArn"] = json!(role);
784        }
785        if let Some(ref mb) = rule.managed_by {
786            resp["ManagedBy"] = json!(mb);
787        }
788        if let Some(ref cb) = rule.created_by {
789            resp["CreatedBy"] = json!(cb);
790        }
791        // If non-default bus, set CreatedBy to account_id
792        if rule.event_bus_name != "default" && rule.created_by.is_none() {
793            resp["CreatedBy"] = json!(state.account_id);
794        }
795
796        Ok(AwsResponse::ok_json(resp))
797    }
798
799    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
800        let body = req.json_body();
801        validate_required("Name", &body["Name"])?;
802        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
803        validate_string_length("name", name, 1, 64)?;
804        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
805        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
806
807        let mut state = self.state.write();
808        let bus_name = state.resolve_bus_name(event_bus_name);
809        let key = (bus_name, name.to_string());
810
811        let rule = state.rules.get_mut(&key).ok_or_else(|| {
812            AwsServiceError::aws_error(
813                StatusCode::BAD_REQUEST,
814                "ResourceNotFoundException",
815                format!("Rule {name} does not exist."),
816            )
817        })?;
818
819        rule.state = "ENABLED".to_string();
820        Ok(AwsResponse::ok_json(json!({})))
821    }
822
823    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824        let body = req.json_body();
825        validate_required("Name", &body["Name"])?;
826        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
827        validate_string_length("name", name, 1, 64)?;
828        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
829        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
830
831        let mut state = self.state.write();
832        let bus_name = state.resolve_bus_name(event_bus_name);
833        let key = (bus_name, name.to_string());
834
835        let rule = state.rules.get_mut(&key).ok_or_else(|| {
836            AwsServiceError::aws_error(
837                StatusCode::BAD_REQUEST,
838                "ResourceNotFoundException",
839                format!("Rule {name} does not exist."),
840            )
841        })?;
842
843        rule.state = "DISABLED".to_string();
844        Ok(AwsResponse::ok_json(json!({})))
845    }
846
847    // ─── Target Operations ──────────────────────────────────────────────
848
849    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
850        let body = req.json_body();
851        validate_required("Rule", &body["Rule"])?;
852        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
853        validate_string_length("rule", rule_name, 1, 64)?;
854        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
855        validate_required("Targets", &body["Targets"])?;
856        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
857        let targets = body["Targets"]
858            .as_array()
859            .ok_or_else(|| missing("Targets"))?;
860
861        // Validate targets - check for FIFO SQS without SqsParameters
862        for target in targets {
863            let target_id = target["Id"].as_str().unwrap_or("");
864            let target_arn = target["Arn"].as_str().unwrap_or("");
865
866            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
867                return Err(AwsServiceError::aws_error(
868                    StatusCode::BAD_REQUEST,
869                    "ValidationException",
870                    format!(
871                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
872                    ),
873                ));
874            }
875
876            // Validate ARN format
877            if !target_arn.starts_with("arn:") {
878                return Err(AwsServiceError::aws_error(
879                    StatusCode::BAD_REQUEST,
880                    "ValidationException",
881                    format!(
882                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
883                    ),
884                ));
885            }
886        }
887
888        let mut state = self.state.write();
889        let bus_name = state.resolve_bus_name(event_bus_name);
890        let key = (bus_name.clone(), rule_name.to_string());
891
892        let rule = state.rules.get_mut(&key).ok_or_else(|| {
893            AwsServiceError::aws_error(
894                StatusCode::BAD_REQUEST,
895                "ResourceNotFoundException",
896                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
897            )
898        })?;
899
900        for target in targets {
901            let et = parse_target(target);
902            // Remove existing target with same ID
903            rule.targets.retain(|t| t.id != et.id);
904            rule.targets.push(et);
905        }
906
907        Ok(AwsResponse::ok_json(json!({
908            "FailedEntryCount": 0,
909            "FailedEntries": [],
910        })))
911    }
912
913    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
914        let body = req.json_body();
915        validate_required("Rule", &body["Rule"])?;
916        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
917        validate_string_length("rule", rule_name, 1, 64)?;
918        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
919        validate_required("Ids", &body["Ids"])?;
920        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
921        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
922
923        let target_ids: Vec<String> = ids
924            .iter()
925            .filter_map(|v| v.as_str().map(|s| s.to_string()))
926            .collect();
927
928        let mut state = self.state.write();
929        let bus_name = state.resolve_bus_name(event_bus_name);
930        let key = (bus_name.clone(), rule_name.to_string());
931
932        let rule = state.rules.get_mut(&key).ok_or_else(|| {
933            AwsServiceError::aws_error(
934                StatusCode::BAD_REQUEST,
935                "ResourceNotFoundException",
936                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
937            )
938        })?;
939
940        rule.targets.retain(|t| !target_ids.contains(&t.id));
941
942        Ok(AwsResponse::ok_json(json!({
943            "FailedEntryCount": 0,
944            "FailedEntries": [],
945        })))
946    }
947
948    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
949        let body = req.json_body();
950        validate_required("Rule", &body["Rule"])?;
951        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
952        validate_string_length("rule", rule_name, 1, 64)?;
953        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
954        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
955        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
956        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
957        let limit = body["Limit"].as_u64().map(|n| n as usize);
958        let next_token = body["NextToken"].as_str();
959
960        let state = self.state.read();
961        let bus_name = state.resolve_bus_name(event_bus_name);
962        let key = (bus_name, rule_name.to_string());
963
964        let rule = state.rules.get(&key).ok_or_else(|| {
965            AwsServiceError::aws_error(
966                StatusCode::BAD_REQUEST,
967                "ResourceNotFoundException",
968                format!("Rule {rule_name} does not exist."),
969            )
970        })?;
971
972        let all_targets = &rule.targets;
973        let start = next_token
974            .and_then(|t| t.parse::<usize>().ok())
975            .unwrap_or(0)
976            .min(all_targets.len());
977        let slice = &all_targets[start..];
978
979        let (page, new_next_token) = if let Some(lim) = limit {
980            if slice.len() > lim {
981                (&slice[..lim], Some((start + lim).to_string()))
982            } else {
983                (slice, None)
984            }
985        } else {
986            (slice, None)
987        };
988
989        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
990
991        let mut resp = json!({ "Targets": targets });
992        if let Some(token) = new_next_token {
993            resp["NextToken"] = json!(token);
994        }
995
996        Ok(AwsResponse::ok_json(resp))
997    }
998
999    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1000        let body = req.json_body();
1001        validate_required("TargetArn", &body["TargetArn"])?;
1002        let target_arn = body["TargetArn"]
1003            .as_str()
1004            .ok_or_else(|| missing("TargetArn"))?;
1005        validate_string_length("targetArn", target_arn, 1, 1600)?;
1006        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1007        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1008        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1009        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1010        let limit = body["Limit"].as_u64().map(|n| n as usize);
1011        let next_token = body["NextToken"].as_str();
1012
1013        let state = self.state.read();
1014        let bus_name = state.resolve_bus_name(event_bus_name);
1015
1016        // Deduplicate rule names
1017        let mut rule_names: Vec<String> = Vec::new();
1018        for rule in state.rules.values() {
1019            if rule.event_bus_name == bus_name
1020                && rule.targets.iter().any(|t| t.arn == target_arn)
1021                && !rule_names.contains(&rule.name)
1022            {
1023                rule_names.push(rule.name.clone());
1024            }
1025        }
1026        rule_names.sort();
1027
1028        let start = next_token
1029            .and_then(|t| t.parse::<usize>().ok())
1030            .unwrap_or(0)
1031            .min(rule_names.len());
1032        let slice = &rule_names[start..];
1033
1034        let (page, new_next_token) = if let Some(lim) = limit {
1035            if slice.len() > lim {
1036                (&slice[..lim], Some((start + lim).to_string()))
1037            } else {
1038                (slice, None)
1039            }
1040        } else {
1041            (slice, None)
1042        };
1043
1044        let mut resp = json!({ "RuleNames": page });
1045        if let Some(token) = new_next_token {
1046            resp["NextToken"] = json!(token);
1047        }
1048
1049        Ok(AwsResponse::ok_json(resp))
1050    }
1051
    // ─── Partner Event Sources ────────────────────────────────────────
1053
1054    fn create_partner_event_source(
1055        &self,
1056        req: &AwsRequest,
1057    ) -> Result<AwsResponse, AwsServiceError> {
1058        let body = req.json_body();
1059        validate_required("Name", &body["Name"])?;
1060        let name = body["Name"]
1061            .as_str()
1062            .ok_or_else(|| missing("Name"))?
1063            .to_string();
1064        validate_string_length("name", &name, 1, 256)?;
1065        validate_required("Account", &body["Account"])?;
1066        let account = body["Account"]
1067            .as_str()
1068            .ok_or_else(|| missing("Account"))?
1069            .to_string();
1070        validate_string_length("account", &account, 12, 12)?;
1071
1072        let mut state = self.state.write();
1073        if state.partner_event_sources.contains_key(&name) {
1074            return Err(AwsServiceError::aws_error(
1075                StatusCode::CONFLICT,
1076                "ResourceAlreadyExistsException",
1077                format!("Partner event source {name} already exists."),
1078            ));
1079        }
1080        let arn = format!(
1081            "arn:aws:events:{}::event-source/aws.partner/{}",
1082            state.region, name
1083        );
1084        let now = Utc::now();
1085        let ps = PartnerEventSource {
1086            name: name.clone(),
1087            arn: arn.clone(),
1088            account,
1089            creation_time: now,
1090            expiration_time: None,
1091            state: "ACTIVE".to_string(),
1092        };
1093        state.partner_event_sources.insert(name.clone(), ps);
1094
1095        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1096    }
1097
1098    fn delete_partner_event_source(
1099        &self,
1100        req: &AwsRequest,
1101    ) -> Result<AwsResponse, AwsServiceError> {
1102        let body = req.json_body();
1103        validate_required("Name", &body["Name"])?;
1104        let name = body["Name"]
1105            .as_str()
1106            .ok_or_else(|| missing("Name"))?
1107            .to_string();
1108        validate_required("Account", &body["Account"])?;
1109        let account = body["Account"]
1110            .as_str()
1111            .ok_or_else(|| missing("Account"))?
1112            .to_string();
1113
1114        let mut state = self.state.write();
1115        match state.partner_event_sources.get(&name) {
1116            Some(ps) if ps.account == account => {
1117                state.partner_event_sources.remove(&name);
1118            }
1119            Some(_) => {
1120                return Err(AwsServiceError::aws_error(
1121                    StatusCode::NOT_FOUND,
1122                    "ResourceNotFoundException",
1123                    format!("Partner event source {name} does not exist for account {account}."),
1124                ));
1125            }
1126            None => {
1127                return Err(AwsServiceError::aws_error(
1128                    StatusCode::NOT_FOUND,
1129                    "ResourceNotFoundException",
1130                    format!("Partner event source {name} does not exist."),
1131                ));
1132            }
1133        }
1134
1135        Ok(AwsResponse::ok_json(json!({})))
1136    }
1137
1138    fn describe_partner_event_source(
1139        &self,
1140        req: &AwsRequest,
1141    ) -> Result<AwsResponse, AwsServiceError> {
1142        let body = req.json_body();
1143        validate_required("Name", &body["Name"])?;
1144        let name = body["Name"]
1145            .as_str()
1146            .ok_or_else(|| missing("Name"))?
1147            .to_string();
1148        validate_string_length("name", &name, 1, 256)?;
1149
1150        let state = self.state.read();
1151        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1152            AwsServiceError::aws_error(
1153                StatusCode::NOT_FOUND,
1154                "ResourceNotFoundException",
1155                format!("Partner event source {name} does not exist."),
1156            )
1157        })?;
1158
1159        Ok(AwsResponse::ok_json(json!({
1160            "Arn": ps.arn,
1161            "Name": ps.name,
1162        })))
1163    }
1164
1165    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166        let body = req.json_body();
1167        validate_required("namePrefix", &body["NamePrefix"])?;
1168        let name_prefix = body["NamePrefix"]
1169            .as_str()
1170            .ok_or_else(|| missing("NamePrefix"))?;
1171        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1172        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1173        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1174        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1175
1176        let state = self.state.read();
1177        let all: Vec<Value> = state
1178            .partner_event_sources
1179            .values()
1180            .filter(|ps| ps.name.starts_with(name_prefix))
1181            .map(|ps| {
1182                json!({
1183                    "Arn": ps.arn,
1184                    "Name": ps.name,
1185                })
1186            })
1187            .collect();
1188
1189        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1190        let mut resp = json!({ "PartnerEventSources": sources });
1191        if let Some(token) = next_token {
1192            resp["NextToken"] = json!(token);
1193        }
1194
1195        Ok(AwsResponse::ok_json(resp))
1196    }
1197
1198    fn list_partner_event_source_accounts(
1199        &self,
1200        req: &AwsRequest,
1201    ) -> Result<AwsResponse, AwsServiceError> {
1202        let body = req.json_body();
1203        validate_required("EventSourceName", &body["EventSourceName"])?;
1204        let event_source_name = body["EventSourceName"]
1205            .as_str()
1206            .ok_or_else(|| missing("EventSourceName"))?;
1207        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1208        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1209        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1210
1211        let state = self.state.read();
1212        let accounts: Vec<Value> = state
1213            .partner_event_sources
1214            .values()
1215            .filter(|ps| ps.name == event_source_name)
1216            .map(|ps| json!({ "Account": ps.account }))
1217            .collect();
1218
1219        Ok(AwsResponse::ok_json(json!({
1220            "PartnerEventSourceAccounts": accounts
1221        })))
1222    }
1223
1224    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1225        let body = req.json_body();
1226        validate_required("Name", &body["Name"])?;
1227        let name = body["Name"]
1228            .as_str()
1229            .ok_or_else(|| missing("Name"))?
1230            .to_string();
1231
1232        let mut state = self.state.write();
1233        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1234            AwsServiceError::aws_error(
1235                StatusCode::NOT_FOUND,
1236                "ResourceNotFoundException",
1237                format!("Event source {name} does not exist."),
1238            )
1239        })?;
1240        ps.state = "ACTIVE".to_string();
1241
1242        Ok(AwsResponse::ok_json(json!({})))
1243    }
1244
1245    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1246        let body = req.json_body();
1247        validate_required("Name", &body["Name"])?;
1248        let name = body["Name"]
1249            .as_str()
1250            .ok_or_else(|| missing("Name"))?
1251            .to_string();
1252
1253        let mut state = self.state.write();
1254        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1255            AwsServiceError::aws_error(
1256                StatusCode::NOT_FOUND,
1257                "ResourceNotFoundException",
1258                format!("Event source {name} does not exist."),
1259            )
1260        })?;
1261        ps.state = "INACTIVE".to_string();
1262
1263        Ok(AwsResponse::ok_json(json!({})))
1264    }
1265
1266    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1267        let body = req.json_body();
1268        validate_required("Name", &body["Name"])?;
1269        let name = body["Name"]
1270            .as_str()
1271            .ok_or_else(|| missing("Name"))?
1272            .to_string();
1273
1274        let state = self.state.read();
1275        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1276            AwsServiceError::aws_error(
1277                StatusCode::NOT_FOUND,
1278                "ResourceNotFoundException",
1279                format!("Event source {name} does not exist."),
1280            )
1281        })?;
1282
1283        Ok(AwsResponse::ok_json(json!({
1284            "Arn": ps.arn,
1285            "Name": ps.name,
1286            "CreatedBy": ps.account,
1287            "CreationTime": ps.creation_time.timestamp() as f64,
1288            "State": ps.state,
1289        })))
1290    }
1291
1292    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1293        let body = req.json_body();
1294        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1295        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1296        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1297        let name_prefix = body["NamePrefix"].as_str();
1298        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1299
1300        let state = self.state.read();
1301        let all: Vec<Value> = state
1302            .partner_event_sources
1303            .values()
1304            .filter(|ps| match name_prefix {
1305                Some(prefix) => ps.name.starts_with(prefix),
1306                None => true,
1307            })
1308            .map(|ps| {
1309                json!({
1310                    "Arn": ps.arn,
1311                    "Name": ps.name,
1312                    "CreatedBy": ps.account,
1313                    "CreationTime": ps.creation_time.timestamp() as f64,
1314                    "State": ps.state,
1315                })
1316            })
1317            .collect();
1318
1319        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1320        let mut resp = json!({ "EventSources": sources });
1321        if let Some(token) = next_token {
1322            resp["NextToken"] = json!(token);
1323        }
1324
1325        Ok(AwsResponse::ok_json(resp))
1326    }
1327
1328    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1329        let body = req.json_body();
1330        validate_required("Entries", &body["Entries"])?;
1331        let entries = body["Entries"]
1332            .as_array()
1333            .ok_or_else(|| missing("Entries"))?;
1334
1335        let mut result_entries = Vec::new();
1336        for _entry in entries {
1337            let event_id = uuid::Uuid::new_v4().to_string();
1338            result_entries.push(json!({ "EventId": event_id }));
1339        }
1340
1341        Ok(AwsResponse::ok_json(json!({
1342            "FailedEntryCount": 0,
1343            "Entries": result_entries,
1344        })))
1345    }
1346
1347    // ─── TestEventPattern ────────────────────────────────────────────────
1348
1349    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1350        let body = req.json_body();
1351        validate_required("EventPattern", &body["EventPattern"])?;
1352        validate_required("Event", &body["Event"])?;
1353        let event_pattern = body["EventPattern"]
1354            .as_str()
1355            .ok_or_else(|| missing("EventPattern"))?;
1356        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1357
1358        // Parse the event JSON
1359        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1360            AwsServiceError::aws_error(
1361                StatusCode::BAD_REQUEST,
1362                "InvalidEventPatternException",
1363                "Event is not valid JSON.",
1364            )
1365        })?;
1366
1367        // Parse the pattern JSON
1368        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1369            AwsServiceError::aws_error(
1370                StatusCode::BAD_REQUEST,
1371                "InvalidEventPatternException",
1372                "Event pattern is not valid JSON.",
1373            )
1374        })?;
1375
1376        let source = event["source"].as_str().unwrap_or("");
1377        let detail_type = event["detail-type"].as_str().unwrap_or("");
1378        let detail = event
1379            .get("detail")
1380            .map(|v| serde_json::to_string(v).unwrap_or_default())
1381            .unwrap_or_else(|| "{}".to_string());
1382        let account = event["account"].as_str().unwrap_or("");
1383        let region = event["region"].as_str().unwrap_or("");
1384        let resources: Vec<String> = event["resources"]
1385            .as_array()
1386            .map(|arr| {
1387                arr.iter()
1388                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1389                    .collect()
1390            })
1391            .unwrap_or_default();
1392
1393        let result = matches_pattern(
1394            Some(event_pattern),
1395            source,
1396            detail_type,
1397            &detail,
1398            account,
1399            region,
1400            &resources,
1401        );
1402
1403        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1404    }
1405
1406    // ─── UpdateEventBus ─────────────────────────────────────────────────
1407
1408    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1409        let body = req.json_body();
1410        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1411        validate_optional_string_length(
1412            "kmsKeyIdentifier",
1413            body["KmsKeyIdentifier"].as_str(),
1414            0,
1415            2048,
1416        )?;
1417        let name = body["Name"].as_str().unwrap_or("default");
1418
1419        let mut state = self.state.write();
1420        let bus = state.buses.get_mut(name).ok_or_else(|| {
1421            AwsServiceError::aws_error(
1422                StatusCode::BAD_REQUEST,
1423                "ResourceNotFoundException",
1424                format!("Event bus {name} does not exist."),
1425            )
1426        })?;
1427
1428        if let Some(desc) = body["Description"].as_str() {
1429            bus.description = Some(desc.to_string());
1430        }
1431        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1432            bus.kms_key_identifier = Some(kms.to_string());
1433        }
1434        if let Some(dlc) = body.get("DeadLetterConfig") {
1435            bus.dead_letter_config = Some(dlc.clone());
1436        }
1437        bus.last_modified_time = Utc::now();
1438
1439        let arn = bus.arn.clone();
1440        let bus_name = bus.name.clone();
1441
1442        Ok(AwsResponse::ok_json(json!({
1443            "Arn": arn,
1444            "Name": bus_name,
1445        })))
1446    }
1447
1448    // ─── Endpoint Operations ────────────────────────────────────────────
1449
1450    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451        let body = req.json_body();
1452        validate_required("Name", &body["Name"])?;
1453        let name = body["Name"]
1454            .as_str()
1455            .ok_or_else(|| missing("Name"))?
1456            .to_string();
1457        validate_string_length("name", &name, 1, 64)?;
1458        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1459        validate_required("EventBuses", &body["EventBuses"])?;
1460
1461        let description = body["Description"].as_str().map(|s| s.to_string());
1462        let routing_config = body["RoutingConfig"].clone();
1463        let replication_config = body.get("ReplicationConfig").cloned();
1464        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1465        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1466
1467        let mut state = self.state.write();
1468        if state.endpoints.contains_key(&name) {
1469            return Err(AwsServiceError::aws_error(
1470                StatusCode::CONFLICT,
1471                "ResourceAlreadyExistsException",
1472                format!("Endpoint {name} already exists."),
1473            ));
1474        }
1475
1476        let endpoint_id = format!("{}.abc123", name);
1477        let arn = format!(
1478            "arn:aws:events:{}:{}:endpoint/{}",
1479            req.region, state.account_id, name
1480        );
1481        let endpoint_url = format!(
1482            "https://{}.endpoint.events.{}.amazonaws.com",
1483            endpoint_id, req.region
1484        );
1485        let now = Utc::now();
1486
1487        let endpoint = Endpoint {
1488            name: name.clone(),
1489            arn: arn.clone(),
1490            endpoint_id: endpoint_id.clone(),
1491            endpoint_url: Some(endpoint_url),
1492            description,
1493            routing_config: routing_config.clone(),
1494            replication_config: replication_config.clone(),
1495            event_buses: event_buses.clone(),
1496            role_arn: role_arn.clone(),
1497            state: "ACTIVE".to_string(),
1498            creation_time: now,
1499            last_modified_time: now,
1500        };
1501        state.endpoints.insert(name.clone(), endpoint);
1502
1503        let mut resp = json!({
1504            "Name": name,
1505            "Arn": arn,
1506            "State": "ACTIVE",
1507            "RoutingConfig": routing_config,
1508            "EventBuses": event_buses,
1509        });
1510        if let Some(ref rc) = replication_config {
1511            resp["ReplicationConfig"] = rc.clone();
1512        }
1513        if let Some(ref ra) = role_arn {
1514            resp["RoleArn"] = json!(ra);
1515        }
1516
1517        Ok(AwsResponse::ok_json(resp))
1518    }
1519
1520    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1521        let body = req.json_body();
1522        validate_required("Name", &body["Name"])?;
1523        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1524
1525        let mut state = self.state.write();
1526        state.endpoints.remove(name).ok_or_else(|| {
1527            AwsServiceError::aws_error(
1528                StatusCode::BAD_REQUEST,
1529                "ResourceNotFoundException",
1530                format!("Endpoint '{name}' does not exist."),
1531            )
1532        })?;
1533
1534        Ok(AwsResponse::ok_json(json!({})))
1535    }
1536
1537    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1538        let body = req.json_body();
1539        validate_required("Name", &body["Name"])?;
1540        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1541
1542        let state = self.state.read();
1543        let ep = state.endpoints.get(name).ok_or_else(|| {
1544            AwsServiceError::aws_error(
1545                StatusCode::BAD_REQUEST,
1546                "ResourceNotFoundException",
1547                format!("Endpoint '{name}' does not exist."),
1548            )
1549        })?;
1550
1551        let mut resp = json!({
1552            "Name": ep.name,
1553            "Arn": ep.arn,
1554            "EndpointId": ep.endpoint_id,
1555            "State": ep.state,
1556            "RoutingConfig": ep.routing_config,
1557            "EventBuses": ep.event_buses,
1558            "CreationTime": ep.creation_time.timestamp() as f64,
1559            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1560        });
1561        if let Some(ref url) = ep.endpoint_url {
1562            resp["EndpointUrl"] = json!(url);
1563        }
1564        if let Some(ref desc) = ep.description {
1565            resp["Description"] = json!(desc);
1566        }
1567        if let Some(ref rc) = ep.replication_config {
1568            resp["ReplicationConfig"] = rc.clone();
1569        }
1570        if let Some(ref ra) = ep.role_arn {
1571            resp["RoleArn"] = json!(ra);
1572        }
1573
1574        Ok(AwsResponse::ok_json(resp))
1575    }
1576
1577    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1578        let body = req.json_body();
1579        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1580        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1581        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1582        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1583        let name_prefix = body["NamePrefix"].as_str();
1584        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1585
1586        let state = self.state.read();
1587        let all: Vec<Value> = state
1588            .endpoints
1589            .values()
1590            .filter(|ep| match name_prefix {
1591                Some(prefix) => ep.name.starts_with(prefix),
1592                None => true,
1593            })
1594            .map(|ep| {
1595                let mut obj = json!({
1596                    "Name": ep.name,
1597                    "Arn": ep.arn,
1598                    "EndpointId": ep.endpoint_id,
1599                    "State": ep.state,
1600                    "RoutingConfig": ep.routing_config,
1601                    "EventBuses": ep.event_buses,
1602                    "CreationTime": ep.creation_time.timestamp() as f64,
1603                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604                });
1605                if let Some(ref url) = ep.endpoint_url {
1606                    obj["EndpointUrl"] = json!(url);
1607                }
1608                obj
1609            })
1610            .collect();
1611
1612        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1613        let mut resp = json!({ "Endpoints": endpoints });
1614        if let Some(token) = next_token {
1615            resp["NextToken"] = json!(token);
1616        }
1617
1618        Ok(AwsResponse::ok_json(resp))
1619    }
1620
1621    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1622        let body = req.json_body();
1623        validate_required("Name", &body["Name"])?;
1624        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1625
1626        let mut state = self.state.write();
1627        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1628            AwsServiceError::aws_error(
1629                StatusCode::BAD_REQUEST,
1630                "ResourceNotFoundException",
1631                format!("Endpoint '{name}' does not exist."),
1632            )
1633        })?;
1634
1635        if let Some(desc) = body["Description"].as_str() {
1636            ep.description = Some(desc.to_string());
1637        }
1638        if !body["RoutingConfig"].is_null() {
1639            ep.routing_config = body["RoutingConfig"].clone();
1640        }
1641        if let Some(rc) = body.get("ReplicationConfig") {
1642            ep.replication_config = Some(rc.clone());
1643        }
1644        if let Some(buses) = body["EventBuses"].as_array() {
1645            ep.event_buses = buses.clone();
1646        }
1647        if let Some(ra) = body["RoleArn"].as_str() {
1648            ep.role_arn = Some(ra.to_string());
1649        }
1650        ep.last_modified_time = Utc::now();
1651
1652        let resp = json!({
1653            "Name": ep.name,
1654            "Arn": ep.arn,
1655            "EndpointId": ep.endpoint_id,
1656            "State": ep.state,
1657            "RoutingConfig": ep.routing_config,
1658            "EventBuses": ep.event_buses,
1659        });
1660
1661        Ok(AwsResponse::ok_json(resp))
1662    }
1663
1664    // ─── DeauthorizeConnection ──────────────────────────────────────────
1665
1666    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1667        let body = req.json_body();
1668        validate_required("Name", &body["Name"])?;
1669        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1670        validate_string_length("name", name, 1, 64)?;
1671
1672        let mut state = self.state.write();
1673        let conn = state.connections.get_mut(name).ok_or_else(|| {
1674            AwsServiceError::aws_error(
1675                StatusCode::BAD_REQUEST,
1676                "ResourceNotFoundException",
1677                format!("Connection '{name}' does not exist."),
1678            )
1679        })?;
1680
1681        conn.connection_state = "DEAUTHORIZING".to_string();
1682        conn.last_modified_time = Utc::now();
1683
1684        let resp = json!({
1685            "ConnectionArn": conn.arn,
1686            "ConnectionState": conn.connection_state,
1687            "CreationTime": conn.creation_time.timestamp() as f64,
1688            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1689            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1690        });
1691
1692        Ok(AwsResponse::ok_json(resp))
1693    }
1694
1695    // ─── PutEvents ──────────────────────────────────────────────────────
1696
1697    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1698        let body = req.json_body();
1699        validate_required("Entries", &body["Entries"])?;
1700        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1701        let entries = body["Entries"]
1702            .as_array()
1703            .ok_or_else(|| missing("Entries"))?;
1704
1705        // Validate entries count
1706        if entries.is_empty() {
1707            return Err(AwsServiceError::aws_error(
1708                StatusCode::BAD_REQUEST,
1709                "ValidationException",
1710                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1711            ));
1712        }
1713        if entries.len() > 10 {
1714            return Err(AwsServiceError::aws_error(
1715                StatusCode::BAD_REQUEST,
1716                "ValidationException",
1717                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1718            ));
1719        }
1720
1721        let mut state = self.state.write();
1722        let mut result_entries = Vec::new();
1723        let mut events_to_deliver = Vec::new();
1724        let mut failed_count = 0;
1725
1726        for entry in entries {
1727            let source = entry["Source"].as_str().unwrap_or("").to_string();
1728            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1729            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1730
1731            // Validate required fields
1732            if source.is_empty() {
1733                failed_count += 1;
1734                result_entries.push(json!({
1735                    "ErrorCode": "InvalidArgument",
1736                    "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
1737                }));
1738                continue;
1739            }
1740            if detail_type.is_empty() {
1741                failed_count += 1;
1742                result_entries.push(json!({
1743                    "ErrorCode": "InvalidArgument",
1744                    "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
1745                }));
1746                continue;
1747            }
1748            if detail.is_empty() {
1749                failed_count += 1;
1750                result_entries.push(json!({
1751                    "ErrorCode": "InvalidArgument",
1752                    "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
1753                }));
1754                continue;
1755            }
1756
1757            // Validate Detail is valid JSON
1758            if serde_json::from_str::<Value>(&detail).is_err() {
1759                failed_count += 1;
1760                result_entries.push(json!({
1761                    "ErrorCode": "MalformedDetail",
1762                    "ErrorMessage": "Detail is malformed.",
1763                }));
1764                continue;
1765            }
1766
1767            let event_id = uuid::Uuid::new_v4().to_string();
1768            let raw_bus = entry["EventBusName"]
1769                .as_str()
1770                .unwrap_or("default")
1771                .to_string();
1772            let event_bus_name = state.resolve_bus_name(&raw_bus);
1773            let time = if let Some(s) = entry["Time"].as_str() {
1774                DateTime::parse_from_rfc3339(s)
1775                    .map(|dt| dt.with_timezone(&Utc))
1776                    .unwrap_or_else(|_| Utc::now())
1777            } else if let Some(ts) = entry["Time"].as_f64() {
1778                DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
1779                    .unwrap_or_else(Utc::now)
1780            } else if let Some(ts) = entry["Time"].as_i64() {
1781                DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
1782            } else {
1783                Utc::now()
1784            };
1785            let resources: Vec<String> = entry["Resources"]
1786                .as_array()
1787                .map(|arr| {
1788                    arr.iter()
1789                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1790                        .collect()
1791                })
1792                .unwrap_or_default();
1793
1794            let event = PutEvent {
1795                event_id: event_id.clone(),
1796                source: source.clone(),
1797                detail_type: detail_type.clone(),
1798                detail: detail.clone(),
1799                event_bus_name: event_bus_name.clone(),
1800                time,
1801                resources: resources.clone(),
1802            };
1803
1804            // Archive matching events
1805            let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
1806            for akey in archive_keys {
1807                let (archive_bus, archive_pattern, archive_enabled) = {
1808                    let a = &state.archives[&akey];
1809                    (
1810                        state.resolve_bus_name(&a.event_source_arn),
1811                        a.event_pattern.clone(),
1812                        a.state == "ENABLED",
1813                    )
1814                };
1815                if archive_bus == event_bus_name && archive_enabled {
1816                    // Check if event matches archive's event pattern
1817                    let pattern_matches = matches_pattern(
1818                        archive_pattern.as_deref(),
1819                        &source,
1820                        &detail_type,
1821                        &detail,
1822                        &req.account_id,
1823                        &req.region,
1824                        &resources,
1825                    );
1826                    if pattern_matches {
1827                        if let Some(archive) = state.archives.get_mut(&akey) {
1828                            archive.event_count += 1;
1829                            archive.size_bytes += detail.len() as i64;
1830                            archive.events.push(event.clone());
1831                        }
1832                    }
1833                }
1834            }
1835
1836            state.events.push(event);
1837
1838            // Find matching rules and their targets
1839            let matching_targets: Vec<EventTarget> = state
1840                .rules
1841                .values()
1842                .filter(|r| {
1843                    r.event_bus_name == event_bus_name
1844                        && r.state == "ENABLED"
1845                        && matches_pattern(
1846                            r.event_pattern.as_deref(),
1847                            &source,
1848                            &detail_type,
1849                            &detail,
1850                            &req.account_id,
1851                            &req.region,
1852                            &resources,
1853                        )
1854                })
1855                .flat_map(|r| r.targets.clone())
1856                .collect();
1857
1858            if !matching_targets.is_empty() {
1859                events_to_deliver.push((
1860                    event_id.clone(),
1861                    source,
1862                    detail_type,
1863                    detail,
1864                    time,
1865                    resources,
1866                    matching_targets,
1867                ));
1868            }
1869
1870            result_entries.push(json!({ "EventId": event_id }));
1871        }
1872
1873        // Drop the lock before delivering
1874        drop(state);
1875
1876        // Deliver to targets
1877        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1878            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1879            let event_json = json!({
1880                "version": "0",
1881                "id": event_id,
1882                "source": source,
1883                "account": req.account_id,
1884                "detail-type": detail_type,
1885                "detail": detail_value,
1886                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1887                "region": req.region,
1888                "resources": resources,
1889            });
1890            let event_str = event_json.to_string();
1891
1892            for target in targets {
1893                let arn = &target.arn;
1894                // Compute the message body, applying InputTransformer if present
1895                let body_str = if let Some(ref transformer) = target.input_transformer {
1896                    apply_input_transformer(transformer, &event_json)
1897                } else if let Some(ref input) = target.input {
1898                    input.clone()
1899                } else if let Some(ref input_path) = target.input_path {
1900                    resolve_json_path(&event_json, input_path)
1901                        .map(|v| v.to_string())
1902                        .unwrap_or_else(|| event_str.clone())
1903                } else {
1904                    event_str.clone()
1905                };
1906
1907                if arn.contains(":sqs:") {
1908                    // Extract FIFO parameters (MessageGroupId)
1909                    let group_id = target
1910                        .sqs_parameters
1911                        .as_ref()
1912                        .and_then(|p| p["MessageGroupId"].as_str())
1913                        .map(|s| s.to_string());
1914                    if group_id.is_some() {
1915                        // FIFO queue: send with group ID but no dedup ID.
1916                        // Queues with content-based dedup will auto-generate one;
1917                        // queues without it will reject the message.
1918                        self.delivery.send_to_sqs_with_attrs(
1919                            arn,
1920                            &body_str,
1921                            &HashMap::new(),
1922                            group_id.as_deref(),
1923                            None,
1924                        );
1925                    } else {
1926                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1927                    }
1928                } else if arn.contains(":sns:") {
1929                    self.delivery
1930                        .publish_to_sns(arn, &body_str, Some(&detail_type));
1931                } else if arn.contains(":lambda:") {
1932                    tracing::info!(
1933                        function_arn = %arn,
1934                        payload = %body_str,
1935                        "EventBridge delivering to Lambda function"
1936                    );
1937                    let now = Utc::now();
1938                    let mut state = self.state.write();
1939                    state
1940                        .lambda_invocations
1941                        .push(crate::state::LambdaInvocation {
1942                            function_arn: arn.clone(),
1943                            payload: body_str.clone(),
1944                            timestamp: now,
1945                        });
1946                    drop(state);
1947                    // Record in Lambda state for cross-service visibility
1948                    if let Some(ref ls) = self.lambda_state {
1949                        ls.write().invocations.push(LambdaInvocation {
1950                            function_arn: arn.clone(),
1951                            payload: body_str.clone(),
1952                            timestamp: now,
1953                            source: "aws:events".to_string(),
1954                        });
1955                    }
1956                    // Actually invoke the Lambda function if a container runtime is available
1957                    invoke_lambda_async(
1958                        &self.container_runtime,
1959                        &self.lambda_state,
1960                        arn,
1961                        &body_str,
1962                    );
1963                } else if arn.contains(":logs:") {
1964                    tracing::info!(
1965                        log_group_arn = %arn,
1966                        payload = %body_str,
1967                        "EventBridge delivering to CloudWatch Logs"
1968                    );
1969                    let now = Utc::now();
1970                    let mut state = self.state.write();
1971                    state.log_deliveries.push(crate::state::LogDelivery {
1972                        log_group_arn: arn.clone(),
1973                        payload: body_str.clone(),
1974                        timestamp: now,
1975                    });
1976                    drop(state);
1977                    // Write event to CloudWatch Logs state
1978                    if let Some(ref log_state) = self.logs_state {
1979                        deliver_to_logs(log_state, arn, &body_str, now);
1980                    }
1981                } else if arn.contains(":kinesis:") {
1982                    tracing::info!(
1983                        stream_arn = %arn,
1984                        "EventBridge delivering to Kinesis stream"
1985                    );
1986                    // Use event ID as partition key for even distribution
1987                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
1988                } else if arn.contains(":states:") {
1989                    tracing::info!(
1990                        state_machine_arn = %arn,
1991                        payload = %body_str,
1992                        "EventBridge delivering to Step Functions (stub)"
1993                    );
1994                    let mut state = self.state.write();
1995                    state
1996                        .step_function_executions
1997                        .push(crate::state::StepFunctionExecution {
1998                            state_machine_arn: arn.clone(),
1999                            payload: body_str.clone(),
2000                            timestamp: Utc::now(),
2001                        });
2002                } else if arn.contains(":api-destination/") {
2003                    // ApiDestination target: look up destination + connection, then POST
2004                    let state = self.state.read();
2005                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
2006                    if let Some(dest) = dest {
2007                        let url = dest.invocation_endpoint.clone();
2008                        let method = dest.http_method.clone();
2009                        let conn = state
2010                            .connections
2011                            .values()
2012                            .find(|c| c.arn == dest.connection_arn)
2013                            .cloned();
2014                        drop(state);
2015
2016                        let payload = body_str.clone();
2017                        tokio::spawn(async move {
2018                            let client = reqwest::Client::new();
2019                            let mut req_builder = match method.as_str() {
2020                                "GET" => client.get(&url),
2021                                "PUT" => client.put(&url),
2022                                "DELETE" => client.delete(&url),
2023                                "PATCH" => client.patch(&url),
2024                                "HEAD" => client.head(&url),
2025                                _ => client.post(&url),
2026                            };
2027                            req_builder = req_builder.header("Content-Type", "application/json");
2028
2029                            // Apply auth from connection
2030                            if let Some(conn) = conn {
2031                                req_builder = apply_connection_auth(req_builder, &conn);
2032                            }
2033
2034                            let result = req_builder.body(payload).send().await;
2035                            if let Err(e) = result {
2036                                tracing::warn!(
2037                                    endpoint = %url,
2038                                    error = %e,
2039                                    "EventBridge ApiDestination delivery failed"
2040                                );
2041                            }
2042                        });
2043                    }
2044                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2045                    // HTTP target — fire-and-forget POST
2046                    let url = arn.clone();
2047                    let payload = body_str.clone();
2048                    tokio::spawn(async move {
2049                        let client = reqwest::Client::new();
2050                        let result = client
2051                            .post(&url)
2052                            .header("Content-Type", "application/json")
2053                            .body(payload)
2054                            .send()
2055                            .await;
2056                        if let Err(e) = result {
2057                            tracing::warn!(
2058                                endpoint = %url,
2059                                error = %e,
2060                                "EventBridge HTTP target delivery failed"
2061                            );
2062                        }
2063                    });
2064                }
2065            }
2066        }
2067
2068        let resp = json!({
2069            "FailedEntryCount": failed_count,
2070            "Entries": result_entries,
2071        });
2072
2073        Ok(AwsResponse::ok_json(resp))
2074    }
2075
2076    // ─── Tagging ────────────────────────────────────────────────────────
2077
2078    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2079        let body = req.json_body();
2080        validate_required("ResourceARN", &body["ResourceARN"])?;
2081        let arn = body["ResourceARN"]
2082            .as_str()
2083            .ok_or_else(|| missing("ResourceARN"))?;
2084        validate_string_length("resourceARN", arn, 1, 1600)?;
2085        validate_required("Tags", &body["Tags"])?;
2086
2087        let mut state = self.state.write();
2088        let tag_map = find_tags_mut(&mut state, arn)?;
2089
2090        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2091            AwsServiceError::aws_error(
2092                StatusCode::BAD_REQUEST,
2093                "ValidationException",
2094                format!("{f} must be a list"),
2095            )
2096        })?;
2097
2098        Ok(AwsResponse::ok_json(json!({})))
2099    }
2100
2101    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2102        let body = req.json_body();
2103        validate_required("ResourceARN", &body["ResourceARN"])?;
2104        let arn = body["ResourceARN"]
2105            .as_str()
2106            .ok_or_else(|| missing("ResourceARN"))?;
2107        validate_string_length("resourceARN", arn, 1, 1600)?;
2108        validate_required("TagKeys", &body["TagKeys"])?;
2109
2110        let mut state = self.state.write();
2111        let tag_map = find_tags_mut(&mut state, arn)?;
2112
2113        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2114            AwsServiceError::aws_error(
2115                StatusCode::BAD_REQUEST,
2116                "ValidationException",
2117                format!("{f} must be a list"),
2118            )
2119        })?;
2120
2121        Ok(AwsResponse::ok_json(json!({})))
2122    }
2123
2124    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2125        let body = req.json_body();
2126        validate_required("ResourceARN", &body["ResourceARN"])?;
2127        let arn = body["ResourceARN"]
2128            .as_str()
2129            .ok_or_else(|| missing("ResourceARN"))?;
2130        validate_string_length("resourceARN", arn, 1, 1600)?;
2131
2132        let state = self.state.read();
2133        let tag_map = find_tags(&state, arn)?;
2134
2135        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2136
2137        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2138    }
2139
2140    // ─── Archive Operations ─────────────────────────────────────────────
2141
2142    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2143        let body = req.json_body();
2144        validate_required("ArchiveName", &body["ArchiveName"])?;
2145        let name = body["ArchiveName"]
2146            .as_str()
2147            .ok_or_else(|| missing("ArchiveName"))?
2148            .to_string();
2149        validate_string_length("archiveName", &name, 1, 48)?;
2150        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2151        let event_source_arn = body["EventSourceArn"]
2152            .as_str()
2153            .ok_or_else(|| missing("EventSourceArn"))?
2154            .to_string();
2155        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2156        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2157        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2158        if let Some(rd) = body["RetentionDays"].as_i64() {
2159            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2160        }
2161        let description = body["Description"].as_str().map(|s| s.to_string());
2162        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2163        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2164
2165        // Validate event pattern if provided
2166        if let Some(ref pattern) = event_pattern {
2167            validate_event_pattern(pattern)?;
2168        }
2169
2170        let mut state = self.state.write();
2171
2172        // Validate event bus exists
2173        let bus_name = state.resolve_bus_name(&event_source_arn);
2174        if !state.buses.contains_key(&bus_name) {
2175            return Err(AwsServiceError::aws_error(
2176                StatusCode::BAD_REQUEST,
2177                "ResourceNotFoundException",
2178                format!("Event bus {bus_name} does not exist."),
2179            ));
2180        }
2181
2182        // Check duplicate
2183        if state.archives.contains_key(&name) {
2184            return Err(AwsServiceError::aws_error(
2185                StatusCode::BAD_REQUEST,
2186                "ResourceAlreadyExistsException",
2187                format!("Archive {name} already exists."),
2188            ));
2189        }
2190
2191        let now = Utc::now();
2192        let arn = format!(
2193            "arn:aws:events:{}:{}:archive/{}",
2194            req.region, state.account_id, name
2195        );
2196
2197        let archive = Archive {
2198            name: name.clone(),
2199            arn: arn.clone(),
2200            event_source_arn: event_source_arn.clone(),
2201            description,
2202            event_pattern: event_pattern.clone(),
2203            retention_days,
2204            state: "ENABLED".to_string(),
2205            creation_time: now,
2206            event_count: 0,
2207            size_bytes: 0,
2208            events: Vec::new(),
2209        };
2210        state.archives.insert(name.clone(), archive);
2211
2212        // Create the archive rule
2213        let rule_name = format!("Events-Archive-{name}");
2214        let rule_arn = format!(
2215            "arn:aws:events:{}:{}:rule/{}",
2216            req.region, state.account_id, rule_name
2217        );
2218        // Merge archive event pattern with replay-name filter
2219        let rule_event_pattern = {
2220            let mut merged = if let Some(ref ep) = event_pattern {
2221                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2222            } else {
2223                json!({})
2224            };
2225            if let Some(obj) = merged.as_object_mut() {
2226                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2227            }
2228            serde_json::to_string(&merged).unwrap_or_default()
2229        };
2230
2231        // Build the archive target with InputTransformer
2232        let archive_target = EventTarget {
2233            id: name.clone(),
2234            arn: format!("arn:aws:events:{}:::", req.region),
2235            input: None,
2236            input_path: None,
2237            input_transformer: Some(json!({
2238                "InputPathsMap": {},
2239                "InputTemplate": format!(
2240                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2241                    arn
2242                )
2243            })),
2244            sqs_parameters: None,
2245        };
2246
2247        let archive_rule = EventRule {
2248            name: rule_name.clone(),
2249            arn: rule_arn,
2250            event_bus_name: bus_name.clone(),
2251            event_pattern: Some(rule_event_pattern),
2252            schedule_expression: None,
2253            state: "ENABLED".to_string(),
2254            description: None,
2255            role_arn: None,
2256            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2257            created_by: Some(state.account_id.clone()),
2258            targets: vec![archive_target],
2259            tags: HashMap::new(),
2260            last_fired: None,
2261        };
2262        let key = (bus_name, rule_name);
2263        state.rules.insert(key, archive_rule);
2264
2265        Ok(AwsResponse::ok_json(json!({
2266            "ArchiveArn": arn,
2267            "CreationTime": now.timestamp() as f64,
2268            "State": "ENABLED",
2269        })))
2270    }
2271
2272    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2273        let body = req.json_body();
2274        validate_required("ArchiveName", &body["ArchiveName"])?;
2275        let name = body["ArchiveName"]
2276            .as_str()
2277            .ok_or_else(|| missing("ArchiveName"))?;
2278        validate_string_length("archiveName", name, 1, 48)?;
2279
2280        let state = self.state.read();
2281        let archive = state.archives.get(name).ok_or_else(|| {
2282            AwsServiceError::aws_error(
2283                StatusCode::BAD_REQUEST,
2284                "ResourceNotFoundException",
2285                format!("Archive {name} does not exist."),
2286            )
2287        })?;
2288
2289        let mut resp = json!({
2290            "ArchiveArn": archive.arn,
2291            "ArchiveName": archive.name,
2292            "CreationTime": archive.creation_time.timestamp() as f64,
2293            "EventCount": archive.event_count,
2294            "EventSourceArn": archive.event_source_arn,
2295            "RetentionDays": archive.retention_days,
2296            "SizeBytes": archive.size_bytes,
2297            "State": archive.state,
2298        });
2299        if let Some(ref desc) = archive.description {
2300            resp["Description"] = json!(desc);
2301        }
2302        if let Some(ref ep) = archive.event_pattern {
2303            resp["EventPattern"] = json!(ep);
2304        }
2305
2306        Ok(AwsResponse::ok_json(resp))
2307    }
2308
2309    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2310        let body = req.json_body();
2311        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2312        validate_optional_string_length(
2313            "eventSourceArn",
2314            body["EventSourceArn"].as_str(),
2315            1,
2316            1600,
2317        )?;
2318        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2319        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2320        let name_prefix = body["NamePrefix"].as_str();
2321        let source_arn = body["EventSourceArn"].as_str();
2322        let archive_state = body["State"].as_str();
2323
2324        // Validate at most one filter
2325        let filter_count = [
2326            name_prefix.is_some(),
2327            source_arn.is_some(),
2328            archive_state.is_some(),
2329        ]
2330        .iter()
2331        .filter(|&&x| x)
2332        .count();
2333        if filter_count > 1 {
2334            return Err(AwsServiceError::aws_error(
2335                StatusCode::BAD_REQUEST,
2336                "ValidationException",
2337                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2338            ));
2339        }
2340
2341        // Validate state
2342        if let Some(s) = archive_state {
2343            let valid = [
2344                "ENABLED",
2345                "DISABLED",
2346                "CREATING",
2347                "UPDATING",
2348                "CREATE_FAILED",
2349                "UPDATE_FAILED",
2350            ];
2351            if !valid.contains(&s) {
2352                return Err(AwsServiceError::aws_error(
2353                    StatusCode::BAD_REQUEST,
2354                    "ValidationException",
2355                    format!(
2356                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2357                        s
2358                    ),
2359                ));
2360            }
2361        }
2362
2363        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2364
2365        let state = self.state.read();
2366        let all: Vec<Value> = state
2367            .archives
2368            .values()
2369            .filter(|a| {
2370                if let Some(prefix) = name_prefix {
2371                    a.name.starts_with(prefix)
2372                } else if let Some(arn) = source_arn {
2373                    a.event_source_arn == arn
2374                } else if let Some(s) = archive_state {
2375                    a.state == s
2376                } else {
2377                    true
2378                }
2379            })
2380            .map(|a| {
2381                json!({
2382                    "ArchiveName": a.name,
2383                    "CreationTime": a.creation_time.timestamp() as f64,
2384                    "EventCount": a.event_count,
2385                    "EventSourceArn": a.event_source_arn,
2386                    "RetentionDays": a.retention_days,
2387                    "SizeBytes": a.size_bytes,
2388                    "State": a.state,
2389                })
2390            })
2391            .collect();
2392
2393        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2394        let mut resp = json!({ "Archives": archives });
2395        if let Some(token) = next_token {
2396            resp["NextToken"] = json!(token);
2397        }
2398
2399        Ok(AwsResponse::ok_json(resp))
2400    }
2401
2402    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2403        let body = req.json_body();
2404        validate_required("ArchiveName", &body["ArchiveName"])?;
2405        let name = body["ArchiveName"]
2406            .as_str()
2407            .ok_or_else(|| missing("ArchiveName"))?;
2408        validate_string_length("archiveName", name, 1, 48)?;
2409        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2410        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2411        if let Some(rd) = body["RetentionDays"].as_i64() {
2412            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2413        }
2414
2415        // Validate event pattern if provided
2416        if let Some(pattern) = body["EventPattern"].as_str() {
2417            validate_event_pattern(pattern)?;
2418        }
2419
2420        let mut state = self.state.write();
2421        let archive = state.archives.get_mut(name).ok_or_else(|| {
2422            AwsServiceError::aws_error(
2423                StatusCode::BAD_REQUEST,
2424                "ResourceNotFoundException",
2425                format!("Archive {name} does not exist."),
2426            )
2427        })?;
2428
2429        if let Some(desc) = body["Description"].as_str() {
2430            archive.description = Some(desc.to_string());
2431        }
2432        if let Some(pattern) = body["EventPattern"].as_str() {
2433            archive.event_pattern = Some(pattern.to_string());
2434        }
2435        if let Some(days) = body["RetentionDays"].as_i64() {
2436            archive.retention_days = days;
2437        }
2438
2439        Ok(AwsResponse::ok_json(json!({
2440            "ArchiveArn": archive.arn,
2441            "CreationTime": archive.creation_time.timestamp() as f64,
2442            "State": archive.state,
2443        })))
2444    }
2445
2446    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2447        let body = req.json_body();
2448        validate_required("ArchiveName", &body["ArchiveName"])?;
2449        let name = body["ArchiveName"]
2450            .as_str()
2451            .ok_or_else(|| missing("ArchiveName"))?;
2452        validate_string_length("archiveName", name, 1, 48)?;
2453
2454        let mut state = self.state.write();
2455        if !state.archives.contains_key(name) {
2456            return Err(AwsServiceError::aws_error(
2457                StatusCode::BAD_REQUEST,
2458                "ResourceNotFoundException",
2459                format!("Archive {name} does not exist."),
2460            ));
2461        }
2462
2463        state.archives.remove(name);
2464
2465        // Remove the archive rule
2466        let rule_name = format!("Events-Archive-{name}");
2467        state.rules.retain(|k, _| k.1 != rule_name);
2468
2469        Ok(AwsResponse::ok_json(json!({})))
2470    }
2471
2472    // ─── Connection Operations ──────────────────────────────────────────
2473
2474    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2475        let body = req.json_body();
2476        validate_required("Name", &body["Name"])?;
2477        let name = body["Name"]
2478            .as_str()
2479            .ok_or_else(|| missing("Name"))?
2480            .to_string();
2481        validate_string_length("name", &name, 1, 64)?;
2482        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2483        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2484        let description = body["Description"].as_str().map(|s| s.to_string());
2485        let auth_type = body["AuthorizationType"]
2486            .as_str()
2487            .ok_or_else(|| missing("AuthorizationType"))?
2488            .to_string();
2489        validate_enum(
2490            "authorizationType",
2491            &auth_type,
2492            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2493        )?;
2494        validate_optional_string_length(
2495            "kmsKeyIdentifier",
2496            body["KmsKeyIdentifier"].as_str(),
2497            0,
2498            2048,
2499        )?;
2500        validate_required("AuthParameters", &body["AuthParameters"])?;
2501        let auth_params = body["AuthParameters"].clone();
2502
2503        let mut state = self.state.write();
2504        let now = Utc::now();
2505        let conn_uuid = uuid::Uuid::new_v4();
2506        let arn = format!(
2507            "arn:aws:events:{}:{}:connection/{}/{}",
2508            req.region, state.account_id, name, conn_uuid
2509        );
2510        let secret_arn = format!(
2511            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2512            req.region, state.account_id, name, conn_uuid
2513        );
2514
2515        let conn = Connection {
2516            name: name.clone(),
2517            arn: arn.clone(),
2518            description,
2519            authorization_type: auth_type.clone(),
2520            auth_parameters: auth_params,
2521            connection_state: "AUTHORIZED".to_string(),
2522            secret_arn: secret_arn.clone(),
2523            creation_time: now,
2524            last_modified_time: now,
2525            last_authorized_time: now,
2526        };
2527        state.connections.insert(name, conn);
2528
2529        Ok(AwsResponse::ok_json(json!({
2530            "ConnectionArn": arn,
2531            "ConnectionState": "AUTHORIZED",
2532            "CreationTime": now.timestamp() as f64,
2533            "LastModifiedTime": now.timestamp() as f64,
2534        })))
2535    }
2536
2537    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2538        let body = req.json_body();
2539        validate_required("Name", &body["Name"])?;
2540        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2541        validate_string_length("name", name, 1, 64)?;
2542
2543        let state = self.state.read();
2544        let conn = state.connections.get(name).ok_or_else(|| {
2545            AwsServiceError::aws_error(
2546                StatusCode::BAD_REQUEST,
2547                "ResourceNotFoundException",
2548                format!("Connection '{name}' does not exist."),
2549            )
2550        })?;
2551
2552        // Build auth parameters response - strip secrets
2553        let auth_params_response =
2554            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2555
2556        let mut resp = json!({
2557            "ConnectionArn": conn.arn,
2558            "Name": conn.name,
2559            "AuthorizationType": conn.authorization_type,
2560            "AuthParameters": auth_params_response,
2561            "ConnectionState": conn.connection_state,
2562            "SecretArn": conn.secret_arn,
2563            "CreationTime": conn.creation_time.timestamp() as f64,
2564            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2565            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2566        });
2567        if let Some(ref desc) = conn.description {
2568            resp["Description"] = json!(desc);
2569        }
2570
2571        Ok(AwsResponse::ok_json(resp))
2572    }
2573
2574    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2575        let body = req.json_body();
2576        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2577        validate_optional_enum(
2578            "connectionState",
2579            body["ConnectionState"].as_str(),
2580            &[
2581                "CREATING",
2582                "UPDATING",
2583                "DELETING",
2584                "AUTHORIZED",
2585                "DEAUTHORIZED",
2586                "AUTHORIZING",
2587                "DEAUTHORIZING",
2588                "ACTIVE",
2589                "FAILED_CONNECTIVITY",
2590            ],
2591        )?;
2592        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2593        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2594
2595        let name_prefix = body["NamePrefix"].as_str();
2596        let connection_state = body["ConnectionState"].as_str();
2597        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2598
2599        let state = self.state.read();
2600        let all: Vec<Value> = state
2601            .connections
2602            .values()
2603            .filter(|c| {
2604                if let Some(prefix) = name_prefix {
2605                    if !c.name.starts_with(prefix) {
2606                        return false;
2607                    }
2608                }
2609                if let Some(cs) = connection_state {
2610                    if c.connection_state != cs {
2611                        return false;
2612                    }
2613                }
2614                true
2615            })
2616            .map(|c| {
2617                json!({
2618                    "ConnectionArn": c.arn,
2619                    "Name": c.name,
2620                    "AuthorizationType": c.authorization_type,
2621                    "ConnectionState": c.connection_state,
2622                    "CreationTime": c.creation_time.timestamp() as f64,
2623                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2624                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2625                })
2626            })
2627            .collect();
2628
2629        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2630        let mut resp = json!({ "Connections": conns });
2631        if let Some(token) = next_token {
2632            resp["NextToken"] = json!(token);
2633        }
2634
2635        Ok(AwsResponse::ok_json(resp))
2636    }
2637
2638    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2639        let body = req.json_body();
2640        validate_required("Name", &body["Name"])?;
2641        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2642        validate_string_length("name", name, 1, 64)?;
2643        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2644        validate_optional_enum(
2645            "authorizationType",
2646            body["AuthorizationType"].as_str(),
2647            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2648        )?;
2649
2650        let mut state = self.state.write();
2651        let conn = state.connections.get_mut(name).ok_or_else(|| {
2652            AwsServiceError::aws_error(
2653                StatusCode::BAD_REQUEST,
2654                "ResourceNotFoundException",
2655                format!("Connection '{name}' does not exist."),
2656            )
2657        })?;
2658
2659        if let Some(desc) = body["Description"].as_str() {
2660            conn.description = Some(desc.to_string());
2661        }
2662        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2663            conn.authorization_type = auth_type.to_string();
2664        }
2665        if body.get("AuthParameters").is_some() {
2666            conn.auth_parameters = body["AuthParameters"].clone();
2667        }
2668        conn.last_modified_time = Utc::now();
2669
2670        Ok(AwsResponse::ok_json(json!({
2671            "ConnectionArn": conn.arn,
2672            "ConnectionState": conn.connection_state,
2673            "CreationTime": conn.creation_time.timestamp() as f64,
2674            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2675            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2676        })))
2677    }
2678
2679    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2680        let body = req.json_body();
2681        validate_required("Name", &body["Name"])?;
2682        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2683        validate_string_length("name", name, 1, 64)?;
2684
2685        let mut state = self.state.write();
2686        let conn = state.connections.remove(name).ok_or_else(|| {
2687            AwsServiceError::aws_error(
2688                StatusCode::BAD_REQUEST,
2689                "ResourceNotFoundException",
2690                format!("Connection '{name}' does not exist."),
2691            )
2692        })?;
2693
2694        Ok(AwsResponse::ok_json(json!({
2695            "ConnectionArn": conn.arn,
2696            "ConnectionState": conn.connection_state,
2697            "CreationTime": conn.creation_time.timestamp() as f64,
2698            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2699            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2700        })))
2701    }
2702
2703    // ─── API Destination Operations ─────────────────────────────────────
2704
2705    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2706        let body = req.json_body();
2707        validate_required("Name", &body["Name"])?;
2708        let name = body["Name"]
2709            .as_str()
2710            .ok_or_else(|| missing("Name"))?
2711            .to_string();
2712        validate_string_length("name", &name, 1, 64)?;
2713        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2714        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2715        let description = body["Description"].as_str().map(|s| s.to_string());
2716        let connection_arn = body["ConnectionArn"]
2717            .as_str()
2718            .ok_or_else(|| missing("ConnectionArn"))?
2719            .to_string();
2720        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2721        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2722        let endpoint = body["InvocationEndpoint"]
2723            .as_str()
2724            .ok_or_else(|| missing("InvocationEndpoint"))?
2725            .to_string();
2726        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2727        validate_required("HttpMethod", &body["HttpMethod"])?;
2728        let http_method = body["HttpMethod"]
2729            .as_str()
2730            .ok_or_else(|| missing("HttpMethod"))?
2731            .to_string();
2732        validate_enum(
2733            "httpMethod",
2734            &http_method,
2735            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2736        )?;
2737        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2738        if let Some(r) = rate_limit {
2739            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2740        }
2741
2742        let mut state = self.state.write();
2743        let now = Utc::now();
2744        let dest_uuid = uuid::Uuid::new_v4();
2745        let arn = format!(
2746            "arn:aws:events:{}:{}:api-destination/{}/{}",
2747            req.region, state.account_id, name, dest_uuid
2748        );
2749
2750        let dest = ApiDestination {
2751            name: name.clone(),
2752            arn: arn.clone(),
2753            description,
2754            connection_arn,
2755            invocation_endpoint: endpoint,
2756            http_method,
2757            invocation_rate_limit_per_second: rate_limit,
2758            state: "ACTIVE".to_string(),
2759            creation_time: now,
2760            last_modified_time: now,
2761        };
2762        state.api_destinations.insert(name, dest);
2763
2764        Ok(AwsResponse::ok_json(json!({
2765            "ApiDestinationArn": arn,
2766            "ApiDestinationState": "ACTIVE",
2767            "CreationTime": now.timestamp() as f64,
2768            "LastModifiedTime": now.timestamp() as f64,
2769        })))
2770    }
2771
2772    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2773        let body = req.json_body();
2774        validate_required("Name", &body["Name"])?;
2775        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2776        validate_string_length("name", name, 1, 64)?;
2777
2778        let state = self.state.read();
2779        let dest = state.api_destinations.get(name).ok_or_else(|| {
2780            AwsServiceError::aws_error(
2781                StatusCode::BAD_REQUEST,
2782                "ResourceNotFoundException",
2783                format!("An api-destination '{name}' does not exist."),
2784            )
2785        })?;
2786
2787        let mut resp = json!({
2788            "ApiDestinationArn": dest.arn,
2789            "Name": dest.name,
2790            "ConnectionArn": dest.connection_arn,
2791            "InvocationEndpoint": dest.invocation_endpoint,
2792            "HttpMethod": dest.http_method,
2793            "ApiDestinationState": dest.state,
2794            "CreationTime": dest.creation_time.timestamp() as f64,
2795            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2796        });
2797        if let Some(ref desc) = dest.description {
2798            resp["Description"] = json!(desc);
2799        }
2800        if let Some(rate) = dest.invocation_rate_limit_per_second {
2801            resp["InvocationRateLimitPerSecond"] = json!(rate);
2802        }
2803
2804        Ok(AwsResponse::ok_json(resp))
2805    }
2806
2807    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2808        let body = req.json_body();
2809        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2810        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2811        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2812        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2813
2814        let name_prefix = body["NamePrefix"].as_str();
2815        let connection_arn = body["ConnectionArn"].as_str();
2816        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2817
2818        let state = self.state.read();
2819        let all: Vec<Value> = state
2820            .api_destinations
2821            .values()
2822            .filter(|d| {
2823                if let Some(prefix) = name_prefix {
2824                    if !d.name.starts_with(prefix) {
2825                        return false;
2826                    }
2827                }
2828                if let Some(arn) = connection_arn {
2829                    if d.connection_arn != arn {
2830                        return false;
2831                    }
2832                }
2833                true
2834            })
2835            .map(|d| {
2836                let mut obj = json!({
2837                    "ApiDestinationArn": d.arn,
2838                    "Name": d.name,
2839                    "ConnectionArn": d.connection_arn,
2840                    "InvocationEndpoint": d.invocation_endpoint,
2841                    "HttpMethod": d.http_method,
2842                    "ApiDestinationState": d.state,
2843                    "CreationTime": d.creation_time.timestamp() as f64,
2844                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2845                });
2846                if let Some(rate) = d.invocation_rate_limit_per_second {
2847                    obj["InvocationRateLimitPerSecond"] = json!(rate);
2848                }
2849                obj
2850            })
2851            .collect();
2852
2853        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2854        let mut resp = json!({ "ApiDestinations": dests });
2855        if let Some(token) = next_token {
2856            resp["NextToken"] = json!(token);
2857        }
2858
2859        Ok(AwsResponse::ok_json(resp))
2860    }
2861
2862    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2863        let body = req.json_body();
2864        validate_required("Name", &body["Name"])?;
2865        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2866        validate_string_length("name", name, 1, 64)?;
2867        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2868        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2869        validate_optional_string_length(
2870            "invocationEndpoint",
2871            body["InvocationEndpoint"].as_str(),
2872            1,
2873            2048,
2874        )?;
2875        validate_optional_enum(
2876            "httpMethod",
2877            body["HttpMethod"].as_str(),
2878            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2879        )?;
2880        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2881            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2882        }
2883
2884        let mut state = self.state.write();
2885        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2886            AwsServiceError::aws_error(
2887                StatusCode::BAD_REQUEST,
2888                "ResourceNotFoundException",
2889                format!("An api-destination '{name}' does not exist."),
2890            )
2891        })?;
2892
2893        if let Some(desc) = body["Description"].as_str() {
2894            dest.description = Some(desc.to_string());
2895        }
2896        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2897            dest.invocation_endpoint = endpoint.to_string();
2898        }
2899        if let Some(method) = body["HttpMethod"].as_str() {
2900            dest.http_method = method.to_string();
2901        }
2902        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2903            dest.invocation_rate_limit_per_second = Some(rate);
2904        }
2905        if let Some(conn) = body["ConnectionArn"].as_str() {
2906            dest.connection_arn = conn.to_string();
2907        }
2908        dest.last_modified_time = Utc::now();
2909
2910        Ok(AwsResponse::ok_json(json!({
2911            "ApiDestinationArn": dest.arn,
2912            "ApiDestinationState": dest.state,
2913            "CreationTime": dest.creation_time.timestamp() as f64,
2914            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2915        })))
2916    }
2917
2918    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2919        let body = req.json_body();
2920        validate_required("Name", &body["Name"])?;
2921        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2922        validate_string_length("name", name, 1, 64)?;
2923
2924        let mut state = self.state.write();
2925        if !state.api_destinations.contains_key(name) {
2926            return Err(AwsServiceError::aws_error(
2927                StatusCode::BAD_REQUEST,
2928                "ResourceNotFoundException",
2929                format!("An api-destination '{name}' does not exist."),
2930            ));
2931        }
2932        state.api_destinations.remove(name);
2933
2934        Ok(AwsResponse::ok_json(json!({})))
2935    }
2936
2937    // ─── Replay Operations ──────────────────────────────────────────────
2938
2939    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2940        let body = req.json_body();
2941        validate_required("ReplayName", &body["ReplayName"])?;
2942        let name = body["ReplayName"]
2943            .as_str()
2944            .ok_or_else(|| missing("ReplayName"))?
2945            .to_string();
2946        validate_string_length("replayName", &name, 1, 64)?;
2947        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2948        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2949        let description = body["Description"].as_str().map(|s| s.to_string());
2950        let event_source_arn = body["EventSourceArn"]
2951            .as_str()
2952            .ok_or_else(|| missing("EventSourceArn"))?
2953            .to_string();
2954        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2955        validate_required("EventStartTime", &body["EventStartTime"])?;
2956        validate_required("EventEndTime", &body["EventEndTime"])?;
2957        validate_required("Destination", &body["Destination"])?;
2958        let destination = body["Destination"].clone();
2959        let event_start_time_f = body["EventStartTime"].as_f64();
2960        let event_end_time_f = body["EventEndTime"].as_f64();
2961
2962        let event_start_time = event_start_time_f
2963            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2964            .unwrap_or_else(Utc::now);
2965        let event_end_time = event_end_time_f
2966            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2967            .unwrap_or_else(Utc::now);
2968
2969        // Validate destination ARN
2970        let dest_arn = destination["Arn"].as_str().unwrap_or("");
2971        if !dest_arn.contains(":event-bus/") {
2972            return Err(AwsServiceError::aws_error(
2973                StatusCode::BAD_REQUEST,
2974                "ValidationException",
2975                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2976            ));
2977        }
2978
2979        let mut state = self.state.write();
2980
2981        // Validate event bus exists
2982        let bus_name = state.resolve_bus_name(dest_arn);
2983        if !state.buses.contains_key(&bus_name) {
2984            return Err(AwsServiceError::aws_error(
2985                StatusCode::BAD_REQUEST,
2986                "ResourceNotFoundException",
2987                format!("Event bus {bus_name} does not exist."),
2988            ));
2989        }
2990
2991        // Validate archive exists
2992        let archive_name = event_source_arn
2993            .rsplit_once("archive/")
2994            .map(|(_, n)| n.to_string())
2995            .unwrap_or_default();
2996        if !state.archives.contains_key(&archive_name) {
2997            return Err(AwsServiceError::aws_error(
2998                StatusCode::BAD_REQUEST,
2999                "ValidationException",
3000                format!(
3001                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
3002                ),
3003            ));
3004        }
3005
3006        // Validate archive bus matches destination bus
3007        let archive = state.archives.get(&archive_name).unwrap();
3008        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
3009        if archive_bus != bus_name {
3010            return Err(AwsServiceError::aws_error(
3011                StatusCode::BAD_REQUEST,
3012                "ValidationException",
3013                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
3014            ));
3015        }
3016
3017        // Validate end time after start time
3018        if event_end_time <= event_start_time {
3019            return Err(AwsServiceError::aws_error(
3020                StatusCode::BAD_REQUEST,
3021                "ValidationException",
3022                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
3023            ));
3024        }
3025
3026        // Check duplicate
3027        if state.replays.contains_key(&name) {
3028            return Err(AwsServiceError::aws_error(
3029                StatusCode::BAD_REQUEST,
3030                "ResourceAlreadyExistsException",
3031                format!("Replay {name} already exists."),
3032            ));
3033        }
3034
3035        let now = Utc::now();
3036        let arn = format!(
3037            "arn:aws:events:{}:{}:replay/{}",
3038            req.region, state.account_id, name
3039        );
3040
3041        // Collect archived events within the replay time range
3042        let archive = state.archives.get(&archive_name).unwrap();
3043        let replay_events: Vec<PutEvent> = archive
3044            .events
3045            .iter()
3046            .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3047            .cloned()
3048            .collect();
3049
3050        // Find matching rules and their targets for each replayed event
3051        let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3052
3053        for event in &replay_events {
3054            let matching_targets: Vec<EventTarget> = state
3055                .rules
3056                .values()
3057                .filter(|r| {
3058                    r.event_bus_name == bus_name
3059                        && r.state == "ENABLED"
3060                        && matches_pattern(
3061                            r.event_pattern.as_deref(),
3062                            &event.source,
3063                            &event.detail_type,
3064                            &event.detail,
3065                            &req.account_id,
3066                            &req.region,
3067                            &event.resources,
3068                        )
3069                })
3070                .flat_map(|r| r.targets.clone())
3071                .collect();
3072
3073            if !matching_targets.is_empty() {
3074                events_to_deliver.push((event.clone(), matching_targets));
3075            }
3076        }
3077
3078        let replay = Replay {
3079            name: name.clone(),
3080            arn: arn.clone(),
3081            description,
3082            event_source_arn,
3083            destination,
3084            event_start_time,
3085            event_end_time,
3086            state: "COMPLETED".to_string(),
3087            replay_start_time: now,
3088            replay_end_time: Some(now),
3089        };
3090        state.replays.insert(name, replay);
3091
3092        // Drop the lock before delivering
3093        drop(state);
3094
3095        // Deliver replayed events to targets (same logic as PutEvents)
3096        for (event, targets) in events_to_deliver {
3097            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3098            let event_json = json!({
3099                "version": "0",
3100                "id": event.event_id,
3101                "source": event.source,
3102                "account": req.account_id,
3103                "detail-type": event.detail_type,
3104                "detail": detail_value,
3105                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3106                "region": req.region,
3107                "resources": event.resources,
3108                "replay-name": arn,
3109            });
3110            let event_str = event_json.to_string();
3111
3112            for target in targets {
3113                let target_arn = &target.arn;
3114                let body_str = if let Some(ref transformer) = target.input_transformer {
3115                    apply_input_transformer(transformer, &event_json)
3116                } else if let Some(ref input) = target.input {
3117                    input.clone()
3118                } else if let Some(ref input_path) = target.input_path {
3119                    resolve_json_path(&event_json, input_path)
3120                        .map(|v| v.to_string())
3121                        .unwrap_or_else(|| event_str.clone())
3122                } else {
3123                    event_str.clone()
3124                };
3125
3126                if target_arn.contains(":sqs:") {
3127                    let group_id = target
3128                        .sqs_parameters
3129                        .as_ref()
3130                        .and_then(|p| p["MessageGroupId"].as_str())
3131                        .map(|s| s.to_string());
3132                    if group_id.is_some() {
3133                        self.delivery.send_to_sqs_with_attrs(
3134                            target_arn,
3135                            &body_str,
3136                            &HashMap::new(),
3137                            group_id.as_deref(),
3138                            None,
3139                        );
3140                    } else {
3141                        self.delivery
3142                            .send_to_sqs(target_arn, &body_str, &HashMap::new());
3143                    }
3144                } else if target_arn.contains(":sns:") {
3145                    self.delivery
3146                        .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3147                } else if target_arn.contains(":lambda:") {
3148                    let mut state = self.state.write();
3149                    state
3150                        .lambda_invocations
3151                        .push(crate::state::LambdaInvocation {
3152                            function_arn: target_arn.clone(),
3153                            payload: body_str.clone(),
3154                            timestamp: Utc::now(),
3155                        });
3156                    drop(state);
3157                    if let Some(ref ls) = self.lambda_state {
3158                        ls.write().invocations.push(LambdaInvocation {
3159                            function_arn: target_arn.clone(),
3160                            payload: body_str.clone(),
3161                            timestamp: Utc::now(),
3162                            source: "aws:events".to_string(),
3163                        });
3164                    }
3165                    invoke_lambda_async(
3166                        &self.container_runtime,
3167                        &self.lambda_state,
3168                        target_arn,
3169                        &body_str,
3170                    );
3171                } else if target_arn.contains(":logs:") {
3172                    let mut state = self.state.write();
3173                    state.log_deliveries.push(crate::state::LogDelivery {
3174                        log_group_arn: target_arn.clone(),
3175                        payload: body_str.clone(),
3176                        timestamp: Utc::now(),
3177                    });
3178                    drop(state);
3179                    if let Some(ref log_state) = self.logs_state {
3180                        deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3181                    }
3182                } else if target_arn.contains(":states:") {
3183                    let mut state = self.state.write();
3184                    state
3185                        .step_function_executions
3186                        .push(crate::state::StepFunctionExecution {
3187                            state_machine_arn: target_arn.clone(),
3188                            payload: body_str.clone(),
3189                            timestamp: Utc::now(),
3190                        });
3191                } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3192                    let url = target_arn.clone();
3193                    let payload = body_str.clone();
3194                    tokio::spawn(async move {
3195                        let client = reqwest::Client::new();
3196                        let _ = client
3197                            .post(&url)
3198                            .header("Content-Type", "application/json")
3199                            .body(payload)
3200                            .send()
3201                            .await;
3202                    });
3203                }
3204            }
3205        }
3206
3207        Ok(AwsResponse::ok_json(json!({
3208            "ReplayArn": arn,
3209            "ReplayStartTime": now.timestamp() as f64,
3210            "State": "STARTING",
3211        })))
3212    }
3213
3214    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3215        let body = req.json_body();
3216        validate_required("ReplayName", &body["ReplayName"])?;
3217        let name = body["ReplayName"]
3218            .as_str()
3219            .ok_or_else(|| missing("ReplayName"))?;
3220        validate_string_length("replayName", name, 1, 64)?;
3221
3222        let state = self.state.read();
3223        let replay = state.replays.get(name).ok_or_else(|| {
3224            AwsServiceError::aws_error(
3225                StatusCode::BAD_REQUEST,
3226                "ResourceNotFoundException",
3227                format!("Replay {name} does not exist."),
3228            )
3229        })?;
3230
3231        let mut resp = json!({
3232            "Destination": replay.destination,
3233            "EventSourceArn": replay.event_source_arn,
3234            "EventStartTime": replay.event_start_time.timestamp() as f64,
3235            "EventEndTime": replay.event_end_time.timestamp() as f64,
3236            "ReplayArn": replay.arn,
3237            "ReplayName": replay.name,
3238            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3239            "State": replay.state,
3240        });
3241        if let Some(ref desc) = replay.description {
3242            resp["Description"] = json!(desc);
3243        }
3244        if let Some(ref end) = replay.replay_end_time {
3245            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3246        }
3247
3248        Ok(AwsResponse::ok_json(resp))
3249    }
3250
3251    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3252        let body = req.json_body();
3253        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3254        validate_optional_string_length(
3255            "eventSourceArn",
3256            body["EventSourceArn"].as_str(),
3257            1,
3258            1600,
3259        )?;
3260        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3261        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3262        let name_prefix = body["NamePrefix"].as_str();
3263        let source_arn = body["EventSourceArn"].as_str();
3264        let replay_state = body["State"].as_str();
3265
3266        // Validate at most one filter
3267        let filter_count = [
3268            name_prefix.is_some(),
3269            source_arn.is_some(),
3270            replay_state.is_some(),
3271        ]
3272        .iter()
3273        .filter(|&&x| x)
3274        .count();
3275        if filter_count > 1 {
3276            return Err(AwsServiceError::aws_error(
3277                StatusCode::BAD_REQUEST,
3278                "ValidationException",
3279                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3280            ));
3281        }
3282
3283        // Validate state
3284        if let Some(s) = replay_state {
3285            let valid = [
3286                "CANCELLED",
3287                "CANCELLING",
3288                "COMPLETED",
3289                "FAILED",
3290                "RUNNING",
3291                "STARTING",
3292            ];
3293            if !valid.contains(&s) {
3294                return Err(AwsServiceError::aws_error(
3295                    StatusCode::BAD_REQUEST,
3296                    "ValidationException",
3297                    format!(
3298                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3299                        s
3300                    ),
3301                ));
3302            }
3303        }
3304
3305        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3306
3307        let state = self.state.read();
3308        let all: Vec<Value> = state
3309            .replays
3310            .values()
3311            .filter(|r| {
3312                if let Some(prefix) = name_prefix {
3313                    r.name.starts_with(prefix)
3314                } else if let Some(arn) = source_arn {
3315                    r.event_source_arn == arn
3316                } else if let Some(s) = replay_state {
3317                    r.state == s
3318                } else {
3319                    true
3320                }
3321            })
3322            .map(|r| {
3323                let mut obj = json!({
3324                    "EventSourceArn": r.event_source_arn,
3325                    "EventStartTime": r.event_start_time.timestamp() as f64,
3326                    "EventEndTime": r.event_end_time.timestamp() as f64,
3327                    "ReplayName": r.name,
3328                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3329                    "State": r.state,
3330                });
3331                if let Some(ref end) = r.replay_end_time {
3332                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3333                }
3334                obj
3335            })
3336            .collect();
3337
3338        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3339        let mut resp = json!({ "Replays": replays });
3340        if let Some(token) = next_token {
3341            resp["NextToken"] = json!(token);
3342        }
3343
3344        Ok(AwsResponse::ok_json(resp))
3345    }
3346
3347    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3348        let body = req.json_body();
3349        validate_required("ReplayName", &body["ReplayName"])?;
3350        let name = body["ReplayName"]
3351            .as_str()
3352            .ok_or_else(|| missing("ReplayName"))?;
3353        validate_string_length("replayName", name, 1, 64)?;
3354
3355        let mut state = self.state.write();
3356        let replay = state.replays.get_mut(name).ok_or_else(|| {
3357            AwsServiceError::aws_error(
3358                StatusCode::BAD_REQUEST,
3359                "ResourceNotFoundException",
3360                format!("Replay {name} does not exist."),
3361            )
3362        })?;
3363
3364        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3365        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3366            return Err(AwsServiceError::aws_error(
3367                StatusCode::BAD_REQUEST,
3368                "IllegalStatusException",
3369                format!("Replay {name} is not in a valid state for this operation."),
3370            ));
3371        }
3372
3373        let arn = replay.arn.clone();
3374        replay.state = "CANCELLED".to_string();
3375
3376        Ok(AwsResponse::ok_json(json!({
3377            "ReplayArn": arn,
3378            "State": "CANCELLING",
3379        })))
3380    }
3381}
3382
3383// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3384
3385fn find_tags_mut<'a>(
3386    state: &'a mut crate::state::EventBridgeState,
3387    arn: &str,
3388) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3389    // Check buses
3390    for bus in state.buses.values_mut() {
3391        if bus.arn == arn {
3392            return Ok(&mut bus.tags);
3393        }
3394    }
3395    // Check rules
3396    for rule in state.rules.values_mut() {
3397        if rule.arn == arn {
3398            return Ok(&mut rule.tags);
3399        }
3400    }
3401
3402    // Parse ARN to give better error messages
3403    let error_msg = if arn.contains(":rule/") {
3404        // Extract rule name and bus from ARN
3405        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3406        if let Some(rule_path) = parts.first() {
3407            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3408                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3409            } else {
3410                format!("Rule {} does not exist on EventBus default.", rule_path)
3411            }
3412        } else {
3413            format!("Resource {arn} not found.")
3414        }
3415    } else {
3416        format!("Resource {arn} not found.")
3417    };
3418
3419    Err(AwsServiceError::aws_error(
3420        StatusCode::BAD_REQUEST,
3421        "ResourceNotFoundException",
3422        error_msg,
3423    ))
3424}
3425
3426fn find_tags<'a>(
3427    state: &'a crate::state::EventBridgeState,
3428    arn: &str,
3429) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3430    for bus in state.buses.values() {
3431        if bus.arn == arn {
3432            return Ok(&bus.tags);
3433        }
3434    }
3435    for rule in state.rules.values() {
3436        if rule.arn == arn {
3437            return Ok(&rule.tags);
3438        }
3439    }
3440
3441    let error_msg = if arn.contains(":rule/") {
3442        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3443        if let Some(rule_path) = parts.first() {
3444            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3445                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3446            } else {
3447                format!("Rule {} does not exist on EventBus default.", rule_path)
3448            }
3449        } else {
3450            format!("Resource {arn} not found.")
3451        }
3452    } else {
3453        format!("Resource {arn} not found.")
3454    };
3455
3456    Err(AwsServiceError::aws_error(
3457        StatusCode::BAD_REQUEST,
3458        "ResourceNotFoundException",
3459        error_msg,
3460    ))
3461}
3462
3463// ─── Event Pattern Validation ────────────────────────────────────────
3464
3465fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3466    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3467        AwsServiceError::aws_error(
3468            StatusCode::BAD_REQUEST,
3469            "InvalidEventPatternException",
3470            "Event pattern is not valid. Reason: Invalid JSON",
3471        )
3472    })?;
3473
3474    validate_pattern_values(&parsed, "")?;
3475    Ok(())
3476}
3477
3478fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3479    match value {
3480        Value::Object(obj) => {
3481            for (key, val) in obj {
3482                let new_path = if path.is_empty() {
3483                    key.clone()
3484                } else {
3485                    format!("{path}.{key}")
3486                };
3487                match val {
3488                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3489                    Value::Array(_) => {} // Arrays are fine at leaf level
3490                    _ => {
3491                        return Err(AwsServiceError::aws_error(
3492                            StatusCode::BAD_REQUEST,
3493                            "InvalidEventPatternException",
3494                            format!(
3495                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3496                                key
3497                            ),
3498                        ));
3499                    }
3500                }
3501            }
3502            Ok(())
3503        }
3504        _ => Ok(()),
3505    }
3506}
3507
3508// ─── Connection Auth Params Response Builder ────────────────────────
3509
3510fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3511    match auth_type {
3512        "API_KEY" => {
3513            let mut resp = json!({});
3514            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3515                resp["ApiKeyAuthParameters"] = json!({
3516                    "ApiKeyName": api_key["ApiKeyName"],
3517                });
3518            }
3519            resp
3520        }
3521        "BASIC" => {
3522            let mut resp = json!({});
3523            if let Some(basic) = params.get("BasicAuthParameters") {
3524                resp["BasicAuthParameters"] = json!({
3525                    "Username": basic["Username"],
3526                });
3527            }
3528            resp
3529        }
3530        "OAUTH_CLIENT_CREDENTIALS" => {
3531            let mut resp = json!({});
3532            if let Some(oauth) = params.get("OAuthParameters") {
3533                resp["OAuthParameters"] = json!({
3534                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3535                    "HttpMethod": oauth["HttpMethod"],
3536                    "ClientParameters": {
3537                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3538                    },
3539                });
3540            }
3541            resp
3542        }
3543        _ => params.clone(),
3544    }
3545}
3546
3547// ─── Event Pattern Matching ─────────────────────────────────────────
3548
3549/// Match an event against an EventBridge event pattern.
3550pub fn matches_pattern(
3551    pattern_json: Option<&str>,
3552    source: &str,
3553    detail_type: &str,
3554    detail: &str,
3555    account: &str,
3556    region: &str,
3557    resources: &[String],
3558) -> bool {
3559    let pattern_json = match pattern_json {
3560        Some(p) => p,
3561        None => return true,
3562    };
3563
3564    let pattern: Value = match serde_json::from_str(pattern_json) {
3565        Ok(v) => v,
3566        Err(_) => return false,
3567    };
3568
3569    let pattern_obj = match pattern.as_object() {
3570        Some(o) => o,
3571        None => return false,
3572    };
3573
3574    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3575    let event = json!({
3576        "source": source,
3577        "detail-type": detail_type,
3578        "detail": detail_value,
3579        "account": account,
3580        "region": region,
3581        "resources": resources,
3582    });
3583
3584    for (key, pattern_value) in pattern_obj {
3585        let event_value = &event[key];
3586        if !matches_value(pattern_value, event_value) {
3587            return false;
3588        }
3589    }
3590
3591    true
3592}
3593
3594fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3595    match pattern {
3596        Value::Object(obj) => {
3597            for (key, sub_pattern) in obj {
3598                let sub_value = &event_value[key];
3599                if !matches_value(sub_pattern, sub_value) {
3600                    return false;
3601                }
3602            }
3603            true
3604        }
3605        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3606        _ => false,
3607    }
3608}
3609
3610fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3611    match pattern_elem {
3612        Value::Object(obj) => {
3613            if let Some(prefix_val) = obj.get("prefix") {
3614                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3615                    return actual.starts_with(prefix);
3616                }
3617                return false;
3618            }
3619            if let Some(exists_val) = obj.get("exists") {
3620                let should_exist = exists_val.as_bool().unwrap_or(true);
3621                let does_exist = !event_value.is_null();
3622                return should_exist == does_exist;
3623            }
3624            if let Some(anything_but_val) = obj.get("anything-but") {
3625                return match anything_but_val {
3626                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3627                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3628                    Value::Number(_) => event_value != anything_but_val,
3629                    _ => true,
3630                };
3631            }
3632            if let Some(numeric_val) = obj.get("numeric") {
3633                return matches_numeric(numeric_val, event_value);
3634            }
3635            false
3636        }
3637        _ => values_equal(pattern_elem, event_value),
3638    }
3639}
3640
3641fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3642    let arr = match numeric_arr.as_array() {
3643        Some(a) => a,
3644        None => return false,
3645    };
3646    let actual = match event_value.as_f64() {
3647        Some(n) => n,
3648        None => return false,
3649    };
3650    let mut i = 0;
3651    while i + 1 < arr.len() {
3652        let op = match arr[i].as_str() {
3653            Some(s) => s,
3654            None => return false,
3655        };
3656        let threshold = match arr[i + 1].as_f64() {
3657            Some(n) => n,
3658            None => return false,
3659        };
3660        let ok = match op {
3661            ">" => actual > threshold,
3662            ">=" => actual >= threshold,
3663            "<" => actual < threshold,
3664            "<=" => actual <= threshold,
3665            "=" => (actual - threshold).abs() < f64::EPSILON,
3666            _ => return false,
3667        };
3668        if !ok {
3669            return false;
3670        }
3671        i += 2;
3672    }
3673    true
3674}
3675
3676fn values_equal(a: &Value, b: &Value) -> bool {
3677    a == b
3678}
3679
3680/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3681fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3682    let path = path.strip_prefix('$').unwrap_or(path);
3683    let mut current = event;
3684    for segment in path.split('.') {
3685        if segment.is_empty() {
3686            continue;
3687        }
3688        current = current.get(segment)?;
3689    }
3690    Some(current.clone())
3691}
3692
3693/// Apply an EventBridge InputTransformer to an event.
3694fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3695    let input_paths_map = transformer
3696        .get("InputPathsMap")
3697        .and_then(|v| v.as_object())
3698        .cloned()
3699        .unwrap_or_default();
3700    let template = transformer
3701        .get("InputTemplate")
3702        .and_then(|v| v.as_str())
3703        .unwrap_or("")
3704        .to_string();
3705
3706    // Resolve all input paths
3707    let mut resolved: HashMap<String, Value> = HashMap::new();
3708    for (var_name, path_val) in &input_paths_map {
3709        if let Some(path_str) = path_val.as_str() {
3710            if let Some(val) = resolve_json_path(event, path_str) {
3711                resolved.insert(var_name.clone(), val);
3712            }
3713        }
3714    }
3715
3716    // Replace <varName> placeholders in template
3717    let mut result = template;
3718    for (var_name, val) in &resolved {
3719        let placeholder = format!("<{var_name}>");
3720        let replacement = match val {
3721            Value::String(s) => s.clone(),
3722            other => other.to_string(),
3723        };
3724        result = result.replace(&placeholder, &replacement);
3725    }
3726
3727    result
3728}
3729
3730fn missing(name: &str) -> AwsServiceError {
3731    AwsServiceError::aws_error(
3732        StatusCode::BAD_REQUEST,
3733        "ValidationException",
3734        format!("The request must contain the parameter {name}"),
3735    )
3736}
3737
/// Extract a Lambda function name from its ARN.
///
/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
///
/// The name is the seventh `:`-separated field, and is returned only when the
/// sixth field is exactly `function`. Any other input is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // `nth(5)` consumes fields 0..=5, so the following `next()` is field 6.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
3750
3751/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
3752/// This is fire-and-forget: EventBridge delivery is asynchronous.
3753pub fn invoke_lambda_async(
3754    container_runtime: &Option<Arc<ContainerRuntime>>,
3755    lambda_state: &Option<SharedLambdaState>,
3756    function_arn: &str,
3757    payload: &str,
3758) {
3759    let runtime = match container_runtime {
3760        Some(rt) => rt.clone(),
3761        None => return,
3762    };
3763    let lambda_state = match lambda_state {
3764        Some(ls) => ls.clone(),
3765        None => return,
3766    };
3767    let func_name = function_name_from_arn(function_arn).to_string();
3768    let payload = payload.as_bytes().to_vec();
3769
3770    tokio::spawn(async move {
3771        let func = {
3772            let state = lambda_state.read();
3773            state.functions.get(&func_name).cloned()
3774        };
3775        let func = match func {
3776            Some(f) => f,
3777            None => {
3778                tracing::warn!(
3779                    function = %func_name,
3780                    "EventBridge Lambda target not found, skipping invocation"
3781                );
3782                return;
3783            }
3784        };
3785        match runtime.invoke(&func, &payload).await {
3786            Ok(_) => {
3787                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3788            }
3789            Err(e) => {
3790                tracing::warn!(
3791                    function = %func_name,
3792                    error = %e,
3793                    "EventBridge Lambda invocation failed"
3794                );
3795            }
3796        }
3797    });
3798}
3799
3800/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
3801/// to the appropriate log group and stream.
3802pub fn deliver_to_logs(
3803    logs_state: &SharedLogsState,
3804    log_group_arn: &str,
3805    payload: &str,
3806    timestamp: chrono::DateTime<chrono::Utc>,
3807) {
3808    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
3809    // or just the name if it's not an ARN
3810    let group_name = if log_group_arn.contains(":log-group:") {
3811        log_group_arn
3812            .split(":log-group:")
3813            .nth(1)
3814            .unwrap_or(log_group_arn)
3815            .trim_end_matches(":*")
3816    } else {
3817        log_group_arn
3818    };
3819
3820    let stream_name = "events".to_string();
3821    let ts_millis = timestamp.timestamp_millis();
3822
3823    let mut state = logs_state.write();
3824    let region = state.region.clone();
3825    let account_id = state.account_id.clone();
3826
3827    // Auto-create log group and stream if they don't exist
3828    let group = state
3829        .log_groups
3830        .entry(group_name.to_string())
3831        .or_insert_with(|| fakecloud_logs::state::LogGroup {
3832            name: group_name.to_string(),
3833            arn: Arn::new(
3834                "logs",
3835                &region,
3836                &account_id,
3837                &format!("log-group:{group_name}"),
3838            )
3839            .to_string(),
3840            creation_time: ts_millis,
3841            retention_in_days: None,
3842            kms_key_id: None,
3843            tags: HashMap::new(),
3844            log_streams: HashMap::new(),
3845            stored_bytes: 0,
3846            subscription_filters: Vec::new(),
3847            data_protection_policy: None,
3848            index_policies: Vec::new(),
3849            transformer: None,
3850            deletion_protection: false,
3851        });
3852
3853    let stream = group
3854        .log_streams
3855        .entry(stream_name.clone())
3856        .or_insert_with(|| fakecloud_logs::state::LogStream {
3857            name: stream_name,
3858            arn: format!("{}:log-stream:events", group.arn),
3859            creation_time: ts_millis,
3860            first_event_timestamp: None,
3861            last_event_timestamp: None,
3862            last_ingestion_time: None,
3863            upload_sequence_token: "1".to_string(),
3864            events: Vec::new(),
3865        });
3866
3867    stream.events.push(fakecloud_logs::state::LogEvent {
3868        timestamp: ts_millis,
3869        message: payload.to_string(),
3870        ingestion_time: ts_millis,
3871    });
3872    stream.last_event_timestamp = Some(ts_millis);
3873    stream.last_ingestion_time = Some(ts_millis);
3874    if stream.first_event_timestamp.is_none() {
3875        stream.first_event_timestamp = Some(ts_millis);
3876    }
3877}
3878
3879/// Apply connection auth parameters to an outgoing HTTP request.
3880fn apply_connection_auth(
3881    mut builder: reqwest::RequestBuilder,
3882    conn: &Connection,
3883) -> reqwest::RequestBuilder {
3884    match conn.authorization_type.as_str() {
3885        "API_KEY" => {
3886            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3887                if let (Some(name), Some(value)) = (
3888                    params["ApiKeyName"].as_str(),
3889                    params["ApiKeyValue"].as_str(),
3890                ) {
3891                    builder = builder.header(name, value);
3892                }
3893            }
3894        }
3895        "BASIC" => {
3896            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3897                if let (Some(user), Some(pass)) =
3898                    (params["Username"].as_str(), params["Password"].as_str())
3899                {
3900                    builder = builder.basic_auth(user, Some(pass));
3901                }
3902            }
3903        }
3904        "OAUTH_CLIENT_CREDENTIALS" => {
3905            // For OAuth, in a real implementation we'd exchange credentials for a token.
3906            // Here we pass client credentials as basic auth as a reasonable approximation.
3907            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
3908                if let (Some(client_id), Some(client_secret)) = (
3909                    params["ClientParameters"]["ClientID"].as_str(),
3910                    params["ClientParameters"]["ClientSecret"].as_str(),
3911                ) {
3912                    builder = builder.basic_auth(client_id, Some(client_secret));
3913                }
3914            }
3915        }
3916        _ => {}
3917    }
3918    builder
3919}
3920
3921#[cfg(test)]
3922mod tests {
3923    use super::*;
3924
3925    /// Test helper that calls matches_pattern with default account/region/resources
3926    fn test_matches(
3927        pattern_json: Option<&str>,
3928        source: &str,
3929        detail_type: &str,
3930        detail: &str,
3931    ) -> bool {
3932        matches_pattern(
3933            pattern_json,
3934            source,
3935            detail_type,
3936            detail,
3937            "123456789012",
3938            "us-east-1",
3939            &[],
3940        )
3941    }
3942
3943    #[test]
3944    fn pattern_matches_source() {
3945        assert!(test_matches(
3946            Some(r#"{"source": ["my.app"]}"#),
3947            "my.app",
3948            "OrderPlaced",
3949            "{}"
3950        ));
3951        assert!(!test_matches(
3952            Some(r#"{"source": ["other.app"]}"#),
3953            "my.app",
3954            "OrderPlaced",
3955            "{}"
3956        ));
3957    }
3958
3959    #[test]
3960    fn pattern_matches_detail_type() {
3961        assert!(test_matches(
3962            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
3963            "my.app",
3964            "OrderPlaced",
3965            "{}"
3966        ));
3967        assert!(!test_matches(
3968            Some(r#"{"detail-type": ["OrderShipped"]}"#),
3969            "my.app",
3970            "OrderPlaced",
3971            "{}"
3972        ));
3973    }
3974
3975    #[test]
3976    fn pattern_matches_detail_field() {
3977        assert!(test_matches(
3978            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3979            "my.app",
3980            "StatusChange",
3981            r#"{"status": "ACTIVE"}"#
3982        ));
3983        assert!(!test_matches(
3984            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3985            "my.app",
3986            "StatusChange",
3987            r#"{"status": "INACTIVE"}"#
3988        ));
3989    }
3990
3991    #[test]
3992    fn no_pattern_matches_everything() {
3993        assert!(test_matches(None, "any", "any", "{}"));
3994    }
3995
3996    #[test]
3997    fn combined_pattern() {
3998        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
3999        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4000        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4001        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4002    }
4003
4004    #[test]
4005    fn nested_detail_pattern() {
4006        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4007        assert!(test_matches(
4008            Some(pattern),
4009            "my.app",
4010            "OrderEvent",
4011            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4012        ));
4013        assert!(!test_matches(
4014            Some(pattern),
4015            "my.app",
4016            "OrderEvent",
4017            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4018        ));
4019        assert!(!test_matches(
4020            Some(pattern),
4021            "my.app",
4022            "OrderEvent",
4023            r#"{"order": {"id": "123"}}"#
4024        ));
4025    }
4026
4027    #[test]
4028    fn deeply_nested_detail_pattern() {
4029        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4030        assert!(test_matches(
4031            Some(pattern),
4032            "src",
4033            "type",
4034            r#"{"a": {"b": {"c": "deep"}}}"#
4035        ));
4036        assert!(!test_matches(
4037            Some(pattern),
4038            "src",
4039            "type",
4040            r#"{"a": {"b": {"c": "shallow"}}}"#
4041        ));
4042    }
4043
4044    #[test]
4045    fn prefix_matcher() {
4046        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4047        assert!(test_matches(
4048            Some(pattern),
4049            "com.myapp.orders",
4050            "OrderPlaced",
4051            "{}"
4052        ));
4053        assert!(test_matches(
4054            Some(pattern),
4055            "com.myapp",
4056            "OrderPlaced",
4057            "{}"
4058        ));
4059        assert!(!test_matches(
4060            Some(pattern),
4061            "com.other",
4062            "OrderPlaced",
4063            "{}"
4064        ));
4065    }
4066
4067    #[test]
4068    fn prefix_matcher_in_detail() {
4069        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4070        assert!(test_matches(
4071            Some(pattern),
4072            "src",
4073            "type",
4074            r#"{"region": "us-east-1"}"#
4075        ));
4076        assert!(!test_matches(
4077            Some(pattern),
4078            "src",
4079            "type",
4080            r#"{"region": "eu-west-1"}"#
4081        ));
4082    }
4083
4084    #[test]
4085    fn exists_matcher() {
4086        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4087        assert!(test_matches(
4088            Some(pattern),
4089            "src",
4090            "type",
4091            r#"{"error": "something broke"}"#
4092        ));
4093        assert!(!test_matches(
4094            Some(pattern),
4095            "src",
4096            "type",
4097            r#"{"status": "ok"}"#
4098        ));
4099
4100        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4101        assert!(test_matches(
4102            Some(pattern),
4103            "src",
4104            "type",
4105            r#"{"status": "ok"}"#
4106        ));
4107        assert!(!test_matches(
4108            Some(pattern),
4109            "src",
4110            "type",
4111            r#"{"error": "something broke"}"#
4112        ));
4113    }
4114
4115    #[test]
4116    fn anything_but_matcher() {
4117        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4118        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4119        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4120
4121        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4122        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4123        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4124        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4125    }
4126
4127    #[test]
4128    fn anything_but_in_detail() {
4129        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4130        assert!(test_matches(
4131            Some(pattern),
4132            "src",
4133            "type",
4134            r#"{"env": "staging"}"#
4135        ));
4136        assert!(!test_matches(
4137            Some(pattern),
4138            "src",
4139            "type",
4140            r#"{"env": "prod"}"#
4141        ));
4142    }
4143
4144    #[test]
4145    fn numeric_greater_than() {
4146        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4147        assert!(test_matches(
4148            Some(pattern),
4149            "src",
4150            "type",
4151            r#"{"count": 150}"#
4152        ));
4153        assert!(!test_matches(
4154            Some(pattern),
4155            "src",
4156            "type",
4157            r#"{"count": 100}"#
4158        ));
4159        assert!(!test_matches(
4160            Some(pattern),
4161            "src",
4162            "type",
4163            r#"{"count": 50}"#
4164        ));
4165    }
4166
4167    #[test]
4168    fn numeric_less_than() {
4169        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4170        assert!(test_matches(
4171            Some(pattern),
4172            "src",
4173            "type",
4174            r#"{"count": 5}"#
4175        ));
4176        assert!(!test_matches(
4177            Some(pattern),
4178            "src",
4179            "type",
4180            r#"{"count": 10}"#
4181        ));
4182        assert!(!test_matches(
4183            Some(pattern),
4184            "src",
4185            "type",
4186            r#"{"count": 15}"#
4187        ));
4188    }
4189
4190    #[test]
4191    fn numeric_range() {
4192        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4193        assert!(test_matches(
4194            Some(pattern),
4195            "src",
4196            "type",
4197            r#"{"count": 50}"#
4198        ));
4199        assert!(test_matches(
4200            Some(pattern),
4201            "src",
4202            "type",
4203            r#"{"count": 100}"#
4204        ));
4205        assert!(!test_matches(
4206            Some(pattern),
4207            "src",
4208            "type",
4209            r#"{"count": 200}"#
4210        ));
4211        assert!(!test_matches(
4212            Some(pattern),
4213            "src",
4214            "type",
4215            r#"{"count": 49}"#
4216        ));
4217    }
4218
4219    #[test]
4220    fn mixed_matchers_and_literals() {
4221        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4222        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4223        assert!(test_matches(
4224            Some(pattern),
4225            "com.myapp.orders",
4226            "Event",
4227            "{}"
4228        ));
4229        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4230    }
4231
4232    // ---- list_connections / list_api_destinations filtering & pagination ----
4233
4234    use crate::state::EventBridgeState;
4235    use fakecloud_core::delivery::DeliveryBus;
4236    use parking_lot::RwLock;
4237
4238    fn make_service() -> EventBridgeService {
4239        let state = Arc::new(RwLock::new(EventBridgeState::new(
4240            "123456789012",
4241            "us-east-1",
4242        )));
4243        let delivery = Arc::new(DeliveryBus::new());
4244        EventBridgeService::new(state, delivery)
4245    }
4246
4247    fn make_request(action: &str, body: Value) -> AwsRequest {
4248        AwsRequest {
4249            service: "events".to_string(),
4250            action: action.to_string(),
4251            region: "us-east-1".to_string(),
4252            account_id: "123456789012".to_string(),
4253            request_id: "test-id".to_string(),
4254            headers: http::HeaderMap::new(),
4255            query_params: HashMap::new(),
4256            body: serde_json::to_vec(&body).unwrap().into(),
4257            path_segments: vec![],
4258            raw_path: "/".to_string(),
4259            raw_query: String::new(),
4260            method: http::Method::POST,
4261            is_query_protocol: false,
4262            access_key_id: None,
4263        }
4264    }
4265
4266    fn create_connection(svc: &EventBridgeService, name: &str) {
4267        let req = make_request(
4268            "CreateConnection",
4269            json!({
4270                "Name": name,
4271                "AuthorizationType": "API_KEY",
4272                "AuthParameters": {
4273                    "ApiKeyAuthParameters": {
4274                        "ApiKeyName": "x-api-key",
4275                        "ApiKeyValue": "secret"
4276                    }
4277                }
4278            }),
4279        );
4280        svc.create_connection(&req).unwrap();
4281    }
4282
4283    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4284        let conn_arn_field = {
4285            let state = svc.state.read();
4286            state.connections.get(conn_name).unwrap().arn.clone()
4287        };
4288        let req = make_request(
4289            "CreateApiDestination",
4290            json!({
4291                "Name": name,
4292                "ConnectionArn": conn_arn_field,
4293                "InvocationEndpoint": "https://example.com",
4294                "HttpMethod": "POST"
4295            }),
4296        );
4297        svc.create_api_destination(&req).unwrap();
4298    }
4299
4300    // -- ListConnections tests --
4301
4302    #[test]
4303    fn list_connections_returns_all_by_default() {
4304        let svc = make_service();
4305        create_connection(&svc, "conn-alpha");
4306        create_connection(&svc, "conn-beta");
4307        create_connection(&svc, "conn-gamma");
4308
4309        let req = make_request("ListConnections", json!({}));
4310        let resp = svc.list_connections(&req).unwrap();
4311        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4312        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4313        assert!(body["NextToken"].is_null());
4314    }
4315
4316    #[test]
4317    fn list_connections_name_prefix_filter() {
4318        let svc = make_service();
4319        create_connection(&svc, "prod-conn-1");
4320        create_connection(&svc, "prod-conn-2");
4321        create_connection(&svc, "dev-conn-1");
4322
4323        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4324        let resp = svc.list_connections(&req).unwrap();
4325        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4326        let names: Vec<&str> = body["Connections"]
4327            .as_array()
4328            .unwrap()
4329            .iter()
4330            .map(|c| c["Name"].as_str().unwrap())
4331            .collect();
4332        assert_eq!(names.len(), 2);
4333        assert!(names.iter().all(|n| n.starts_with("prod-")));
4334    }
4335
4336    #[test]
4337    fn list_connections_state_filter() {
4338        let svc = make_service();
4339        create_connection(&svc, "conn-a");
4340        create_connection(&svc, "conn-b");
4341
4342        // All connections start as AUTHORIZED; change one
4343        {
4344            let mut state = svc.state.write();
4345            state
4346                .connections
4347                .get_mut("conn-b")
4348                .unwrap()
4349                .connection_state = "DEAUTHORIZED".to_string();
4350        }
4351
4352        let req = make_request(
4353            "ListConnections",
4354            json!({ "ConnectionState": "AUTHORIZED" }),
4355        );
4356        let resp = svc.list_connections(&req).unwrap();
4357        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4358        let conns = body["Connections"].as_array().unwrap();
4359        assert_eq!(conns.len(), 1);
4360        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4361    }
4362
4363    #[test]
4364    fn list_connections_pagination() {
4365        let svc = make_service();
4366        for i in 0..5 {
4367            create_connection(&svc, &format!("conn-{i:02}"));
4368        }
4369
4370        // First page: limit 2
4371        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4372        let resp = svc.list_connections(&req).unwrap();
4373        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4374        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4375        let token = body["NextToken"].as_str().unwrap();
4376        assert_eq!(token, "2");
4377
4378        // Second page
4379        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4380        let resp = svc.list_connections(&req).unwrap();
4381        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4382        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4383        let token = body["NextToken"].as_str().unwrap();
4384        assert_eq!(token, "4");
4385
4386        // Third page (only 1 remaining)
4387        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4388        let resp = svc.list_connections(&req).unwrap();
4389        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4390        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4391        assert!(body["NextToken"].is_null());
4392    }
4393
4394    #[test]
4395    fn list_connections_pagination_with_filter() {
4396        let svc = make_service();
4397        for i in 0..4 {
4398            create_connection(&svc, &format!("prod-{i:02}"));
4399        }
4400        create_connection(&svc, "dev-00");
4401
4402        let req = make_request(
4403            "ListConnections",
4404            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4405        );
4406        let resp = svc.list_connections(&req).unwrap();
4407        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4408        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4409        assert!(body["NextToken"].as_str().is_some());
4410    }
4411
4412    // -- ListApiDestinations tests --
4413
4414    #[test]
4415    fn list_api_destinations_returns_all_by_default() {
4416        let svc = make_service();
4417        create_connection(&svc, "my-conn");
4418        create_api_destination(&svc, "dest-alpha", "my-conn");
4419        create_api_destination(&svc, "dest-beta", "my-conn");
4420
4421        let req = make_request("ListApiDestinations", json!({}));
4422        let resp = svc.list_api_destinations(&req).unwrap();
4423        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4424        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4425        assert!(body["NextToken"].is_null());
4426    }
4427
4428    #[test]
4429    fn list_api_destinations_name_prefix_filter() {
4430        let svc = make_service();
4431        create_connection(&svc, "my-conn");
4432        create_api_destination(&svc, "prod-dest-1", "my-conn");
4433        create_api_destination(&svc, "prod-dest-2", "my-conn");
4434        create_api_destination(&svc, "dev-dest-1", "my-conn");
4435
4436        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4437        let resp = svc.list_api_destinations(&req).unwrap();
4438        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4439        let names: Vec<&str> = body["ApiDestinations"]
4440            .as_array()
4441            .unwrap()
4442            .iter()
4443            .map(|d| d["Name"].as_str().unwrap())
4444            .collect();
4445        assert_eq!(names.len(), 2);
4446        assert!(names.iter().all(|n| n.starts_with("prod-")));
4447    }
4448
4449    #[test]
4450    fn list_api_destinations_connection_arn_filter() {
4451        let svc = make_service();
4452        create_connection(&svc, "conn-a");
4453        create_connection(&svc, "conn-b");
4454        create_api_destination(&svc, "dest-1", "conn-a");
4455        create_api_destination(&svc, "dest-2", "conn-b");
4456        create_api_destination(&svc, "dest-3", "conn-a");
4457
4458        let conn_a_arn = {
4459            let state = svc.state.read();
4460            state.connections.get("conn-a").unwrap().arn.clone()
4461        };
4462
4463        let req = make_request(
4464            "ListApiDestinations",
4465            json!({ "ConnectionArn": conn_a_arn }),
4466        );
4467        let resp = svc.list_api_destinations(&req).unwrap();
4468        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4469        let names: Vec<&str> = body["ApiDestinations"]
4470            .as_array()
4471            .unwrap()
4472            .iter()
4473            .map(|d| d["Name"].as_str().unwrap())
4474            .collect();
4475        assert_eq!(names.len(), 2);
4476        assert!(names.contains(&"dest-1"));
4477        assert!(names.contains(&"dest-3"));
4478    }
4479
4480    #[test]
4481    fn list_api_destinations_pagination() {
4482        let svc = make_service();
4483        create_connection(&svc, "my-conn");
4484        for i in 0..5 {
4485            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4486        }
4487
4488        // First page
4489        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4490        let resp = svc.list_api_destinations(&req).unwrap();
4491        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4492        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4493        let token = body["NextToken"].as_str().unwrap();
4494        assert_eq!(token, "2");
4495
4496        // Second page
4497        let req = make_request(
4498            "ListApiDestinations",
4499            json!({ "Limit": 2, "NextToken": token }),
4500        );
4501        let resp = svc.list_api_destinations(&req).unwrap();
4502        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4503        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4504        let token = body["NextToken"].as_str().unwrap();
4505        assert_eq!(token, "4");
4506
4507        // Last page
4508        let req = make_request(
4509            "ListApiDestinations",
4510            json!({ "Limit": 2, "NextToken": token }),
4511        );
4512        let resp = svc.list_api_destinations(&req).unwrap();
4513        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4514        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4515        assert!(body["NextToken"].is_null());
4516    }
4517
4518    // -- ListEventBuses pagination tests --
4519
4520    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4521        let req = make_request("CreateEventBus", json!({ "Name": name }));
4522        svc.create_event_bus(&req).unwrap();
4523    }
4524
4525    #[test]
4526    fn list_event_buses_pagination() {
4527        let svc = make_service();
4528        // "default" bus already exists, create 4 more
4529        for i in 0..4 {
4530            create_event_bus(&svc, &format!("bus-{i:02}"));
4531        }
4532
4533        // First page: limit 2
4534        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4535        let resp = svc.list_event_buses(&req).unwrap();
4536        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4537        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4538        let token = body["NextToken"].as_str().unwrap();
4539        assert_eq!(token, "2");
4540
4541        // Second page
4542        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4543        let resp = svc.list_event_buses(&req).unwrap();
4544        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4545        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4546        let token = body["NextToken"].as_str().unwrap();
4547        assert_eq!(token, "4");
4548
4549        // Third page (only 1 remaining)
4550        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4551        let resp = svc.list_event_buses(&req).unwrap();
4552        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4553        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4554        assert!(body["NextToken"].is_null());
4555    }
4556
4557    #[test]
4558    fn list_event_buses_no_pagination_returns_all() {
4559        let svc = make_service();
4560        create_event_bus(&svc, "bus-alpha");
4561        create_event_bus(&svc, "bus-beta");
4562
4563        let req = make_request("ListEventBuses", json!({}));
4564        let resp = svc.list_event_buses(&req).unwrap();
4565        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4566        // default + 2 custom = 3
4567        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4568        assert!(body["NextToken"].is_null());
4569    }
4570
4571    // -- PutEvents EndpointId tests --
4572
4573    #[test]
4574    fn put_events_never_includes_endpoint_id_in_response() {
4575        let svc = make_service();
4576        // Even when EndpointId is provided in the request, it must not appear in the response
4577        let req = make_request(
4578            "PutEvents",
4579            json!({
4580                "EndpointId": "my-endpoint.abc123",
4581                "Entries": [{
4582                    "Source": "my.source",
4583                    "DetailType": "MyType",
4584                    "Detail": "{}",
4585                    "EventBusName": "default"
4586                }]
4587            }),
4588        );
4589        let resp = svc.put_events(&req).unwrap();
4590        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4591        assert!(
4592            !body.as_object().unwrap().contains_key("EndpointId"),
4593            "EndpointId should never be in the PutEvents response"
4594        );
4595        assert_eq!(body["FailedEntryCount"], 0);
4596    }
4597
4598    // -- ListArchives pagination tests --
4599
4600    fn create_archive(svc: &EventBridgeService, name: &str) {
4601        let req = make_request(
4602            "CreateArchive",
4603            json!({
4604                "ArchiveName": name,
4605                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4606            }),
4607        );
4608        svc.create_archive(&req).unwrap();
4609    }
4610
4611    #[test]
4612    fn list_archives_pagination() {
4613        let svc = make_service();
4614        for i in 0..5 {
4615            create_archive(&svc, &format!("archive-{i:02}"));
4616        }
4617
4618        // First page: limit 2
4619        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4620        let resp = svc.list_archives(&req).unwrap();
4621        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4622        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4623        let token = body["NextToken"].as_str().unwrap();
4624        assert_eq!(token, "2");
4625
4626        // Second page
4627        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4628        let resp = svc.list_archives(&req).unwrap();
4629        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4630        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4631        let token = body["NextToken"].as_str().unwrap();
4632        assert_eq!(token, "4");
4633
4634        // Third page (only 1 remaining)
4635        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4636        let resp = svc.list_archives(&req).unwrap();
4637        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4638        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4639        assert!(body["NextToken"].is_null());
4640    }
4641
4642    // -- ListReplays pagination tests --
4643
4644    fn create_replay(svc: &EventBridgeService, name: &str) {
4645        // Need an archive first for the replay's event source
4646        let archive_arn = {
4647            let state = svc.state.read();
4648            if state.archives.contains_key("replay-archive") {
4649                state.archives["replay-archive"].arn.clone()
4650            } else {
4651                drop(state);
4652                create_archive(svc, "replay-archive");
4653                svc.state.read().archives["replay-archive"].arn.clone()
4654            }
4655        };
4656        let req = make_request(
4657            "StartReplay",
4658            json!({
4659                "ReplayName": name,
4660                "EventSourceArn": archive_arn,
4661                "EventStartTime": 1000000.0,
4662                "EventEndTime": 2000000.0,
4663                "Destination": {
4664                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4665                }
4666            }),
4667        );
4668        svc.start_replay(&req).unwrap();
4669    }
4670
4671    #[test]
4672    fn list_replays_pagination() {
4673        let svc = make_service();
4674        for i in 0..5 {
4675            create_replay(&svc, &format!("replay-{i:02}"));
4676        }
4677
4678        // First page: limit 2
4679        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4680        let resp = svc.list_replays(&req).unwrap();
4681        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4682        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4683        let token = body["NextToken"].as_str().unwrap();
4684        assert_eq!(token, "2");
4685
4686        // Second page
4687        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4688        let resp = svc.list_replays(&req).unwrap();
4689        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4690        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4691        let token = body["NextToken"].as_str().unwrap();
4692        assert_eq!(token, "4");
4693
4694        // Third page (only 1 remaining)
4695        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4696        let resp = svc.list_replays(&req).unwrap();
4697        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4698        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4699        assert!(body["NextToken"].is_null());
4700    }
4701
4702    #[test]
4703    fn list_event_buses_invalid_next_token_returns_error() {
4704        let svc = make_service();
4705
4706        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4707        let result = svc.list_event_buses(&req);
4708        assert!(
4709            result.is_err(),
4710            "non-numeric NextToken should return an error"
4711        );
4712    }
4713
4714    // ---- TestEventPattern tests ----
4715
4716    #[test]
4717    fn test_event_pattern_match() {
4718        let svc = make_service();
4719        let req = make_request(
4720            "TestEventPattern",
4721            json!({
4722                "EventPattern": r#"{"source": ["my.app"]}"#,
4723                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4724            }),
4725        );
4726        let resp = svc.test_event_pattern(&req).unwrap();
4727        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4728        assert_eq!(body["Result"], true);
4729    }
4730
4731    #[test]
4732    fn test_event_pattern_no_match() {
4733        let svc = make_service();
4734        let req = make_request(
4735            "TestEventPattern",
4736            json!({
4737                "EventPattern": r#"{"source": ["other.app"]}"#,
4738                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4739            }),
4740        );
4741        let resp = svc.test_event_pattern(&req).unwrap();
4742        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4743        assert_eq!(body["Result"], false);
4744    }
4745
4746    #[test]
4747    fn test_event_pattern_detail_match() {
4748        let svc = make_service();
4749        let req = make_request(
4750            "TestEventPattern",
4751            json!({
4752                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4753                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4754            }),
4755        );
4756        let resp = svc.test_event_pattern(&req).unwrap();
4757        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4758        assert_eq!(body["Result"], true);
4759    }
4760
4761    // ---- UpdateEventBus tests ----
4762
4763    #[test]
4764    fn update_event_bus_description() {
4765        let svc = make_service();
4766        create_event_bus(&svc, "my-bus");
4767
4768        let req = make_request(
4769            "UpdateEventBus",
4770            json!({ "Name": "my-bus", "Description": "Updated desc" }),
4771        );
4772        let resp = svc.update_event_bus(&req).unwrap();
4773        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4774        assert_eq!(body["Name"], "my-bus");
4775
4776        // Verify via describe
4777        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4778        let resp = svc.describe_event_bus(&req).unwrap();
4779        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4780        assert_eq!(body["Description"], "Updated desc");
4781    }
4782
4783    #[test]
4784    fn update_event_bus_not_found() {
4785        let svc = make_service();
4786        let req = make_request(
4787            "UpdateEventBus",
4788            json!({ "Name": "ghost-bus", "Description": "nope" }),
4789        );
4790        assert!(svc.update_event_bus(&req).is_err());
4791    }
4792
4793    // ---- Endpoint CRUD tests ----
4794
4795    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4796        let req = make_request(
4797            "CreateEndpoint",
4798            json!({
4799                "Name": name,
4800                "RoutingConfig": {
4801                    "FailoverConfig": {
4802                        "Primary": { "HealthCheck": "" },
4803                        "Secondary": { "Route": "us-west-2" }
4804                    }
4805                },
4806                "EventBuses": [
4807                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4808                ]
4809            }),
4810        );
4811        svc.create_endpoint(&req).unwrap();
4812    }
4813
4814    #[test]
4815    fn endpoint_create_describe_delete() {
4816        let svc = make_service();
4817        create_endpoint_helper(&svc, "my-endpoint");
4818
4819        // Describe
4820        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4821        let resp = svc.describe_endpoint(&req).unwrap();
4822        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4823        assert_eq!(body["Name"], "my-endpoint");
4824        assert_eq!(body["State"], "ACTIVE");
4825        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4826
4827        // Delete
4828        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4829        svc.delete_endpoint(&req).unwrap();
4830
4831        // Verify gone
4832        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4833        assert!(svc.describe_endpoint(&req).is_err());
4834    }
4835
4836    #[test]
4837    fn endpoint_list_and_update() {
4838        let svc = make_service();
4839        create_endpoint_helper(&svc, "ep-alpha");
4840        create_endpoint_helper(&svc, "ep-beta");
4841
4842        // List all
4843        let req = make_request("ListEndpoints", json!({}));
4844        let resp = svc.list_endpoints(&req).unwrap();
4845        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4846        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4847
4848        // Update
4849        let req = make_request(
4850            "UpdateEndpoint",
4851            json!({ "Name": "ep-alpha", "Description": "updated" }),
4852        );
4853        let resp = svc.update_endpoint(&req).unwrap();
4854        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4855        assert_eq!(body["Name"], "ep-alpha");
4856
4857        // Verify description
4858        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4859        let resp = svc.describe_endpoint(&req).unwrap();
4860        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4861        assert_eq!(body["Description"], "updated");
4862    }
4863
4864    #[test]
4865    fn endpoint_duplicate_fails() {
4866        let svc = make_service();
4867        create_endpoint_helper(&svc, "dup-ep");
4868        let req = make_request(
4869            "CreateEndpoint",
4870            json!({
4871                "Name": "dup-ep",
4872                "RoutingConfig": {},
4873                "EventBuses": []
4874            }),
4875        );
4876        assert!(svc.create_endpoint(&req).is_err());
4877    }
4878
4879    // ---- DeauthorizeConnection tests ----
4880
4881    #[test]
4882    fn deauthorize_connection_sets_state() {
4883        let svc = make_service();
4884        create_connection(&svc, "deauth-conn");
4885
4886        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4887        let resp = svc.deauthorize_connection(&req).unwrap();
4888        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4889        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4890        assert!(body["ConnectionArn"]
4891            .as_str()
4892            .unwrap()
4893            .contains("deauth-conn"));
4894
4895        // Verify via describe
4896        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4897        let resp = svc.describe_connection(&req).unwrap();
4898        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4899        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4900    }
4901
4902    #[test]
4903    fn deauthorize_connection_not_found() {
4904        let svc = make_service();
4905        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
4906        assert!(svc.deauthorize_connection(&req).is_err());
4907    }
4908
4909    // ---- Partner event source tests ----
4910
4911    #[test]
4912    fn partner_event_source_crud() {
4913        let svc = make_service();
4914
4915        // Create
4916        let req = make_request(
4917            "CreatePartnerEventSource",
4918            json!({ "Name": "partner/test", "Account": "123456789012" }),
4919        );
4920        svc.create_partner_event_source(&req).unwrap();
4921
4922        // Describe
4923        let req = make_request(
4924            "DescribePartnerEventSource",
4925            json!({ "Name": "partner/test" }),
4926        );
4927        let resp = svc.describe_partner_event_source(&req).unwrap();
4928        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4929        assert_eq!(body["Name"], "partner/test");
4930
4931        // List
4932        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
4933        let resp = svc.list_partner_event_sources(&req).unwrap();
4934        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4935        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
4936
4937        // ListPartnerEventSourceAccounts
4938        let req = make_request(
4939            "ListPartnerEventSourceAccounts",
4940            json!({ "EventSourceName": "partner/test" }),
4941        );
4942        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
4943        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4944        assert_eq!(
4945            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
4946            1
4947        );
4948
4949        // DescribeEventSource
4950        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
4951        let resp = svc.describe_event_source(&req).unwrap();
4952        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4953        assert_eq!(body["Name"], "partner/test");
4954        assert_eq!(body["State"], "ACTIVE");
4955
4956        // ListEventSources
4957        let req = make_request("ListEventSources", json!({}));
4958        let resp = svc.list_event_sources(&req).unwrap();
4959        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4960        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
4961
4962        // Delete
4963        let req = make_request(
4964            "DeletePartnerEventSource",
4965            json!({ "Name": "partner/test", "Account": "123456789012" }),
4966        );
4967        svc.delete_partner_event_source(&req).unwrap();
4968
4969        // Verify gone
4970        let req = make_request(
4971            "DescribePartnerEventSource",
4972            json!({ "Name": "partner/test" }),
4973        );
4974        assert!(svc.describe_partner_event_source(&req).is_err());
4975    }
4976
4977    #[test]
4978    fn activate_deactivate_event_source() {
4979        let svc = make_service();
4980
4981        // Create a partner event source first
4982        let req = make_request(
4983            "CreatePartnerEventSource",
4984            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4985        );
4986        svc.create_partner_event_source(&req).unwrap();
4987
4988        // Deactivate it
4989        let req = make_request(
4990            "DeactivateEventSource",
4991            json!({ "Name": "aws.partner/test" }),
4992        );
4993        svc.deactivate_event_source(&req).unwrap();
4994        {
4995            let state = svc.state.read();
4996            assert_eq!(
4997                state.partner_event_sources["aws.partner/test"].state,
4998                "INACTIVE"
4999            );
5000        }
5001
5002        // Activate it
5003        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5004        svc.activate_event_source(&req).unwrap();
5005        {
5006            let state = svc.state.read();
5007            assert_eq!(
5008                state.partner_event_sources["aws.partner/test"].state,
5009                "ACTIVE"
5010            );
5011        }
5012
5013        // Not-found returns error
5014        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5015        assert!(svc.activate_event_source(&req).is_err());
5016
5017        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5018        assert!(svc.deactivate_event_source(&req).is_err());
5019    }
5020
5021    #[test]
5022    fn delete_partner_event_source_verifies_account() {
5023        let svc = make_service();
5024
5025        // Create a partner event source
5026        let req = make_request(
5027            "CreatePartnerEventSource",
5028            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5029        );
5030        svc.create_partner_event_source(&req).unwrap();
5031
5032        // Deleting with wrong account fails
5033        let req = make_request(
5034            "DeletePartnerEventSource",
5035            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5036        );
5037        assert!(svc.delete_partner_event_source(&req).is_err());
5038        // Source still exists
5039        assert!(svc
5040            .state
5041            .read()
5042            .partner_event_sources
5043            .contains_key("aws.partner/test"));
5044
5045        // Deleting with correct account succeeds
5046        let req = make_request(
5047            "DeletePartnerEventSource",
5048            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5049        );
5050        svc.delete_partner_event_source(&req).unwrap();
5051        assert!(!svc
5052            .state
5053            .read()
5054            .partner_event_sources
5055            .contains_key("aws.partner/test"));
5056
5057        // Deleting non-existent source returns error
5058        let req = make_request(
5059            "DeletePartnerEventSource",
5060            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5061        );
5062        assert!(svc.delete_partner_event_source(&req).is_err());
5063    }
5064
5065    #[test]
5066    fn put_partner_events() {
5067        let svc = make_service();
5068        let req = make_request(
5069            "PutPartnerEvents",
5070            json!({
5071                "Entries": [
5072                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5073                ]
5074            }),
5075        );
5076        let resp = svc.put_partner_events(&req).unwrap();
5077        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5078        assert_eq!(body["FailedEntryCount"], 0);
5079        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5080        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5081    }
5082
5083    // ---- Archive + Replay delivery tests ----
5084
5085    /// Helper: create a service with a mock SQS delivery that records messages.
5086    #[allow(clippy::type_complexity)]
5087    fn make_service_with_sqs_recorder() -> (
5088        EventBridgeService,
5089        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5090    ) {
5091        use fakecloud_core::delivery::SqsDelivery;
5092
5093        struct RecordingSqsDelivery {
5094            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5095        }
5096
5097        impl SqsDelivery for RecordingSqsDelivery {
5098            fn deliver_to_queue(
5099                &self,
5100                queue_arn: &str,
5101                message_body: &str,
5102                _attributes: &HashMap<String, String>,
5103            ) {
5104                self.messages
5105                    .lock()
5106                    .push((queue_arn.to_string(), message_body.to_string()));
5107            }
5108        }
5109
5110        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5111            Arc::new(parking_lot::Mutex::new(Vec::new()));
5112        let state = Arc::new(RwLock::new(EventBridgeState::new(
5113            "123456789012",
5114            "us-east-1",
5115        )));
5116        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5117            messages: messages.clone(),
5118        })));
5119        let svc = EventBridgeService::new(state, delivery);
5120        (svc, messages)
5121    }
5122
5123    #[test]
5124    fn start_replay_delivers_archived_events_to_sqs_target() {
5125        let (svc, messages) = make_service_with_sqs_recorder();
5126        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5127
5128        // Create a rule with an SQS target
5129        let req = make_request(
5130            "PutRule",
5131            json!({
5132                "Name": "replay-test-rule",
5133                "EventPattern": r#"{"source": ["my.app"]}"#,
5134                "State": "ENABLED"
5135            }),
5136        );
5137        svc.put_rule(&req).unwrap();
5138
5139        let req = make_request(
5140            "PutTargets",
5141            json!({
5142                "Rule": "replay-test-rule",
5143                "Targets": [{
5144                    "Id": "sqs-target",
5145                    "Arn": queue_arn
5146                }]
5147            }),
5148        );
5149        svc.put_targets(&req).unwrap();
5150
5151        // Create an archive on the default bus
5152        let req = make_request(
5153            "CreateArchive",
5154            json!({
5155                "ArchiveName": "test-archive",
5156                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5157            }),
5158        );
5159        svc.create_archive(&req).unwrap();
5160
5161        // PutEvents: these should get archived and delivered
5162        let req = make_request(
5163            "PutEvents",
5164            json!({
5165                "Entries": [
5166                    {
5167                        "Source": "my.app",
5168                        "DetailType": "OrderCreated",
5169                        "Detail": "{\"orderId\": \"1\"}",
5170                        "EventBusName": "default"
5171                    },
5172                    {
5173                        "Source": "my.app",
5174                        "DetailType": "OrderShipped",
5175                        "Detail": "{\"orderId\": \"2\"}",
5176                        "EventBusName": "default"
5177                    }
5178                ]
5179            }),
5180        );
5181        let resp = svc.put_events(&req).unwrap();
5182        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5183        assert_eq!(body["FailedEntryCount"], 0);
5184
5185        // Verify archive has 2 events
5186        {
5187            let state = svc.state.read();
5188            let archive = state.archives.get("test-archive").unwrap();
5189            assert_eq!(archive.events.len(), 2);
5190            assert_eq!(archive.event_count, 2);
5191        }
5192
5193        // Clear recorded messages from PutEvents delivery
5194        messages.lock().clear();
5195
5196        // StartReplay: should re-deliver the archived events
5197        let archive_arn = {
5198            let state = svc.state.read();
5199            state.archives.get("test-archive").unwrap().arn.clone()
5200        };
5201
5202        // Use a wide time range to capture all events
5203        let start_ts = 0.0_f64;
5204        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5205
5206        let req = make_request(
5207            "StartReplay",
5208            json!({
5209                "ReplayName": "my-replay",
5210                "EventSourceArn": archive_arn,
5211                "Destination": {
5212                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5213                },
5214                "EventStartTime": start_ts,
5215                "EventEndTime": end_ts
5216            }),
5217        );
5218        let resp = svc.start_replay(&req).unwrap();
5219        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5220        assert_eq!(body["State"], "STARTING");
5221
5222        // Verify the replay delivered events to SQS
5223        let delivered = messages.lock();
5224        assert_eq!(
5225            delivered.len(),
5226            2,
5227            "expected 2 replayed events delivered to SQS"
5228        );
5229        for (arn, msg) in delivered.iter() {
5230            assert_eq!(arn, queue_arn);
5231            let event: Value = serde_json::from_str(msg).unwrap();
5232            assert_eq!(event["source"], "my.app");
5233            // Replayed events should include replay-name
5234            assert!(event["replay-name"].as_str().is_some());
5235        }
5236
5237        // Verify replay is marked as COMPLETED
5238        let state = svc.state.read();
5239        let replay = state.replays.get("my-replay").unwrap();
5240        assert_eq!(replay.state, "COMPLETED");
5241    }
5242
5243    #[test]
5244    fn apply_connection_auth_api_key() {
5245        let conn = Connection {
5246            name: "test-conn".to_string(),
5247            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5248            description: None,
5249            authorization_type: "API_KEY".to_string(),
5250            auth_parameters: json!({
5251                "ApiKeyAuthParameters": {
5252                    "ApiKeyName": "x-api-key",
5253                    "ApiKeyValue": "my-secret"
5254                }
5255            }),
5256            connection_state: "AUTHORIZED".to_string(),
5257            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5258            creation_time: Utc::now(),
5259            last_modified_time: Utc::now(),
5260            last_authorized_time: Utc::now(),
5261        };
5262
5263        let client = reqwest::Client::new();
5264        let builder = client
5265            .post("http://localhost:12345/test")
5266            .header("Content-Type", "application/json");
5267        let builder = apply_connection_auth(builder, &conn);
5268
5269        // Build and verify the header was applied
5270        let request = builder.body("{}").build().unwrap();
5271        assert_eq!(
5272            request
5273                .headers()
5274                .get("x-api-key")
5275                .unwrap()
5276                .to_str()
5277                .unwrap(),
5278            "my-secret"
5279        );
5280    }
5281
5282    #[test]
5283    fn apply_connection_auth_basic() {
5284        let conn = Connection {
5285            name: "basic-conn".to_string(),
5286            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5287            description: None,
5288            authorization_type: "BASIC".to_string(),
5289            auth_parameters: json!({
5290                "BasicAuthParameters": {
5291                    "Username": "user",
5292                    "Password": "pass"
5293                }
5294            }),
5295            connection_state: "AUTHORIZED".to_string(),
5296            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5297            creation_time: Utc::now(),
5298            last_modified_time: Utc::now(),
5299            last_authorized_time: Utc::now(),
5300        };
5301
5302        let client = reqwest::Client::new();
5303        let builder = client.post("http://localhost:12345/test");
5304        let builder = apply_connection_auth(builder, &conn);
5305
5306        let request = builder.body("{}").build().unwrap();
5307        let auth_header = request
5308            .headers()
5309            .get("authorization")
5310            .unwrap()
5311            .to_str()
5312            .unwrap();
5313        assert!(
5314            auth_header.starts_with("Basic "),
5315            "Expected Basic auth header, got: {auth_header}"
5316        );
5317    }
5318
5319    #[tokio::test]
5320    async fn put_events_with_api_destination_target_resolves_destination() {
5321        // This test verifies that the PutEvents code path correctly identifies
5322        // api-destination ARN targets and resolves the destination metadata.
5323        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5324        let state = Arc::new(RwLock::new(EventBridgeState::new(
5325            "123456789012",
5326            "us-east-1",
5327        )));
5328        let delivery = Arc::new(DeliveryBus::new());
5329        let svc = EventBridgeService::new(state, delivery);
5330
5331        // Create connection and api destination
5332        create_connection(&svc, "my-conn");
5333        let conn_arn = {
5334            let state = svc.state.read();
5335            state.connections.get("my-conn").unwrap().arn.clone()
5336        };
5337        let req = make_request(
5338            "CreateApiDestination",
5339            json!({
5340                "Name": "my-dest",
5341                "ConnectionArn": conn_arn,
5342                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5343                "HttpMethod": "POST"
5344            }),
5345        );
5346        svc.create_api_destination(&req).unwrap();
5347
5348        let dest_arn = {
5349            let state = svc.state.read();
5350            state.api_destinations.get("my-dest").unwrap().arn.clone()
5351        };
5352
5353        // Create a rule that targets the api-destination
5354        let req = make_request(
5355            "PutRule",
5356            json!({
5357                "Name": "api-dest-rule",
5358                "EventPattern": r#"{"source":["test.app"]}"#,
5359                "State": "ENABLED"
5360            }),
5361        );
5362        svc.put_rule(&req).unwrap();
5363
5364        let req = make_request(
5365            "PutTargets",
5366            json!({
5367                "Rule": "api-dest-rule",
5368                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5369            }),
5370        );
5371        svc.put_targets(&req).unwrap();
5372
5373        // PutEvents - should match the rule and attempt delivery to ApiDestination
5374        let req = make_request(
5375            "PutEvents",
5376            json!({
5377                "Entries": [{
5378                    "Source": "test.app",
5379                    "DetailType": "TestEvent",
5380                    "Detail": r#"{"key":"value"}"#
5381                }]
5382            }),
5383        );
5384        let resp = svc.put_events(&req).unwrap();
5385        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5386        assert_eq!(body["FailedEntryCount"], 0);
5387        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5388        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5389    }
5390
5391    #[test]
5392    fn test_function_name_from_arn() {
5393        // Unqualified ARN
5394        assert_eq!(
5395            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5396            "my-func"
5397        );
5398        // Qualified ARN with alias
5399        assert_eq!(
5400            super::function_name_from_arn(
5401                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5402            ),
5403            "my-func"
5404        );
5405        // Qualified ARN with version
5406        assert_eq!(
5407            super::function_name_from_arn(
5408                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5409            ),
5410            "my-func"
5411        );
5412        // Plain function name (not an ARN)
5413        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5414    }
5415}