Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20    ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21    PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24/// Validate a single `PutEvents` entry's required fields (`Source`,
25/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
26/// object. Returns the JSON error body AWS surfaces in the matching
27/// `Entries[]` slot on failure.
28fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
29    if source.is_empty() {
30        return Err(json!({
31            "ErrorCode": "InvalidArgument",
32            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
33        }));
34    }
35    if detail_type.is_empty() {
36        return Err(json!({
37            "ErrorCode": "InvalidArgument",
38            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
39        }));
40    }
41    if detail.is_empty() {
42        return Err(json!({
43            "ErrorCode": "InvalidArgument",
44            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
45        }));
46    }
47    if serde_json::from_str::<Value>(detail).is_err() {
48        return Err(json!({
49            "ErrorCode": "MalformedDetail",
50            "ErrorMessage": "Detail is malformed.",
51        }));
52    }
53    Ok(())
54}
55
56/// Parse an entry's `Time` field, tolerating the three formats AWS
57/// accepts (RFC 3339 string, fractional seconds as a float, integer
58/// seconds). Falls back to "now" if the field is absent or
59/// unparseable, which matches the real service.
60fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
61    if let Some(s) = raw.as_str() {
62        return DateTime::parse_from_rfc3339(s)
63            .map(|dt| dt.with_timezone(&Utc))
64            .unwrap_or_else(|_| Utc::now());
65    }
66    if let Some(ts) = raw.as_f64() {
67        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
68            .unwrap_or_else(Utc::now);
69    }
70    if let Some(ts) = raw.as_i64() {
71        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
72    }
73    Utc::now()
74}
75
/// Handler for the EventBridge (`events`) service.
///
/// Constructed with `EventBridgeService::new`; the three optional
/// fields start out as `None` and are filled in by the `with_*`
/// builder methods.
pub struct EventBridgeService {
    /// Shared EventBridge state; handlers take read/write guards on it.
    state: SharedEventBridgeState,
    /// Delivery bus handle supplied at construction.
    /// NOTE(review): presumably used to deliver events to targets — the
    /// usage is not visible in this part of the file; confirm.
    delivery: Arc<DeliveryBus>,
    /// Lambda service state, set via `with_lambda`.
    lambda_state: Option<SharedLambdaState>,
    /// CloudWatch Logs service state, set via `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Container runtime handle, set via `with_runtime`.
    container_runtime: Option<Arc<ContainerRuntime>>,
}
83
84impl EventBridgeService {
85    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
86        Self {
87            state,
88            delivery,
89            lambda_state: None,
90            logs_state: None,
91            container_runtime: None,
92        }
93    }
94
95    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
96        self.lambda_state = Some(lambda_state);
97        self
98    }
99
100    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
101        self.logs_state = Some(logs_state);
102        self
103    }
104
105    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
106        self.container_runtime = Some(runtime);
107        self
108    }
109}
110
/// `AwsService` implementation: names the service, routes each request
/// to its handler by action name, and advertises the supported actions.
#[async_trait]
impl AwsService for EventBridgeService {
    /// Service identifier; the same string is passed to
    /// `action_not_implemented` in `handle`.
    fn service_name(&self) -> &str {
        "events"
    }

    /// Dispatches `req` to the handler method matching `req.action`.
    ///
    /// Returns the handler's result unchanged. An action with no arm
    /// below yields `AwsServiceError::action_not_implemented`.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        // Keep these arms in sync with `supported_actions` below.
        match req.action.as_str() {
            "CreateEventBus" => self.create_event_bus(&req),
            "DeleteEventBus" => self.delete_event_bus(&req),
            "ListEventBuses" => self.list_event_buses(&req),
            "DescribeEventBus" => self.describe_event_bus(&req),
            "PutRule" => self.put_rule(&req),
            "DeleteRule" => self.delete_rule(&req),
            "ListRules" => self.list_rules(&req),
            "DescribeRule" => self.describe_rule(&req),
            "EnableRule" => self.enable_rule(&req),
            "DisableRule" => self.disable_rule(&req),
            "PutTargets" => self.put_targets(&req),
            "RemoveTargets" => self.remove_targets(&req),
            "ListTargetsByRule" => self.list_targets_by_rule(&req),
            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
            "PutEvents" => self.put_events(&req),
            "PutPermission" => self.put_permission(&req),
            "RemovePermission" => self.remove_permission(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsForResource" => self.list_tags_for_resource(&req),
            "CreateArchive" => self.create_archive(&req),
            "DescribeArchive" => self.describe_archive(&req),
            "ListArchives" => self.list_archives(&req),
            "UpdateArchive" => self.update_archive(&req),
            "DeleteArchive" => self.delete_archive(&req),
            "CreateConnection" => self.create_connection(&req),
            "DescribeConnection" => self.describe_connection(&req),
            "ListConnections" => self.list_connections(&req),
            "UpdateConnection" => self.update_connection(&req),
            "DeleteConnection" => self.delete_connection(&req),
            "CreateApiDestination" => self.create_api_destination(&req),
            "DescribeApiDestination" => self.describe_api_destination(&req),
            "ListApiDestinations" => self.list_api_destinations(&req),
            "UpdateApiDestination" => self.update_api_destination(&req),
            "DeleteApiDestination" => self.delete_api_destination(&req),
            "StartReplay" => self.start_replay(&req),
            "DescribeReplay" => self.describe_replay(&req),
            "ListReplays" => self.list_replays(&req),
            "CancelReplay" => self.cancel_replay(&req),
            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
            "ActivateEventSource" => self.activate_event_source(&req),
            "DeactivateEventSource" => self.deactivate_event_source(&req),
            "DescribeEventSource" => self.describe_event_source(&req),
            "ListEventSources" => self.list_event_sources(&req),
            "PutPartnerEvents" => self.put_partner_events(&req),
            "TestEventPattern" => self.test_event_pattern(&req),
            "UpdateEventBus" => self.update_event_bus(&req),
            "CreateEndpoint" => self.create_endpoint(&req),
            "DeleteEndpoint" => self.delete_endpoint(&req),
            "DescribeEndpoint" => self.describe_endpoint(&req),
            "ListEndpoints" => self.list_endpoints(&req),
            "UpdateEndpoint" => self.update_endpoint(&req),
            "DeauthorizeConnection" => self.deauthorize_connection(&req),
            // Anything else is reported as not implemented for "events".
            _ => Err(AwsServiceError::action_not_implemented(
                "events",
                &req.action,
            )),
        }
    }

    /// Action names this service handles, listed in the same order as
    /// the match arms in `handle`.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateEventBus",
            "DeleteEventBus",
            "ListEventBuses",
            "DescribeEventBus",
            "PutRule",
            "DeleteRule",
            "ListRules",
            "DescribeRule",
            "EnableRule",
            "DisableRule",
            "PutTargets",
            "RemoveTargets",
            "ListTargetsByRule",
            "ListRuleNamesByTarget",
            "PutEvents",
            "PutPermission",
            "RemovePermission",
            "TagResource",
            "UntagResource",
            "ListTagsForResource",
            "CreateArchive",
            "DescribeArchive",
            "ListArchives",
            "UpdateArchive",
            "DeleteArchive",
            "CreateConnection",
            "DescribeConnection",
            "ListConnections",
            "UpdateConnection",
            "DeleteConnection",
            "CreateApiDestination",
            "DescribeApiDestination",
            "ListApiDestinations",
            "UpdateApiDestination",
            "DeleteApiDestination",
            "StartReplay",
            "DescribeReplay",
            "ListReplays",
            "CancelReplay",
            "CreatePartnerEventSource",
            "DeletePartnerEventSource",
            "DescribePartnerEventSource",
            "ListPartnerEventSources",
            "ListPartnerEventSourceAccounts",
            "ActivateEventSource",
            "DeactivateEventSource",
            "DescribeEventSource",
            "ListEventSources",
            "PutPartnerEvents",
            "TestEventPattern",
            "UpdateEventBus",
            "CreateEndpoint",
            "DeleteEndpoint",
            "DescribeEndpoint",
            "ListEndpoints",
            "UpdateEndpoint",
            "DeauthorizeConnection",
        ]
    }
}
245
246fn parse_tags(body: &Value) -> HashMap<String, String> {
247    let mut tags = HashMap::new();
248    if let Some(arr) = body["Tags"].as_array() {
249        for tag in arr {
250            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
251                tags.insert(key.to_string(), val.to_string());
252            }
253        }
254    }
255    tags
256}
257
258fn parse_target(target: &Value) -> EventTarget {
259    EventTarget {
260        id: target["Id"].as_str().unwrap_or("").to_string(),
261        arn: target["Arn"].as_str().unwrap_or("").to_string(),
262        input: target["Input"].as_str().map(|s| s.to_string()),
263        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
264        input_transformer: target.get("InputTransformer").cloned(),
265        sqs_parameters: target.get("SqsParameters").cloned(),
266    }
267}
268
269fn target_to_json(t: &EventTarget) -> Value {
270    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
271    if let Some(ref input) = t.input {
272        obj["Input"] = json!(input);
273    }
274    if let Some(ref input_path) = t.input_path {
275        obj["InputPath"] = json!(input_path);
276    }
277    if let Some(ref it) = t.input_transformer {
278        obj["InputTransformer"] = it.clone();
279    }
280    if let Some(ref sp) = t.sqs_parameters {
281        obj["SqsParameters"] = sp.clone();
282    }
283    obj
284}
285
286// ─── Event Bus Operations ───────────────────────────────────────────
287impl EventBridgeService {
288    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
289        let body = req.json_body();
290        validate_required("Name", &body["Name"])?;
291        let name = body["Name"]
292            .as_str()
293            .ok_or_else(|| missing("Name"))?
294            .to_string();
295        validate_string_length("name", &name, 1, 256)?;
296        validate_optional_string_length(
297            "eventSourceName",
298            body["EventSourceName"].as_str(),
299            1,
300            256,
301        )?;
302        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
303        validate_optional_string_length(
304            "kmsKeyIdentifier",
305            body["KmsKeyIdentifier"].as_str(),
306            0,
307            2048,
308        )?;
309
310        // Validate name doesn't contain '/' (unless partner bus)
311        if name.contains('/') && !name.starts_with("aws.partner/") {
312            return Err(AwsServiceError::aws_error(
313                StatusCode::BAD_REQUEST,
314                "ValidationException",
315                "Event bus name must not contain '/'.",
316            ));
317        }
318
319        // Partner event bus validation
320        if name.starts_with("aws.partner/") {
321            let event_source = body["EventSourceName"].as_str().unwrap_or("");
322            let state_r = self.state.read();
323            let has_source = state_r.partner_event_sources.contains_key(event_source);
324            drop(state_r);
325            if !has_source {
326                return Err(AwsServiceError::aws_error(
327                    StatusCode::BAD_REQUEST,
328                    "ResourceNotFoundException",
329                    format!("Event source {event_source} does not exist."),
330                ));
331            }
332        }
333
334        let mut state = self.state.write();
335
336        if state.buses.contains_key(&name) {
337            return Err(AwsServiceError::aws_error(
338                StatusCode::BAD_REQUEST,
339                "ResourceAlreadyExistsException",
340                format!("Event bus {name} already exists."),
341            ));
342        }
343
344        let arn = format!(
345            "arn:aws:events:{}:{}:event-bus/{}",
346            req.region, state.account_id, name
347        );
348        let now = Utc::now();
349        let description = body["Description"].as_str().map(|s| s.to_string());
350        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
351        let dead_letter_config = body.get("DeadLetterConfig").cloned();
352
353        let tags = parse_tags(&body);
354
355        let bus = EventBus {
356            name: name.clone(),
357            arn: arn.clone(),
358            tags,
359            policy: None,
360            description,
361            kms_key_identifier,
362            dead_letter_config,
363            creation_time: now,
364            last_modified_time: now,
365        };
366        state.buses.insert(name, bus);
367
368        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
369    }
370
371    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372        let body = req.json_body();
373        validate_required("Name", &body["Name"])?;
374        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
375        validate_string_length("name", name, 1, 256)?;
376
377        if name == "default" {
378            return Err(AwsServiceError::aws_error(
379                StatusCode::BAD_REQUEST,
380                "ValidationException",
381                format!("Cannot delete event bus {name}."),
382            ));
383        }
384
385        let mut state = self.state.write();
386        state.buses.remove(name);
387        state.rules.retain(|k, _| k.0 != name);
388
389        Ok(AwsResponse::ok_json(json!({})))
390    }
391
392    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
393        let body = req.json_body();
394        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
395        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
396        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
397        let name_prefix = body["NamePrefix"].as_str();
398        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
399        if let Some(t) = body["NextToken"].as_str() {
400            t.parse::<usize>().map_err(|_| {
401                AwsServiceError::aws_error(
402                    StatusCode::BAD_REQUEST,
403                    "InvalidNextTokenException",
404                    format!("Invalid NextToken value: '{t}'"),
405                )
406            })?;
407        }
408
409        let state = self.state.read();
410        let filtered: Vec<&_> = state
411            .buses
412            .values()
413            .filter(|b| match name_prefix {
414                Some(prefix) => b.name.starts_with(prefix),
415                None => true,
416            })
417            .collect();
418
419        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
420        let buses: Vec<Value> = page
421            .iter()
422            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
423            .collect();
424        let mut resp = json!({ "EventBuses": buses });
425        if let Some(token) = next_token {
426            resp["NextToken"] = json!(token);
427        }
428
429        Ok(AwsResponse::ok_json(resp))
430    }
431
432    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
433        let body = req.json_body();
434        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
435        let name = body["Name"].as_str().unwrap_or("default");
436
437        let state = self.state.read();
438        let bus = state.buses.get(name).ok_or_else(|| {
439            AwsServiceError::aws_error(
440                StatusCode::BAD_REQUEST,
441                "ResourceNotFoundException",
442                format!("Event bus {name} does not exist."),
443            )
444        })?;
445
446        let mut resp = json!({
447            "Name": bus.name,
448            "Arn": bus.arn,
449            "CreationTime": bus.creation_time.timestamp() as f64,
450            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
451        });
452
453        if let Some(ref policy) = bus.policy {
454            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
455        }
456        if let Some(ref desc) = bus.description {
457            resp["Description"] = json!(desc);
458        }
459        if let Some(ref kms) = bus.kms_key_identifier {
460            resp["KmsKeyIdentifier"] = json!(kms);
461        }
462        if let Some(ref dlc) = bus.dead_letter_config {
463            resp["DeadLetterConfig"] = dlc.clone();
464        }
465
466        Ok(AwsResponse::ok_json(resp))
467    }
468
469    // ─── Permission Operations ──────────────────────────────────────────
470
471    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
472        let body = req.json_body();
473        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
474        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
475        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
476        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
477        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479        let mut state = self.state.write();
480
481        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
482            AwsServiceError::aws_error(
483                StatusCode::BAD_REQUEST,
484                "ResourceNotFoundException",
485                format!("Event bus {event_bus_name} does not exist."),
486            )
487        })?;
488
489        // Check if Policy is provided (new-style)
490        if let Some(policy_str) = body["Policy"].as_str() {
491            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
492                bus.policy = Some(policy);
493                return Ok(AwsResponse::ok_json(json!({})));
494            }
495        }
496
497        // Old-style: Action, Principal, StatementId
498        let action = body["Action"].as_str().unwrap_or("");
499        let principal = body["Principal"].as_str().unwrap_or("");
500        let statement_id = body["StatementId"].as_str().unwrap_or("");
501
502        // Validate action
503        if action != "events:PutEvents" {
504            return Err(AwsServiceError::aws_error(
505                StatusCode::BAD_REQUEST,
506                "ValidationException",
507                "Provided value in parameter 'action' is not supported.",
508            ));
509        }
510
511        let statement = json!({
512            "Sid": statement_id,
513            "Effect": "Allow",
514            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
515            "Action": action,
516            "Resource": bus.arn,
517        });
518
519        let policy = bus.policy.get_or_insert_with(|| {
520            json!({
521                "Version": "2012-10-17",
522                "Statement": [],
523            })
524        });
525
526        if let Some(stmts) = policy["Statement"].as_array_mut() {
527            stmts.push(statement);
528        }
529
530        Ok(AwsResponse::ok_json(json!({})))
531    }
532
533    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
534        let body = req.json_body();
535        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
536        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
537        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
538        let statement_id = body["StatementId"].as_str().unwrap_or("");
539        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
540
541        let mut state = self.state.write();
542
543        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
544            AwsServiceError::aws_error(
545                StatusCode::BAD_REQUEST,
546                "ResourceNotFoundException",
547                format!("Event bus {event_bus_name} does not exist."),
548            )
549        })?;
550
551        if remove_all {
552            bus.policy = None;
553            return Ok(AwsResponse::ok_json(json!({})));
554        }
555
556        let policy = bus.policy.as_mut().ok_or_else(|| {
557            AwsServiceError::aws_error(
558                StatusCode::BAD_REQUEST,
559                "ResourceNotFoundException",
560                "EventBus does not have a policy.",
561            )
562        })?;
563
564        if let Some(stmts) = policy["Statement"].as_array_mut() {
565            let before = stmts.len();
566            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
567            if stmts.len() == before {
568                return Err(AwsServiceError::aws_error(
569                    StatusCode::BAD_REQUEST,
570                    "ResourceNotFoundException",
571                    "Statement with the provided id does not exist.",
572                ));
573            }
574            if stmts.is_empty() {
575                bus.policy = None;
576            }
577        }
578
579        Ok(AwsResponse::ok_json(json!({})))
580    }
581
582    // ─── Rule Operations ────────────────────────────────────────────────
583
584    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
585        let body = req.json_body();
586        validate_required("Name", &body["Name"])?;
587        let name = body["Name"]
588            .as_str()
589            .ok_or_else(|| missing("Name"))?
590            .to_string();
591        validate_string_length("name", &name, 1, 64)?;
592        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
593        validate_optional_string_length(
594            "scheduleExpression",
595            body["ScheduleExpression"].as_str(),
596            0,
597            256,
598        )?;
599        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
600        validate_optional_enum(
601            "state",
602            body["State"].as_str(),
603            &[
604                "ENABLED",
605                "DISABLED",
606                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
607            ],
608        )?;
609        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
610        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
611
612        let raw_bus = body["EventBusName"]
613            .as_str()
614            .unwrap_or("default")
615            .to_string();
616
617        let mut state = self.state.write();
618        let event_bus_name = state.resolve_bus_name(&raw_bus);
619
620        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
621            if s.is_empty() {
622                None
623            } else {
624                Some(s.to_string())
625            }
626        });
627        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
628            if s.is_empty() {
629                None
630            } else {
631                Some(s.to_string())
632            }
633        });
634        let description = body["Description"].as_str().map(|s| s.to_string());
635        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
636        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
637
638        // Validate: schedule expressions only on default bus
639        if schedule_expression.is_some() && event_bus_name != "default" {
640            return Err(AwsServiceError::aws_error(
641                StatusCode::BAD_REQUEST,
642                "ValidationException",
643                "ScheduleExpression is supported only on the default event bus.",
644            ));
645        }
646
647        if !state.buses.contains_key(&event_bus_name) {
648            return Err(AwsServiceError::aws_error(
649                StatusCode::BAD_REQUEST,
650                "ResourceNotFoundException",
651                format!("Event bus {event_bus_name} does not exist."),
652            ));
653        }
654
655        let arn = if event_bus_name == "default" {
656            format!(
657                "arn:aws:events:{}:{}:rule/{}",
658                req.region, state.account_id, name
659            )
660        } else {
661            format!(
662                "arn:aws:events:{}:{}:rule/{}/{}",
663                req.region, state.account_id, event_bus_name, name
664            )
665        };
666
667        let key = (event_bus_name.clone(), name.clone());
668        let targets = state
669            .rules
670            .get(&key)
671            .map(|r| r.targets.clone())
672            .unwrap_or_default();
673
674        let tags = parse_tags(&body);
675
676        let rule = EventRule {
677            name: name.clone(),
678            arn: arn.clone(),
679            event_bus_name,
680            event_pattern,
681            schedule_expression,
682            state: rule_state,
683            description,
684            role_arn,
685            managed_by: None,
686            created_by: None,
687            targets,
688            tags,
689            last_fired: None,
690        };
691
692        state.rules.insert(key, rule);
693        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
694    }
695
696    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
697        let body = req.json_body();
698        validate_required("Name", &body["Name"])?;
699        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
700        validate_string_length("name", name, 1, 64)?;
701        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
703
704        let mut state = self.state.write();
705        let bus_name = state.resolve_bus_name(event_bus_name);
706        let key = (bus_name, name.to_string());
707
708        // Check if rule has targets
709        if let Some(rule) = state.rules.get(&key) {
710            if !rule.targets.is_empty() {
711                return Err(AwsServiceError::aws_error(
712                    StatusCode::BAD_REQUEST,
713                    "ValidationException",
714                    "Rule can't be deleted since it has targets.",
715                ));
716            }
717        }
718
719        state.rules.remove(&key);
720        Ok(AwsResponse::ok_json(json!({})))
721    }
722
723    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
724        let body = req.json_body();
725        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
726        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
727        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
728        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
729        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
730        let name_prefix = body["NamePrefix"].as_str();
731        let limit = body["Limit"].as_u64().map(|n| n as usize);
732        let next_token = body["NextToken"].as_str();
733
734        let state = self.state.read();
735        let bus_name = state.resolve_bus_name(event_bus_name);
736
737        let mut rules: Vec<&EventRule> = state
738            .rules
739            .values()
740            .filter(|r| r.event_bus_name == bus_name)
741            .filter(|r| match name_prefix {
742                Some(prefix) => r.name.starts_with(prefix),
743                None => true,
744            })
745            .collect();
746        rules.sort_by(|a, b| a.name.cmp(&b.name));
747
748        // Pagination
749        let start = next_token
750            .and_then(|t| t.parse::<usize>().ok())
751            .unwrap_or(0)
752            .min(rules.len());
753        let rules_slice = &rules[start..];
754
755        let (page, new_next_token) = if let Some(lim) = limit {
756            if rules_slice.len() > lim {
757                (&rules_slice[..lim], Some((start + lim).to_string()))
758            } else {
759                (rules_slice, None)
760            }
761        } else {
762            (rules_slice, None)
763        };
764
765        let rules_json: Vec<Value> = page
766            .iter()
767            .map(|r| {
768                let mut obj = json!({
769                    "Name": r.name,
770                    "Arn": r.arn,
771                    "EventBusName": r.event_bus_name,
772                    "State": r.state,
773                });
774                if let Some(ref desc) = r.description {
775                    obj["Description"] = json!(desc);
776                }
777                if let Some(ref ep) = r.event_pattern {
778                    obj["EventPattern"] = json!(ep);
779                }
780                if let Some(ref se) = r.schedule_expression {
781                    obj["ScheduleExpression"] = json!(se);
782                }
783                if let Some(ref mb) = r.managed_by {
784                    obj["ManagedBy"] = json!(mb);
785                }
786                obj
787            })
788            .collect();
789
790        let mut resp = json!({ "Rules": rules_json });
791        if let Some(token) = new_next_token {
792            resp["NextToken"] = json!(token);
793        }
794
795        Ok(AwsResponse::ok_json(resp))
796    }
797
798    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799        let body = req.json_body();
800        validate_required("Name", &body["Name"])?;
801        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802        validate_string_length("name", name, 1, 64)?;
803        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806        let state = self.state.read();
807        let bus_name = state.resolve_bus_name(event_bus_name);
808        let key = (bus_name.clone(), name.to_string());
809
810        let rule = state.rules.get(&key).ok_or_else(|| {
811            AwsServiceError::aws_error(
812                StatusCode::BAD_REQUEST,
813                "ResourceNotFoundException",
814                format!("Rule {name} does not exist."),
815            )
816        })?;
817
818        let mut resp = json!({
819            "Name": rule.name,
820            "Arn": rule.arn,
821            "EventBusName": rule.event_bus_name,
822            "State": rule.state,
823        });
824
825        if let Some(ref desc) = rule.description {
826            resp["Description"] = json!(desc);
827        }
828        if let Some(ref ep) = rule.event_pattern {
829            resp["EventPattern"] = json!(ep);
830        }
831        if let Some(ref se) = rule.schedule_expression {
832            resp["ScheduleExpression"] = json!(se);
833        }
834        if let Some(ref role) = rule.role_arn {
835            resp["RoleArn"] = json!(role);
836        }
837        if let Some(ref mb) = rule.managed_by {
838            resp["ManagedBy"] = json!(mb);
839        }
840        if let Some(ref cb) = rule.created_by {
841            resp["CreatedBy"] = json!(cb);
842        }
843        // If non-default bus, set CreatedBy to account_id
844        if rule.event_bus_name != "default" && rule.created_by.is_none() {
845            resp["CreatedBy"] = json!(state.account_id);
846        }
847
848        Ok(AwsResponse::ok_json(resp))
849    }
850
851    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
852        let body = req.json_body();
853        validate_required("Name", &body["Name"])?;
854        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
855        validate_string_length("name", name, 1, 64)?;
856        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
857        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
858
859        let mut state = self.state.write();
860        let bus_name = state.resolve_bus_name(event_bus_name);
861        let key = (bus_name, name.to_string());
862
863        let rule = state.rules.get_mut(&key).ok_or_else(|| {
864            AwsServiceError::aws_error(
865                StatusCode::BAD_REQUEST,
866                "ResourceNotFoundException",
867                format!("Rule {name} does not exist."),
868            )
869        })?;
870
871        rule.state = "ENABLED".to_string();
872        Ok(AwsResponse::ok_json(json!({})))
873    }
874
875    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
876        let body = req.json_body();
877        validate_required("Name", &body["Name"])?;
878        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
879        validate_string_length("name", name, 1, 64)?;
880        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
881        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882
883        let mut state = self.state.write();
884        let bus_name = state.resolve_bus_name(event_bus_name);
885        let key = (bus_name, name.to_string());
886
887        let rule = state.rules.get_mut(&key).ok_or_else(|| {
888            AwsServiceError::aws_error(
889                StatusCode::BAD_REQUEST,
890                "ResourceNotFoundException",
891                format!("Rule {name} does not exist."),
892            )
893        })?;
894
895        rule.state = "DISABLED".to_string();
896        Ok(AwsResponse::ok_json(json!({})))
897    }
898
899    // ─── Target Operations ──────────────────────────────────────────────
900
901    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
902        let body = req.json_body();
903        validate_required("Rule", &body["Rule"])?;
904        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
905        validate_string_length("rule", rule_name, 1, 64)?;
906        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
907        validate_required("Targets", &body["Targets"])?;
908        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
909        let targets = body["Targets"]
910            .as_array()
911            .ok_or_else(|| missing("Targets"))?;
912
913        // Validate targets - check for FIFO SQS without SqsParameters
914        for target in targets {
915            let target_id = target["Id"].as_str().unwrap_or("");
916            let target_arn = target["Arn"].as_str().unwrap_or("");
917
918            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
919                return Err(AwsServiceError::aws_error(
920                    StatusCode::BAD_REQUEST,
921                    "ValidationException",
922                    format!(
923                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
924                    ),
925                ));
926            }
927
928            // Validate ARN format
929            if !target_arn.starts_with("arn:") {
930                return Err(AwsServiceError::aws_error(
931                    StatusCode::BAD_REQUEST,
932                    "ValidationException",
933                    format!(
934                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
935                    ),
936                ));
937            }
938        }
939
940        let mut state = self.state.write();
941        let bus_name = state.resolve_bus_name(event_bus_name);
942        let key = (bus_name.clone(), rule_name.to_string());
943
944        let rule = state.rules.get_mut(&key).ok_or_else(|| {
945            AwsServiceError::aws_error(
946                StatusCode::BAD_REQUEST,
947                "ResourceNotFoundException",
948                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
949            )
950        })?;
951
952        for target in targets {
953            let et = parse_target(target);
954            // Remove existing target with same ID
955            rule.targets.retain(|t| t.id != et.id);
956            rule.targets.push(et);
957        }
958
959        Ok(AwsResponse::ok_json(json!({
960            "FailedEntryCount": 0,
961            "FailedEntries": [],
962        })))
963    }
964
965    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
966        let body = req.json_body();
967        validate_required("Rule", &body["Rule"])?;
968        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
969        validate_string_length("rule", rule_name, 1, 64)?;
970        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
971        validate_required("Ids", &body["Ids"])?;
972        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
973        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
974
975        let target_ids: Vec<String> = ids
976            .iter()
977            .filter_map(|v| v.as_str().map(|s| s.to_string()))
978            .collect();
979
980        let mut state = self.state.write();
981        let bus_name = state.resolve_bus_name(event_bus_name);
982        let key = (bus_name.clone(), rule_name.to_string());
983
984        let rule = state.rules.get_mut(&key).ok_or_else(|| {
985            AwsServiceError::aws_error(
986                StatusCode::BAD_REQUEST,
987                "ResourceNotFoundException",
988                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
989            )
990        })?;
991
992        rule.targets.retain(|t| !target_ids.contains(&t.id));
993
994        Ok(AwsResponse::ok_json(json!({
995            "FailedEntryCount": 0,
996            "FailedEntries": [],
997        })))
998    }
999
1000    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001        let body = req.json_body();
1002        validate_required("Rule", &body["Rule"])?;
1003        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1004        validate_string_length("rule", rule_name, 1, 64)?;
1005        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009        let limit = body["Limit"].as_u64().map(|n| n as usize);
1010        let next_token = body["NextToken"].as_str();
1011
1012        let state = self.state.read();
1013        let bus_name = state.resolve_bus_name(event_bus_name);
1014        let key = (bus_name, rule_name.to_string());
1015
1016        let rule = state.rules.get(&key).ok_or_else(|| {
1017            AwsServiceError::aws_error(
1018                StatusCode::BAD_REQUEST,
1019                "ResourceNotFoundException",
1020                format!("Rule {rule_name} does not exist."),
1021            )
1022        })?;
1023
1024        let all_targets = &rule.targets;
1025        let start = next_token
1026            .and_then(|t| t.parse::<usize>().ok())
1027            .unwrap_or(0)
1028            .min(all_targets.len());
1029        let slice = &all_targets[start..];
1030
1031        let (page, new_next_token) = if let Some(lim) = limit {
1032            if slice.len() > lim {
1033                (&slice[..lim], Some((start + lim).to_string()))
1034            } else {
1035                (slice, None)
1036            }
1037        } else {
1038            (slice, None)
1039        };
1040
1041        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1042
1043        let mut resp = json!({ "Targets": targets });
1044        if let Some(token) = new_next_token {
1045            resp["NextToken"] = json!(token);
1046        }
1047
1048        Ok(AwsResponse::ok_json(resp))
1049    }
1050
1051    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052        let body = req.json_body();
1053        validate_required("TargetArn", &body["TargetArn"])?;
1054        let target_arn = body["TargetArn"]
1055            .as_str()
1056            .ok_or_else(|| missing("TargetArn"))?;
1057        validate_string_length("targetArn", target_arn, 1, 1600)?;
1058        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1059        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1060        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1061        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1062        let limit = body["Limit"].as_u64().map(|n| n as usize);
1063        let next_token = body["NextToken"].as_str();
1064
1065        let state = self.state.read();
1066        let bus_name = state.resolve_bus_name(event_bus_name);
1067
1068        // Deduplicate rule names
1069        let mut rule_names: Vec<String> = Vec::new();
1070        for rule in state.rules.values() {
1071            if rule.event_bus_name == bus_name
1072                && rule.targets.iter().any(|t| t.arn == target_arn)
1073                && !rule_names.contains(&rule.name)
1074            {
1075                rule_names.push(rule.name.clone());
1076            }
1077        }
1078        rule_names.sort();
1079
1080        let start = next_token
1081            .and_then(|t| t.parse::<usize>().ok())
1082            .unwrap_or(0)
1083            .min(rule_names.len());
1084        let slice = &rule_names[start..];
1085
1086        let (page, new_next_token) = if let Some(lim) = limit {
1087            if slice.len() > lim {
1088                (&slice[..lim], Some((start + lim).to_string()))
1089            } else {
1090                (slice, None)
1091            }
1092        } else {
1093            (slice, None)
1094        };
1095
1096        let mut resp = json!({ "RuleNames": page });
1097        if let Some(token) = new_next_token {
1098            resp["NextToken"] = json!(token);
1099        }
1100
1101        Ok(AwsResponse::ok_json(resp))
1102    }
1103
1104    // ─── Partner Event Sources ────────────────────────────────────────
1105
1106    fn create_partner_event_source(
1107        &self,
1108        req: &AwsRequest,
1109    ) -> Result<AwsResponse, AwsServiceError> {
1110        let body = req.json_body();
1111        validate_required("Name", &body["Name"])?;
1112        let name = body["Name"]
1113            .as_str()
1114            .ok_or_else(|| missing("Name"))?
1115            .to_string();
1116        validate_string_length("name", &name, 1, 256)?;
1117        validate_required("Account", &body["Account"])?;
1118        let account = body["Account"]
1119            .as_str()
1120            .ok_or_else(|| missing("Account"))?
1121            .to_string();
1122        validate_string_length("account", &account, 12, 12)?;
1123
1124        let mut state = self.state.write();
1125        if state.partner_event_sources.contains_key(&name) {
1126            return Err(AwsServiceError::aws_error(
1127                StatusCode::CONFLICT,
1128                "ResourceAlreadyExistsException",
1129                format!("Partner event source {name} already exists."),
1130            ));
1131        }
1132        let arn = format!(
1133            "arn:aws:events:{}::event-source/aws.partner/{}",
1134            state.region, name
1135        );
1136        let now = Utc::now();
1137        let ps = PartnerEventSource {
1138            name: name.clone(),
1139            arn: arn.clone(),
1140            account,
1141            creation_time: now,
1142            expiration_time: None,
1143            state: "ACTIVE".to_string(),
1144        };
1145        state.partner_event_sources.insert(name.clone(), ps);
1146
1147        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1148    }
1149
1150    fn delete_partner_event_source(
1151        &self,
1152        req: &AwsRequest,
1153    ) -> Result<AwsResponse, AwsServiceError> {
1154        let body = req.json_body();
1155        validate_required("Name", &body["Name"])?;
1156        let name = body["Name"]
1157            .as_str()
1158            .ok_or_else(|| missing("Name"))?
1159            .to_string();
1160        validate_required("Account", &body["Account"])?;
1161        let account = body["Account"]
1162            .as_str()
1163            .ok_or_else(|| missing("Account"))?
1164            .to_string();
1165
1166        let mut state = self.state.write();
1167        match state.partner_event_sources.get(&name) {
1168            Some(ps) if ps.account == account => {
1169                state.partner_event_sources.remove(&name);
1170            }
1171            Some(_) => {
1172                return Err(AwsServiceError::aws_error(
1173                    StatusCode::NOT_FOUND,
1174                    "ResourceNotFoundException",
1175                    format!("Partner event source {name} does not exist for account {account}."),
1176                ));
1177            }
1178            None => {
1179                return Err(AwsServiceError::aws_error(
1180                    StatusCode::NOT_FOUND,
1181                    "ResourceNotFoundException",
1182                    format!("Partner event source {name} does not exist."),
1183                ));
1184            }
1185        }
1186
1187        Ok(AwsResponse::ok_json(json!({})))
1188    }
1189
1190    fn describe_partner_event_source(
1191        &self,
1192        req: &AwsRequest,
1193    ) -> Result<AwsResponse, AwsServiceError> {
1194        let body = req.json_body();
1195        validate_required("Name", &body["Name"])?;
1196        let name = body["Name"]
1197            .as_str()
1198            .ok_or_else(|| missing("Name"))?
1199            .to_string();
1200        validate_string_length("name", &name, 1, 256)?;
1201
1202        let state = self.state.read();
1203        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1204            AwsServiceError::aws_error(
1205                StatusCode::NOT_FOUND,
1206                "ResourceNotFoundException",
1207                format!("Partner event source {name} does not exist."),
1208            )
1209        })?;
1210
1211        Ok(AwsResponse::ok_json(json!({
1212            "Arn": ps.arn,
1213            "Name": ps.name,
1214        })))
1215    }
1216
1217    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1218        let body = req.json_body();
1219        validate_required("namePrefix", &body["NamePrefix"])?;
1220        let name_prefix = body["NamePrefix"]
1221            .as_str()
1222            .ok_or_else(|| missing("NamePrefix"))?;
1223        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1224        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1225        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1226        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1227
1228        let state = self.state.read();
1229        let all: Vec<Value> = state
1230            .partner_event_sources
1231            .values()
1232            .filter(|ps| ps.name.starts_with(name_prefix))
1233            .map(|ps| {
1234                json!({
1235                    "Arn": ps.arn,
1236                    "Name": ps.name,
1237                })
1238            })
1239            .collect();
1240
1241        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1242        let mut resp = json!({ "PartnerEventSources": sources });
1243        if let Some(token) = next_token {
1244            resp["NextToken"] = json!(token);
1245        }
1246
1247        Ok(AwsResponse::ok_json(resp))
1248    }
1249
1250    fn list_partner_event_source_accounts(
1251        &self,
1252        req: &AwsRequest,
1253    ) -> Result<AwsResponse, AwsServiceError> {
1254        let body = req.json_body();
1255        validate_required("EventSourceName", &body["EventSourceName"])?;
1256        let event_source_name = body["EventSourceName"]
1257            .as_str()
1258            .ok_or_else(|| missing("EventSourceName"))?;
1259        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1260        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1261        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1262
1263        let state = self.state.read();
1264        let accounts: Vec<Value> = state
1265            .partner_event_sources
1266            .values()
1267            .filter(|ps| ps.name == event_source_name)
1268            .map(|ps| json!({ "Account": ps.account }))
1269            .collect();
1270
1271        Ok(AwsResponse::ok_json(json!({
1272            "PartnerEventSourceAccounts": accounts
1273        })))
1274    }
1275
1276    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277        let body = req.json_body();
1278        validate_required("Name", &body["Name"])?;
1279        let name = body["Name"]
1280            .as_str()
1281            .ok_or_else(|| missing("Name"))?
1282            .to_string();
1283
1284        let mut state = self.state.write();
1285        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1286            AwsServiceError::aws_error(
1287                StatusCode::NOT_FOUND,
1288                "ResourceNotFoundException",
1289                format!("Event source {name} does not exist."),
1290            )
1291        })?;
1292        ps.state = "ACTIVE".to_string();
1293
1294        Ok(AwsResponse::ok_json(json!({})))
1295    }
1296
1297    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1298        let body = req.json_body();
1299        validate_required("Name", &body["Name"])?;
1300        let name = body["Name"]
1301            .as_str()
1302            .ok_or_else(|| missing("Name"))?
1303            .to_string();
1304
1305        let mut state = self.state.write();
1306        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1307            AwsServiceError::aws_error(
1308                StatusCode::NOT_FOUND,
1309                "ResourceNotFoundException",
1310                format!("Event source {name} does not exist."),
1311            )
1312        })?;
1313        ps.state = "INACTIVE".to_string();
1314
1315        Ok(AwsResponse::ok_json(json!({})))
1316    }
1317
1318    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1319        let body = req.json_body();
1320        validate_required("Name", &body["Name"])?;
1321        let name = body["Name"]
1322            .as_str()
1323            .ok_or_else(|| missing("Name"))?
1324            .to_string();
1325
1326        let state = self.state.read();
1327        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1328            AwsServiceError::aws_error(
1329                StatusCode::NOT_FOUND,
1330                "ResourceNotFoundException",
1331                format!("Event source {name} does not exist."),
1332            )
1333        })?;
1334
1335        Ok(AwsResponse::ok_json(json!({
1336            "Arn": ps.arn,
1337            "Name": ps.name,
1338            "CreatedBy": ps.account,
1339            "CreationTime": ps.creation_time.timestamp() as f64,
1340            "State": ps.state,
1341        })))
1342    }
1343
1344    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1345        let body = req.json_body();
1346        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1347        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1348        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1349        let name_prefix = body["NamePrefix"].as_str();
1350        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1351
1352        let state = self.state.read();
1353        let all: Vec<Value> = state
1354            .partner_event_sources
1355            .values()
1356            .filter(|ps| match name_prefix {
1357                Some(prefix) => ps.name.starts_with(prefix),
1358                None => true,
1359            })
1360            .map(|ps| {
1361                json!({
1362                    "Arn": ps.arn,
1363                    "Name": ps.name,
1364                    "CreatedBy": ps.account,
1365                    "CreationTime": ps.creation_time.timestamp() as f64,
1366                    "State": ps.state,
1367                })
1368            })
1369            .collect();
1370
1371        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1372        let mut resp = json!({ "EventSources": sources });
1373        if let Some(token) = next_token {
1374            resp["NextToken"] = json!(token);
1375        }
1376
1377        Ok(AwsResponse::ok_json(resp))
1378    }
1379
1380    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381        let body = req.json_body();
1382        validate_required("Entries", &body["Entries"])?;
1383        let entries = body["Entries"]
1384            .as_array()
1385            .ok_or_else(|| missing("Entries"))?;
1386
1387        let mut result_entries = Vec::new();
1388        for _entry in entries {
1389            let event_id = uuid::Uuid::new_v4().to_string();
1390            result_entries.push(json!({ "EventId": event_id }));
1391        }
1392
1393        Ok(AwsResponse::ok_json(json!({
1394            "FailedEntryCount": 0,
1395            "Entries": result_entries,
1396        })))
1397    }
1398
1399    // ─── TestEventPattern ────────────────────────────────────────────────
1400
1401    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1402        let body = req.json_body();
1403        validate_required("EventPattern", &body["EventPattern"])?;
1404        validate_required("Event", &body["Event"])?;
1405        let event_pattern = body["EventPattern"]
1406            .as_str()
1407            .ok_or_else(|| missing("EventPattern"))?;
1408        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1409
1410        // Parse the event JSON
1411        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1412            AwsServiceError::aws_error(
1413                StatusCode::BAD_REQUEST,
1414                "InvalidEventPatternException",
1415                "Event is not valid JSON.",
1416            )
1417        })?;
1418
1419        // Parse the pattern JSON
1420        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1421            AwsServiceError::aws_error(
1422                StatusCode::BAD_REQUEST,
1423                "InvalidEventPatternException",
1424                "Event pattern is not valid JSON.",
1425            )
1426        })?;
1427
1428        let source = event["source"].as_str().unwrap_or("");
1429        let detail_type = event["detail-type"].as_str().unwrap_or("");
1430        let detail = event
1431            .get("detail")
1432            .map(|v| serde_json::to_string(v).unwrap_or_default())
1433            .unwrap_or_else(|| "{}".to_string());
1434        let account = event["account"].as_str().unwrap_or("");
1435        let region = event["region"].as_str().unwrap_or("");
1436        let resources: Vec<String> = event["resources"]
1437            .as_array()
1438            .map(|arr| {
1439                arr.iter()
1440                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1441                    .collect()
1442            })
1443            .unwrap_or_default();
1444
1445        let result = matches_pattern(
1446            Some(event_pattern),
1447            source,
1448            detail_type,
1449            &detail,
1450            account,
1451            region,
1452            &resources,
1453        );
1454
1455        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1456    }
1457
1458    // ─── UpdateEventBus ─────────────────────────────────────────────────
1459
1460    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461        let body = req.json_body();
1462        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1463        validate_optional_string_length(
1464            "kmsKeyIdentifier",
1465            body["KmsKeyIdentifier"].as_str(),
1466            0,
1467            2048,
1468        )?;
1469        let name = body["Name"].as_str().unwrap_or("default");
1470
1471        let mut state = self.state.write();
1472        let bus = state.buses.get_mut(name).ok_or_else(|| {
1473            AwsServiceError::aws_error(
1474                StatusCode::BAD_REQUEST,
1475                "ResourceNotFoundException",
1476                format!("Event bus {name} does not exist."),
1477            )
1478        })?;
1479
1480        if let Some(desc) = body["Description"].as_str() {
1481            bus.description = Some(desc.to_string());
1482        }
1483        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1484            bus.kms_key_identifier = Some(kms.to_string());
1485        }
1486        if let Some(dlc) = body.get("DeadLetterConfig") {
1487            bus.dead_letter_config = Some(dlc.clone());
1488        }
1489        bus.last_modified_time = Utc::now();
1490
1491        let arn = bus.arn.clone();
1492        let bus_name = bus.name.clone();
1493
1494        Ok(AwsResponse::ok_json(json!({
1495            "Arn": arn,
1496            "Name": bus_name,
1497        })))
1498    }
1499
1500    // ─── Endpoint Operations ────────────────────────────────────────────
1501
1502    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503        let body = req.json_body();
1504        validate_required("Name", &body["Name"])?;
1505        let name = body["Name"]
1506            .as_str()
1507            .ok_or_else(|| missing("Name"))?
1508            .to_string();
1509        validate_string_length("name", &name, 1, 64)?;
1510        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1511        validate_required("EventBuses", &body["EventBuses"])?;
1512
1513        let description = body["Description"].as_str().map(|s| s.to_string());
1514        let routing_config = body["RoutingConfig"].clone();
1515        let replication_config = body.get("ReplicationConfig").cloned();
1516        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1517        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1518
1519        let mut state = self.state.write();
1520        if state.endpoints.contains_key(&name) {
1521            return Err(AwsServiceError::aws_error(
1522                StatusCode::CONFLICT,
1523                "ResourceAlreadyExistsException",
1524                format!("Endpoint {name} already exists."),
1525            ));
1526        }
1527
1528        let endpoint_id = format!("{}.abc123", name);
1529        let arn = format!(
1530            "arn:aws:events:{}:{}:endpoint/{}",
1531            req.region, state.account_id, name
1532        );
1533        let endpoint_url = format!(
1534            "https://{}.endpoint.events.{}.amazonaws.com",
1535            endpoint_id, req.region
1536        );
1537        let now = Utc::now();
1538
1539        let endpoint = Endpoint {
1540            name: name.clone(),
1541            arn: arn.clone(),
1542            endpoint_id: endpoint_id.clone(),
1543            endpoint_url: Some(endpoint_url),
1544            description,
1545            routing_config: routing_config.clone(),
1546            replication_config: replication_config.clone(),
1547            event_buses: event_buses.clone(),
1548            role_arn: role_arn.clone(),
1549            state: "ACTIVE".to_string(),
1550            creation_time: now,
1551            last_modified_time: now,
1552        };
1553        state.endpoints.insert(name.clone(), endpoint);
1554
1555        let mut resp = json!({
1556            "Name": name,
1557            "Arn": arn,
1558            "State": "ACTIVE",
1559            "RoutingConfig": routing_config,
1560            "EventBuses": event_buses,
1561        });
1562        if let Some(ref rc) = replication_config {
1563            resp["ReplicationConfig"] = rc.clone();
1564        }
1565        if let Some(ref ra) = role_arn {
1566            resp["RoleArn"] = json!(ra);
1567        }
1568
1569        Ok(AwsResponse::ok_json(resp))
1570    }
1571
1572    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1573        let body = req.json_body();
1574        validate_required("Name", &body["Name"])?;
1575        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1576
1577        let mut state = self.state.write();
1578        state.endpoints.remove(name).ok_or_else(|| {
1579            AwsServiceError::aws_error(
1580                StatusCode::BAD_REQUEST,
1581                "ResourceNotFoundException",
1582                format!("Endpoint '{name}' does not exist."),
1583            )
1584        })?;
1585
1586        Ok(AwsResponse::ok_json(json!({})))
1587    }
1588
1589    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1590        let body = req.json_body();
1591        validate_required("Name", &body["Name"])?;
1592        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1593
1594        let state = self.state.read();
1595        let ep = state.endpoints.get(name).ok_or_else(|| {
1596            AwsServiceError::aws_error(
1597                StatusCode::BAD_REQUEST,
1598                "ResourceNotFoundException",
1599                format!("Endpoint '{name}' does not exist."),
1600            )
1601        })?;
1602
1603        let mut resp = json!({
1604            "Name": ep.name,
1605            "Arn": ep.arn,
1606            "EndpointId": ep.endpoint_id,
1607            "State": ep.state,
1608            "RoutingConfig": ep.routing_config,
1609            "EventBuses": ep.event_buses,
1610            "CreationTime": ep.creation_time.timestamp() as f64,
1611            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1612        });
1613        if let Some(ref url) = ep.endpoint_url {
1614            resp["EndpointUrl"] = json!(url);
1615        }
1616        if let Some(ref desc) = ep.description {
1617            resp["Description"] = json!(desc);
1618        }
1619        if let Some(ref rc) = ep.replication_config {
1620            resp["ReplicationConfig"] = rc.clone();
1621        }
1622        if let Some(ref ra) = ep.role_arn {
1623            resp["RoleArn"] = json!(ra);
1624        }
1625
1626        Ok(AwsResponse::ok_json(resp))
1627    }
1628
1629    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1630        let body = req.json_body();
1631        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1632        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1633        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1634        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1635        let name_prefix = body["NamePrefix"].as_str();
1636        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1637
1638        let state = self.state.read();
1639        let all: Vec<Value> = state
1640            .endpoints
1641            .values()
1642            .filter(|ep| match name_prefix {
1643                Some(prefix) => ep.name.starts_with(prefix),
1644                None => true,
1645            })
1646            .map(|ep| {
1647                let mut obj = json!({
1648                    "Name": ep.name,
1649                    "Arn": ep.arn,
1650                    "EndpointId": ep.endpoint_id,
1651                    "State": ep.state,
1652                    "RoutingConfig": ep.routing_config,
1653                    "EventBuses": ep.event_buses,
1654                    "CreationTime": ep.creation_time.timestamp() as f64,
1655                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1656                });
1657                if let Some(ref url) = ep.endpoint_url {
1658                    obj["EndpointUrl"] = json!(url);
1659                }
1660                obj
1661            })
1662            .collect();
1663
1664        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1665        let mut resp = json!({ "Endpoints": endpoints });
1666        if let Some(token) = next_token {
1667            resp["NextToken"] = json!(token);
1668        }
1669
1670        Ok(AwsResponse::ok_json(resp))
1671    }
1672
1673    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1674        let body = req.json_body();
1675        validate_required("Name", &body["Name"])?;
1676        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1677
1678        let mut state = self.state.write();
1679        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1680            AwsServiceError::aws_error(
1681                StatusCode::BAD_REQUEST,
1682                "ResourceNotFoundException",
1683                format!("Endpoint '{name}' does not exist."),
1684            )
1685        })?;
1686
1687        if let Some(desc) = body["Description"].as_str() {
1688            ep.description = Some(desc.to_string());
1689        }
1690        if !body["RoutingConfig"].is_null() {
1691            ep.routing_config = body["RoutingConfig"].clone();
1692        }
1693        if let Some(rc) = body.get("ReplicationConfig") {
1694            ep.replication_config = Some(rc.clone());
1695        }
1696        if let Some(buses) = body["EventBuses"].as_array() {
1697            ep.event_buses = buses.clone();
1698        }
1699        if let Some(ra) = body["RoleArn"].as_str() {
1700            ep.role_arn = Some(ra.to_string());
1701        }
1702        ep.last_modified_time = Utc::now();
1703
1704        let resp = json!({
1705            "Name": ep.name,
1706            "Arn": ep.arn,
1707            "EndpointId": ep.endpoint_id,
1708            "State": ep.state,
1709            "RoutingConfig": ep.routing_config,
1710            "EventBuses": ep.event_buses,
1711        });
1712
1713        Ok(AwsResponse::ok_json(resp))
1714    }
1715
1716    // ─── DeauthorizeConnection ──────────────────────────────────────────
1717
1718    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1719        let body = req.json_body();
1720        validate_required("Name", &body["Name"])?;
1721        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1722        validate_string_length("name", name, 1, 64)?;
1723
1724        let mut state = self.state.write();
1725        let conn = state.connections.get_mut(name).ok_or_else(|| {
1726            AwsServiceError::aws_error(
1727                StatusCode::BAD_REQUEST,
1728                "ResourceNotFoundException",
1729                format!("Connection '{name}' does not exist."),
1730            )
1731        })?;
1732
1733        conn.connection_state = "DEAUTHORIZING".to_string();
1734        conn.last_modified_time = Utc::now();
1735
1736        let resp = json!({
1737            "ConnectionArn": conn.arn,
1738            "ConnectionState": conn.connection_state,
1739            "CreationTime": conn.creation_time.timestamp() as f64,
1740            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1741            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1742        });
1743
1744        Ok(AwsResponse::ok_json(resp))
1745    }
1746
1747    // ─── PutEvents ──────────────────────────────────────────────────────
1748
1749    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1750        let body = req.json_body();
1751        validate_required("Entries", &body["Entries"])?;
1752        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1753        let entries = body["Entries"]
1754            .as_array()
1755            .ok_or_else(|| missing("Entries"))?;
1756
1757        // Validate entries count
1758        if entries.is_empty() {
1759            return Err(AwsServiceError::aws_error(
1760                StatusCode::BAD_REQUEST,
1761                "ValidationException",
1762                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1763            ));
1764        }
1765        if entries.len() > 10 {
1766            return Err(AwsServiceError::aws_error(
1767                StatusCode::BAD_REQUEST,
1768                "ValidationException",
1769                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1770            ));
1771        }
1772
1773        let mut state = self.state.write();
1774        let mut result_entries = Vec::new();
1775        let mut events_to_deliver = Vec::new();
1776        let mut failed_count = 0;
1777
1778        for entry in entries {
1779            let source = entry["Source"].as_str().unwrap_or("").to_string();
1780            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1781            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1782
1783            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1784                failed_count += 1;
1785                result_entries.push(error);
1786                continue;
1787            }
1788
1789            let event_id = uuid::Uuid::new_v4().to_string();
1790            let raw_bus = entry["EventBusName"]
1791                .as_str()
1792                .unwrap_or("default")
1793                .to_string();
1794            let event_bus_name = state.resolve_bus_name(&raw_bus);
1795            let time = parse_put_events_time(&entry["Time"]);
1796            let resources: Vec<String> = entry["Resources"]
1797                .as_array()
1798                .map(|arr| {
1799                    arr.iter()
1800                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1801                        .collect()
1802                })
1803                .unwrap_or_default();
1804
1805            let event = PutEvent {
1806                event_id: event_id.clone(),
1807                source: source.clone(),
1808                detail_type: detail_type.clone(),
1809                detail: detail.clone(),
1810                event_bus_name: event_bus_name.clone(),
1811                time,
1812                resources: resources.clone(),
1813            };
1814
1815            archive_matching_event(
1816                &mut state,
1817                &event,
1818                &event_bus_name,
1819                &source,
1820                &detail_type,
1821                &detail,
1822                &req.account_id,
1823                &req.region,
1824                &resources,
1825            );
1826
1827            state.events.push(event);
1828
1829            // Find matching rules and their targets
1830            let matching_targets: Vec<EventTarget> = state
1831                .rules
1832                .values()
1833                .filter(|r| {
1834                    r.event_bus_name == event_bus_name
1835                        && r.state == "ENABLED"
1836                        && matches_pattern(
1837                            r.event_pattern.as_deref(),
1838                            &source,
1839                            &detail_type,
1840                            &detail,
1841                            &req.account_id,
1842                            &req.region,
1843                            &resources,
1844                        )
1845                })
1846                .flat_map(|r| r.targets.clone())
1847                .collect();
1848
1849            if !matching_targets.is_empty() {
1850                events_to_deliver.push((
1851                    event_id.clone(),
1852                    source,
1853                    detail_type,
1854                    detail,
1855                    time,
1856                    resources,
1857                    matching_targets,
1858                ));
1859            }
1860
1861            result_entries.push(json!({ "EventId": event_id }));
1862        }
1863
1864        // Drop the lock before delivering
1865        drop(state);
1866
1867        // Deliver to targets
1868        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1869            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1870            let event_json = json!({
1871                "version": "0",
1872                "id": event_id,
1873                "source": source,
1874                "account": req.account_id,
1875                "detail-type": detail_type,
1876                "detail": detail_value,
1877                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1878                "region": req.region,
1879                "resources": resources,
1880            });
1881            let event_str = event_json.to_string();
1882
1883            for target in targets {
1884                let arn = &target.arn;
1885                // Compute the message body, applying InputTransformer if present
1886                let body_str = if let Some(ref transformer) = target.input_transformer {
1887                    apply_input_transformer(transformer, &event_json)
1888                } else if let Some(ref input) = target.input {
1889                    input.clone()
1890                } else if let Some(ref input_path) = target.input_path {
1891                    resolve_json_path(&event_json, input_path)
1892                        .map(|v| v.to_string())
1893                        .unwrap_or_else(|| event_str.clone())
1894                } else {
1895                    event_str.clone()
1896                };
1897
1898                if arn.contains(":sqs:") {
1899                    // Extract FIFO parameters (MessageGroupId)
1900                    let group_id = target
1901                        .sqs_parameters
1902                        .as_ref()
1903                        .and_then(|p| p["MessageGroupId"].as_str())
1904                        .map(|s| s.to_string());
1905                    if group_id.is_some() {
1906                        // FIFO queue: send with group ID but no dedup ID.
1907                        // Queues with content-based dedup will auto-generate one;
1908                        // queues without it will reject the message.
1909                        self.delivery.send_to_sqs_with_attrs(
1910                            arn,
1911                            &body_str,
1912                            &HashMap::new(),
1913                            group_id.as_deref(),
1914                            None,
1915                        );
1916                    } else {
1917                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1918                    }
1919                } else if arn.contains(":sns:") {
1920                    self.delivery
1921                        .publish_to_sns(arn, &body_str, Some(&detail_type));
1922                } else if arn.contains(":lambda:") {
1923                    tracing::info!(
1924                        function_arn = %arn,
1925                        payload = %body_str,
1926                        "EventBridge delivering to Lambda function"
1927                    );
1928                    let now = Utc::now();
1929                    let mut state = self.state.write();
1930                    state
1931                        .lambda_invocations
1932                        .push(crate::state::LambdaInvocation {
1933                            function_arn: arn.clone(),
1934                            payload: body_str.clone(),
1935                            timestamp: now,
1936                        });
1937                    drop(state);
1938                    // Record in Lambda state for cross-service visibility
1939                    if let Some(ref ls) = self.lambda_state {
1940                        ls.write().invocations.push(LambdaInvocation {
1941                            function_arn: arn.clone(),
1942                            payload: body_str.clone(),
1943                            timestamp: now,
1944                            source: "aws:events".to_string(),
1945                        });
1946                    }
1947                    // Actually invoke the Lambda function if a container runtime is available
1948                    invoke_lambda_async(
1949                        &self.container_runtime,
1950                        &self.lambda_state,
1951                        arn,
1952                        &body_str,
1953                    );
1954                } else if arn.contains(":logs:") {
1955                    tracing::info!(
1956                        log_group_arn = %arn,
1957                        payload = %body_str,
1958                        "EventBridge delivering to CloudWatch Logs"
1959                    );
1960                    let now = Utc::now();
1961                    let mut state = self.state.write();
1962                    state.log_deliveries.push(crate::state::LogDelivery {
1963                        log_group_arn: arn.clone(),
1964                        payload: body_str.clone(),
1965                        timestamp: now,
1966                    });
1967                    drop(state);
1968                    // Write event to CloudWatch Logs state
1969                    if let Some(ref log_state) = self.logs_state {
1970                        deliver_to_logs(log_state, arn, &body_str, now);
1971                    }
1972                } else if arn.contains(":kinesis:") {
1973                    tracing::info!(
1974                        stream_arn = %arn,
1975                        "EventBridge delivering to Kinesis stream"
1976                    );
1977                    // Use event ID as partition key for even distribution
1978                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
1979                } else if arn.contains(":states:") {
1980                    tracing::info!(
1981                        state_machine_arn = %arn,
1982                        "EventBridge delivering to Step Functions"
1983                    );
1984                    self.delivery.start_stepfunctions_execution(arn, &body_str);
1985                    let mut state = self.state.write();
1986                    state
1987                        .step_function_executions
1988                        .push(crate::state::StepFunctionExecution {
1989                            state_machine_arn: arn.clone(),
1990                            payload: body_str.clone(),
1991                            timestamp: Utc::now(),
1992                        });
1993                } else if arn.contains(":api-destination/") {
1994                    // ApiDestination target: look up destination + connection, then POST
1995                    let state = self.state.read();
1996                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
1997                    if let Some(dest) = dest {
1998                        let url = dest.invocation_endpoint.clone();
1999                        let method = dest.http_method.clone();
2000                        let conn = state
2001                            .connections
2002                            .values()
2003                            .find(|c| c.arn == dest.connection_arn)
2004                            .cloned();
2005                        drop(state);
2006
2007                        let payload = body_str.clone();
2008                        tokio::spawn(async move {
2009                            let client = reqwest::Client::new();
2010                            let mut req_builder = match method.as_str() {
2011                                "GET" => client.get(&url),
2012                                "PUT" => client.put(&url),
2013                                "DELETE" => client.delete(&url),
2014                                "PATCH" => client.patch(&url),
2015                                "HEAD" => client.head(&url),
2016                                _ => client.post(&url),
2017                            };
2018                            req_builder = req_builder.header("Content-Type", "application/json");
2019
2020                            // Apply auth from connection
2021                            if let Some(conn) = conn {
2022                                req_builder = apply_connection_auth(req_builder, &conn);
2023                            }
2024
2025                            let result = req_builder.body(payload).send().await;
2026                            if let Err(e) = result {
2027                                tracing::warn!(
2028                                    endpoint = %url,
2029                                    error = %e,
2030                                    "EventBridge ApiDestination delivery failed"
2031                                );
2032                            }
2033                        });
2034                    }
2035                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2036                    // HTTP target — fire-and-forget POST
2037                    let url = arn.clone();
2038                    let payload = body_str.clone();
2039                    tokio::spawn(async move {
2040                        let client = reqwest::Client::new();
2041                        let result = client
2042                            .post(&url)
2043                            .header("Content-Type", "application/json")
2044                            .body(payload)
2045                            .send()
2046                            .await;
2047                        if let Err(e) = result {
2048                            tracing::warn!(
2049                                endpoint = %url,
2050                                error = %e,
2051                                "EventBridge HTTP target delivery failed"
2052                            );
2053                        }
2054                    });
2055                }
2056            }
2057        }
2058
2059        let resp = json!({
2060            "FailedEntryCount": failed_count,
2061            "Entries": result_entries,
2062        });
2063
2064        Ok(AwsResponse::ok_json(resp))
2065    }
2066
2067    // ─── Tagging ────────────────────────────────────────────────────────
2068
2069    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2070        let body = req.json_body();
2071        validate_required("ResourceARN", &body["ResourceARN"])?;
2072        let arn = body["ResourceARN"]
2073            .as_str()
2074            .ok_or_else(|| missing("ResourceARN"))?;
2075        validate_string_length("resourceARN", arn, 1, 1600)?;
2076        validate_required("Tags", &body["Tags"])?;
2077
2078        let mut state = self.state.write();
2079        let tag_map = find_tags_mut(&mut state, arn)?;
2080
2081        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2082            AwsServiceError::aws_error(
2083                StatusCode::BAD_REQUEST,
2084                "ValidationException",
2085                format!("{f} must be a list"),
2086            )
2087        })?;
2088
2089        Ok(AwsResponse::ok_json(json!({})))
2090    }
2091
2092    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2093        let body = req.json_body();
2094        validate_required("ResourceARN", &body["ResourceARN"])?;
2095        let arn = body["ResourceARN"]
2096            .as_str()
2097            .ok_or_else(|| missing("ResourceARN"))?;
2098        validate_string_length("resourceARN", arn, 1, 1600)?;
2099        validate_required("TagKeys", &body["TagKeys"])?;
2100
2101        let mut state = self.state.write();
2102        let tag_map = find_tags_mut(&mut state, arn)?;
2103
2104        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2105            AwsServiceError::aws_error(
2106                StatusCode::BAD_REQUEST,
2107                "ValidationException",
2108                format!("{f} must be a list"),
2109            )
2110        })?;
2111
2112        Ok(AwsResponse::ok_json(json!({})))
2113    }
2114
2115    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116        let body = req.json_body();
2117        validate_required("ResourceARN", &body["ResourceARN"])?;
2118        let arn = body["ResourceARN"]
2119            .as_str()
2120            .ok_or_else(|| missing("ResourceARN"))?;
2121        validate_string_length("resourceARN", arn, 1, 1600)?;
2122
2123        let state = self.state.read();
2124        let tag_map = find_tags(&state, arn)?;
2125
2126        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2127
2128        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2129    }
2130
2131    // ─── Archive Operations ─────────────────────────────────────────────
2132
2133    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2134        let body = req.json_body();
2135        validate_required("ArchiveName", &body["ArchiveName"])?;
2136        let name = body["ArchiveName"]
2137            .as_str()
2138            .ok_or_else(|| missing("ArchiveName"))?
2139            .to_string();
2140        validate_string_length("archiveName", &name, 1, 48)?;
2141        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2142        let event_source_arn = body["EventSourceArn"]
2143            .as_str()
2144            .ok_or_else(|| missing("EventSourceArn"))?
2145            .to_string();
2146        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2147        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2148        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2149        if let Some(rd) = body["RetentionDays"].as_i64() {
2150            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2151        }
2152        let description = body["Description"].as_str().map(|s| s.to_string());
2153        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2154        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2155
2156        // Validate event pattern if provided
2157        if let Some(ref pattern) = event_pattern {
2158            validate_event_pattern(pattern)?;
2159        }
2160
2161        let mut state = self.state.write();
2162
2163        // Validate event bus exists
2164        let bus_name = state.resolve_bus_name(&event_source_arn);
2165        if !state.buses.contains_key(&bus_name) {
2166            return Err(AwsServiceError::aws_error(
2167                StatusCode::BAD_REQUEST,
2168                "ResourceNotFoundException",
2169                format!("Event bus {bus_name} does not exist."),
2170            ));
2171        }
2172
2173        // Check duplicate
2174        if state.archives.contains_key(&name) {
2175            return Err(AwsServiceError::aws_error(
2176                StatusCode::BAD_REQUEST,
2177                "ResourceAlreadyExistsException",
2178                format!("Archive {name} already exists."),
2179            ));
2180        }
2181
2182        let now = Utc::now();
2183        let arn = format!(
2184            "arn:aws:events:{}:{}:archive/{}",
2185            req.region, state.account_id, name
2186        );
2187
2188        let archive = Archive {
2189            name: name.clone(),
2190            arn: arn.clone(),
2191            event_source_arn: event_source_arn.clone(),
2192            description,
2193            event_pattern: event_pattern.clone(),
2194            retention_days,
2195            state: "ENABLED".to_string(),
2196            creation_time: now,
2197            event_count: 0,
2198            size_bytes: 0,
2199            events: Vec::new(),
2200        };
2201        state.archives.insert(name.clone(), archive);
2202
2203        // Create the archive rule
2204        let rule_name = format!("Events-Archive-{name}");
2205        let rule_arn = format!(
2206            "arn:aws:events:{}:{}:rule/{}",
2207            req.region, state.account_id, rule_name
2208        );
2209        // Merge archive event pattern with replay-name filter
2210        let rule_event_pattern = {
2211            let mut merged = if let Some(ref ep) = event_pattern {
2212                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2213            } else {
2214                json!({})
2215            };
2216            if let Some(obj) = merged.as_object_mut() {
2217                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2218            }
2219            serde_json::to_string(&merged).unwrap_or_default()
2220        };
2221
2222        // Build the archive target with InputTransformer
2223        let archive_target = EventTarget {
2224            id: name.clone(),
2225            arn: format!("arn:aws:events:{}:::", req.region),
2226            input: None,
2227            input_path: None,
2228            input_transformer: Some(json!({
2229                "InputPathsMap": {},
2230                "InputTemplate": format!(
2231                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2232                    arn
2233                )
2234            })),
2235            sqs_parameters: None,
2236        };
2237
2238        let archive_rule = EventRule {
2239            name: rule_name.clone(),
2240            arn: rule_arn,
2241            event_bus_name: bus_name.clone(),
2242            event_pattern: Some(rule_event_pattern),
2243            schedule_expression: None,
2244            state: "ENABLED".to_string(),
2245            description: None,
2246            role_arn: None,
2247            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2248            created_by: Some(state.account_id.clone()),
2249            targets: vec![archive_target],
2250            tags: HashMap::new(),
2251            last_fired: None,
2252        };
2253        let key = (bus_name, rule_name);
2254        state.rules.insert(key, archive_rule);
2255
2256        Ok(AwsResponse::ok_json(json!({
2257            "ArchiveArn": arn,
2258            "CreationTime": now.timestamp() as f64,
2259            "State": "ENABLED",
2260        })))
2261    }
2262
2263    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2264        let body = req.json_body();
2265        validate_required("ArchiveName", &body["ArchiveName"])?;
2266        let name = body["ArchiveName"]
2267            .as_str()
2268            .ok_or_else(|| missing("ArchiveName"))?;
2269        validate_string_length("archiveName", name, 1, 48)?;
2270
2271        let state = self.state.read();
2272        let archive = state.archives.get(name).ok_or_else(|| {
2273            AwsServiceError::aws_error(
2274                StatusCode::BAD_REQUEST,
2275                "ResourceNotFoundException",
2276                format!("Archive {name} does not exist."),
2277            )
2278        })?;
2279
2280        let mut resp = json!({
2281            "ArchiveArn": archive.arn,
2282            "ArchiveName": archive.name,
2283            "CreationTime": archive.creation_time.timestamp() as f64,
2284            "EventCount": archive.event_count,
2285            "EventSourceArn": archive.event_source_arn,
2286            "RetentionDays": archive.retention_days,
2287            "SizeBytes": archive.size_bytes,
2288            "State": archive.state,
2289        });
2290        if let Some(ref desc) = archive.description {
2291            resp["Description"] = json!(desc);
2292        }
2293        if let Some(ref ep) = archive.event_pattern {
2294            resp["EventPattern"] = json!(ep);
2295        }
2296
2297        Ok(AwsResponse::ok_json(resp))
2298    }
2299
2300    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2301        let body = req.json_body();
2302        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2303        validate_optional_string_length(
2304            "eventSourceArn",
2305            body["EventSourceArn"].as_str(),
2306            1,
2307            1600,
2308        )?;
2309        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2310        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2311        let name_prefix = body["NamePrefix"].as_str();
2312        let source_arn = body["EventSourceArn"].as_str();
2313        let archive_state = body["State"].as_str();
2314
2315        // Validate at most one filter
2316        let filter_count = [
2317            name_prefix.is_some(),
2318            source_arn.is_some(),
2319            archive_state.is_some(),
2320        ]
2321        .iter()
2322        .filter(|&&x| x)
2323        .count();
2324        if filter_count > 1 {
2325            return Err(AwsServiceError::aws_error(
2326                StatusCode::BAD_REQUEST,
2327                "ValidationException",
2328                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2329            ));
2330        }
2331
2332        // Validate state
2333        if let Some(s) = archive_state {
2334            let valid = [
2335                "ENABLED",
2336                "DISABLED",
2337                "CREATING",
2338                "UPDATING",
2339                "CREATE_FAILED",
2340                "UPDATE_FAILED",
2341            ];
2342            if !valid.contains(&s) {
2343                return Err(AwsServiceError::aws_error(
2344                    StatusCode::BAD_REQUEST,
2345                    "ValidationException",
2346                    format!(
2347                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2348                        s
2349                    ),
2350                ));
2351            }
2352        }
2353
2354        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2355
2356        let state = self.state.read();
2357        let all: Vec<Value> = state
2358            .archives
2359            .values()
2360            .filter(|a| {
2361                if let Some(prefix) = name_prefix {
2362                    a.name.starts_with(prefix)
2363                } else if let Some(arn) = source_arn {
2364                    a.event_source_arn == arn
2365                } else if let Some(s) = archive_state {
2366                    a.state == s
2367                } else {
2368                    true
2369                }
2370            })
2371            .map(|a| {
2372                json!({
2373                    "ArchiveName": a.name,
2374                    "CreationTime": a.creation_time.timestamp() as f64,
2375                    "EventCount": a.event_count,
2376                    "EventSourceArn": a.event_source_arn,
2377                    "RetentionDays": a.retention_days,
2378                    "SizeBytes": a.size_bytes,
2379                    "State": a.state,
2380                })
2381            })
2382            .collect();
2383
2384        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2385        let mut resp = json!({ "Archives": archives });
2386        if let Some(token) = next_token {
2387            resp["NextToken"] = json!(token);
2388        }
2389
2390        Ok(AwsResponse::ok_json(resp))
2391    }
2392
2393    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2394        let body = req.json_body();
2395        validate_required("ArchiveName", &body["ArchiveName"])?;
2396        let name = body["ArchiveName"]
2397            .as_str()
2398            .ok_or_else(|| missing("ArchiveName"))?;
2399        validate_string_length("archiveName", name, 1, 48)?;
2400        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2401        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2402        if let Some(rd) = body["RetentionDays"].as_i64() {
2403            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2404        }
2405
2406        // Validate event pattern if provided
2407        if let Some(pattern) = body["EventPattern"].as_str() {
2408            validate_event_pattern(pattern)?;
2409        }
2410
2411        let mut state = self.state.write();
2412        let archive = state.archives.get_mut(name).ok_or_else(|| {
2413            AwsServiceError::aws_error(
2414                StatusCode::BAD_REQUEST,
2415                "ResourceNotFoundException",
2416                format!("Archive {name} does not exist."),
2417            )
2418        })?;
2419
2420        if let Some(desc) = body["Description"].as_str() {
2421            archive.description = Some(desc.to_string());
2422        }
2423        if let Some(pattern) = body["EventPattern"].as_str() {
2424            archive.event_pattern = Some(pattern.to_string());
2425        }
2426        if let Some(days) = body["RetentionDays"].as_i64() {
2427            archive.retention_days = days;
2428        }
2429
2430        Ok(AwsResponse::ok_json(json!({
2431            "ArchiveArn": archive.arn,
2432            "CreationTime": archive.creation_time.timestamp() as f64,
2433            "State": archive.state,
2434        })))
2435    }
2436
2437    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2438        let body = req.json_body();
2439        validate_required("ArchiveName", &body["ArchiveName"])?;
2440        let name = body["ArchiveName"]
2441            .as_str()
2442            .ok_or_else(|| missing("ArchiveName"))?;
2443        validate_string_length("archiveName", name, 1, 48)?;
2444
2445        let mut state = self.state.write();
2446        if !state.archives.contains_key(name) {
2447            return Err(AwsServiceError::aws_error(
2448                StatusCode::BAD_REQUEST,
2449                "ResourceNotFoundException",
2450                format!("Archive {name} does not exist."),
2451            ));
2452        }
2453
2454        state.archives.remove(name);
2455
2456        // Remove the archive rule
2457        let rule_name = format!("Events-Archive-{name}");
2458        state.rules.retain(|k, _| k.1 != rule_name);
2459
2460        Ok(AwsResponse::ok_json(json!({})))
2461    }
2462
2463    // ─── Connection Operations ──────────────────────────────────────────
2464
2465    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466        let body = req.json_body();
2467        validate_required("Name", &body["Name"])?;
2468        let name = body["Name"]
2469            .as_str()
2470            .ok_or_else(|| missing("Name"))?
2471            .to_string();
2472        validate_string_length("name", &name, 1, 64)?;
2473        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2474        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2475        let description = body["Description"].as_str().map(|s| s.to_string());
2476        let auth_type = body["AuthorizationType"]
2477            .as_str()
2478            .ok_or_else(|| missing("AuthorizationType"))?
2479            .to_string();
2480        validate_enum(
2481            "authorizationType",
2482            &auth_type,
2483            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2484        )?;
2485        validate_optional_string_length(
2486            "kmsKeyIdentifier",
2487            body["KmsKeyIdentifier"].as_str(),
2488            0,
2489            2048,
2490        )?;
2491        validate_required("AuthParameters", &body["AuthParameters"])?;
2492        let auth_params = body["AuthParameters"].clone();
2493
2494        let mut state = self.state.write();
2495        let now = Utc::now();
2496        let conn_uuid = uuid::Uuid::new_v4();
2497        let arn = format!(
2498            "arn:aws:events:{}:{}:connection/{}/{}",
2499            req.region, state.account_id, name, conn_uuid
2500        );
2501        let secret_arn = format!(
2502            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2503            req.region, state.account_id, name, conn_uuid
2504        );
2505
2506        let conn = Connection {
2507            name: name.clone(),
2508            arn: arn.clone(),
2509            description,
2510            authorization_type: auth_type.clone(),
2511            auth_parameters: auth_params,
2512            connection_state: "AUTHORIZED".to_string(),
2513            secret_arn: secret_arn.clone(),
2514            creation_time: now,
2515            last_modified_time: now,
2516            last_authorized_time: now,
2517        };
2518        state.connections.insert(name, conn);
2519
2520        Ok(AwsResponse::ok_json(json!({
2521            "ConnectionArn": arn,
2522            "ConnectionState": "AUTHORIZED",
2523            "CreationTime": now.timestamp() as f64,
2524            "LastModifiedTime": now.timestamp() as f64,
2525        })))
2526    }
2527
2528    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2529        let body = req.json_body();
2530        validate_required("Name", &body["Name"])?;
2531        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2532        validate_string_length("name", name, 1, 64)?;
2533
2534        let state = self.state.read();
2535        let conn = state.connections.get(name).ok_or_else(|| {
2536            AwsServiceError::aws_error(
2537                StatusCode::BAD_REQUEST,
2538                "ResourceNotFoundException",
2539                format!("Connection '{name}' does not exist."),
2540            )
2541        })?;
2542
2543        // Build auth parameters response - strip secrets
2544        let auth_params_response =
2545            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2546
2547        let mut resp = json!({
2548            "ConnectionArn": conn.arn,
2549            "Name": conn.name,
2550            "AuthorizationType": conn.authorization_type,
2551            "AuthParameters": auth_params_response,
2552            "ConnectionState": conn.connection_state,
2553            "SecretArn": conn.secret_arn,
2554            "CreationTime": conn.creation_time.timestamp() as f64,
2555            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2556            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2557        });
2558        if let Some(ref desc) = conn.description {
2559            resp["Description"] = json!(desc);
2560        }
2561
2562        Ok(AwsResponse::ok_json(resp))
2563    }
2564
2565    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2566        let body = req.json_body();
2567        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2568        validate_optional_enum(
2569            "connectionState",
2570            body["ConnectionState"].as_str(),
2571            &[
2572                "CREATING",
2573                "UPDATING",
2574                "DELETING",
2575                "AUTHORIZED",
2576                "DEAUTHORIZED",
2577                "AUTHORIZING",
2578                "DEAUTHORIZING",
2579                "ACTIVE",
2580                "FAILED_CONNECTIVITY",
2581            ],
2582        )?;
2583        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2584        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2585
2586        let name_prefix = body["NamePrefix"].as_str();
2587        let connection_state = body["ConnectionState"].as_str();
2588        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2589
2590        let state = self.state.read();
2591        let all: Vec<Value> = state
2592            .connections
2593            .values()
2594            .filter(|c| {
2595                if let Some(prefix) = name_prefix {
2596                    if !c.name.starts_with(prefix) {
2597                        return false;
2598                    }
2599                }
2600                if let Some(cs) = connection_state {
2601                    if c.connection_state != cs {
2602                        return false;
2603                    }
2604                }
2605                true
2606            })
2607            .map(|c| {
2608                json!({
2609                    "ConnectionArn": c.arn,
2610                    "Name": c.name,
2611                    "AuthorizationType": c.authorization_type,
2612                    "ConnectionState": c.connection_state,
2613                    "CreationTime": c.creation_time.timestamp() as f64,
2614                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2615                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2616                })
2617            })
2618            .collect();
2619
2620        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2621        let mut resp = json!({ "Connections": conns });
2622        if let Some(token) = next_token {
2623            resp["NextToken"] = json!(token);
2624        }
2625
2626        Ok(AwsResponse::ok_json(resp))
2627    }
2628
2629    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2630        let body = req.json_body();
2631        validate_required("Name", &body["Name"])?;
2632        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2633        validate_string_length("name", name, 1, 64)?;
2634        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2635        validate_optional_enum(
2636            "authorizationType",
2637            body["AuthorizationType"].as_str(),
2638            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2639        )?;
2640
2641        let mut state = self.state.write();
2642        let conn = state.connections.get_mut(name).ok_or_else(|| {
2643            AwsServiceError::aws_error(
2644                StatusCode::BAD_REQUEST,
2645                "ResourceNotFoundException",
2646                format!("Connection '{name}' does not exist."),
2647            )
2648        })?;
2649
2650        if let Some(desc) = body["Description"].as_str() {
2651            conn.description = Some(desc.to_string());
2652        }
2653        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2654            conn.authorization_type = auth_type.to_string();
2655        }
2656        if body.get("AuthParameters").is_some() {
2657            conn.auth_parameters = body["AuthParameters"].clone();
2658        }
2659        conn.last_modified_time = Utc::now();
2660
2661        Ok(AwsResponse::ok_json(json!({
2662            "ConnectionArn": conn.arn,
2663            "ConnectionState": conn.connection_state,
2664            "CreationTime": conn.creation_time.timestamp() as f64,
2665            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2666            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2667        })))
2668    }
2669
2670    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2671        let body = req.json_body();
2672        validate_required("Name", &body["Name"])?;
2673        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2674        validate_string_length("name", name, 1, 64)?;
2675
2676        let mut state = self.state.write();
2677        let conn = state.connections.remove(name).ok_or_else(|| {
2678            AwsServiceError::aws_error(
2679                StatusCode::BAD_REQUEST,
2680                "ResourceNotFoundException",
2681                format!("Connection '{name}' does not exist."),
2682            )
2683        })?;
2684
2685        Ok(AwsResponse::ok_json(json!({
2686            "ConnectionArn": conn.arn,
2687            "ConnectionState": conn.connection_state,
2688            "CreationTime": conn.creation_time.timestamp() as f64,
2689            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2690            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2691        })))
2692    }
2693
2694    // ─── API Destination Operations ─────────────────────────────────────
2695
2696    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2697        let body = req.json_body();
2698        validate_required("Name", &body["Name"])?;
2699        let name = body["Name"]
2700            .as_str()
2701            .ok_or_else(|| missing("Name"))?
2702            .to_string();
2703        validate_string_length("name", &name, 1, 64)?;
2704        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2705        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2706        let description = body["Description"].as_str().map(|s| s.to_string());
2707        let connection_arn = body["ConnectionArn"]
2708            .as_str()
2709            .ok_or_else(|| missing("ConnectionArn"))?
2710            .to_string();
2711        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2712        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2713        let endpoint = body["InvocationEndpoint"]
2714            .as_str()
2715            .ok_or_else(|| missing("InvocationEndpoint"))?
2716            .to_string();
2717        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2718        validate_required("HttpMethod", &body["HttpMethod"])?;
2719        let http_method = body["HttpMethod"]
2720            .as_str()
2721            .ok_or_else(|| missing("HttpMethod"))?
2722            .to_string();
2723        validate_enum(
2724            "httpMethod",
2725            &http_method,
2726            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2727        )?;
2728        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2729        if let Some(r) = rate_limit {
2730            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2731        }
2732
2733        let mut state = self.state.write();
2734        let now = Utc::now();
2735        let dest_uuid = uuid::Uuid::new_v4();
2736        let arn = format!(
2737            "arn:aws:events:{}:{}:api-destination/{}/{}",
2738            req.region, state.account_id, name, dest_uuid
2739        );
2740
2741        let dest = ApiDestination {
2742            name: name.clone(),
2743            arn: arn.clone(),
2744            description,
2745            connection_arn,
2746            invocation_endpoint: endpoint,
2747            http_method,
2748            invocation_rate_limit_per_second: rate_limit,
2749            state: "ACTIVE".to_string(),
2750            creation_time: now,
2751            last_modified_time: now,
2752        };
2753        state.api_destinations.insert(name, dest);
2754
2755        Ok(AwsResponse::ok_json(json!({
2756            "ApiDestinationArn": arn,
2757            "ApiDestinationState": "ACTIVE",
2758            "CreationTime": now.timestamp() as f64,
2759            "LastModifiedTime": now.timestamp() as f64,
2760        })))
2761    }
2762
2763    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764        let body = req.json_body();
2765        validate_required("Name", &body["Name"])?;
2766        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2767        validate_string_length("name", name, 1, 64)?;
2768
2769        let state = self.state.read();
2770        let dest = state.api_destinations.get(name).ok_or_else(|| {
2771            AwsServiceError::aws_error(
2772                StatusCode::BAD_REQUEST,
2773                "ResourceNotFoundException",
2774                format!("An api-destination '{name}' does not exist."),
2775            )
2776        })?;
2777
2778        let mut resp = json!({
2779            "ApiDestinationArn": dest.arn,
2780            "Name": dest.name,
2781            "ConnectionArn": dest.connection_arn,
2782            "InvocationEndpoint": dest.invocation_endpoint,
2783            "HttpMethod": dest.http_method,
2784            "ApiDestinationState": dest.state,
2785            "CreationTime": dest.creation_time.timestamp() as f64,
2786            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2787        });
2788        if let Some(ref desc) = dest.description {
2789            resp["Description"] = json!(desc);
2790        }
2791        if let Some(rate) = dest.invocation_rate_limit_per_second {
2792            resp["InvocationRateLimitPerSecond"] = json!(rate);
2793        }
2794
2795        Ok(AwsResponse::ok_json(resp))
2796    }
2797
2798    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2799        let body = req.json_body();
2800        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2801        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2802        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2803        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2804
2805        let name_prefix = body["NamePrefix"].as_str();
2806        let connection_arn = body["ConnectionArn"].as_str();
2807        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2808
2809        let state = self.state.read();
2810        let all: Vec<Value> = state
2811            .api_destinations
2812            .values()
2813            .filter(|d| {
2814                if let Some(prefix) = name_prefix {
2815                    if !d.name.starts_with(prefix) {
2816                        return false;
2817                    }
2818                }
2819                if let Some(arn) = connection_arn {
2820                    if d.connection_arn != arn {
2821                        return false;
2822                    }
2823                }
2824                true
2825            })
2826            .map(|d| {
2827                let mut obj = json!({
2828                    "ApiDestinationArn": d.arn,
2829                    "Name": d.name,
2830                    "ConnectionArn": d.connection_arn,
2831                    "InvocationEndpoint": d.invocation_endpoint,
2832                    "HttpMethod": d.http_method,
2833                    "ApiDestinationState": d.state,
2834                    "CreationTime": d.creation_time.timestamp() as f64,
2835                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2836                });
2837                if let Some(rate) = d.invocation_rate_limit_per_second {
2838                    obj["InvocationRateLimitPerSecond"] = json!(rate);
2839                }
2840                obj
2841            })
2842            .collect();
2843
2844        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2845        let mut resp = json!({ "ApiDestinations": dests });
2846        if let Some(token) = next_token {
2847            resp["NextToken"] = json!(token);
2848        }
2849
2850        Ok(AwsResponse::ok_json(resp))
2851    }
2852
2853    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2854        let body = req.json_body();
2855        validate_required("Name", &body["Name"])?;
2856        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2857        validate_string_length("name", name, 1, 64)?;
2858        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2859        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2860        validate_optional_string_length(
2861            "invocationEndpoint",
2862            body["InvocationEndpoint"].as_str(),
2863            1,
2864            2048,
2865        )?;
2866        validate_optional_enum(
2867            "httpMethod",
2868            body["HttpMethod"].as_str(),
2869            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2870        )?;
2871        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2872            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2873        }
2874
2875        let mut state = self.state.write();
2876        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2877            AwsServiceError::aws_error(
2878                StatusCode::BAD_REQUEST,
2879                "ResourceNotFoundException",
2880                format!("An api-destination '{name}' does not exist."),
2881            )
2882        })?;
2883
2884        if let Some(desc) = body["Description"].as_str() {
2885            dest.description = Some(desc.to_string());
2886        }
2887        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2888            dest.invocation_endpoint = endpoint.to_string();
2889        }
2890        if let Some(method) = body["HttpMethod"].as_str() {
2891            dest.http_method = method.to_string();
2892        }
2893        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2894            dest.invocation_rate_limit_per_second = Some(rate);
2895        }
2896        if let Some(conn) = body["ConnectionArn"].as_str() {
2897            dest.connection_arn = conn.to_string();
2898        }
2899        dest.last_modified_time = Utc::now();
2900
2901        Ok(AwsResponse::ok_json(json!({
2902            "ApiDestinationArn": dest.arn,
2903            "ApiDestinationState": dest.state,
2904            "CreationTime": dest.creation_time.timestamp() as f64,
2905            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2906        })))
2907    }
2908
2909    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910        let body = req.json_body();
2911        validate_required("Name", &body["Name"])?;
2912        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2913        validate_string_length("name", name, 1, 64)?;
2914
2915        let mut state = self.state.write();
2916        if !state.api_destinations.contains_key(name) {
2917            return Err(AwsServiceError::aws_error(
2918                StatusCode::BAD_REQUEST,
2919                "ResourceNotFoundException",
2920                format!("An api-destination '{name}' does not exist."),
2921            ));
2922        }
2923        state.api_destinations.remove(name);
2924
2925        Ok(AwsResponse::ok_json(json!({})))
2926    }
2927
2928    // ─── Replay Operations ──────────────────────────────────────────────
2929
2930    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2931        let input = StartReplayInput::from_body(&req.json_body())?;
2932
2933        let mut state = self.state.write();
2934
2935        // Validate event bus + archive, in the order the real service validates them.
2936        let bus_name = state.resolve_bus_name(&input.destination_arn);
2937        if !state.buses.contains_key(&bus_name) {
2938            return Err(AwsServiceError::aws_error(
2939                StatusCode::BAD_REQUEST,
2940                "ResourceNotFoundException",
2941                format!("Event bus {bus_name} does not exist."),
2942            ));
2943        }
2944
2945        let archive_name = input
2946            .event_source_arn
2947            .rsplit_once("archive/")
2948            .map(|(_, n)| n.to_string())
2949            .unwrap_or_default();
2950        let archive = state.archives.get(&archive_name).ok_or_else(|| {
2951            AwsServiceError::aws_error(
2952                StatusCode::BAD_REQUEST,
2953                "ValidationException",
2954                format!(
2955                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2956                ),
2957            )
2958        })?;
2959        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2960        if archive_bus != bus_name {
2961            return Err(AwsServiceError::aws_error(
2962                StatusCode::BAD_REQUEST,
2963                "ValidationException",
2964                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2965            ));
2966        }
2967
2968        if input.event_end_time <= input.event_start_time {
2969            return Err(AwsServiceError::aws_error(
2970                StatusCode::BAD_REQUEST,
2971                "ValidationException",
2972                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2973            ));
2974        }
2975
2976        if state.replays.contains_key(&input.name) {
2977            return Err(AwsServiceError::aws_error(
2978                StatusCode::BAD_REQUEST,
2979                "ResourceAlreadyExistsException",
2980                format!("Replay {} already exists.", input.name),
2981            ));
2982        }
2983
2984        let now = Utc::now();
2985        let arn = format!(
2986            "arn:aws:events:{}:{}:replay/{}",
2987            req.region, state.account_id, input.name
2988        );
2989
2990        let events_to_deliver = collect_replay_events_with_targets(
2991            &state,
2992            &archive_name,
2993            &bus_name,
2994            input.event_start_time,
2995            input.event_end_time,
2996            &req.account_id,
2997            &req.region,
2998        );
2999
3000        let replay = Replay {
3001            name: input.name.clone(),
3002            arn: arn.clone(),
3003            description: input.description,
3004            event_source_arn: input.event_source_arn,
3005            destination: input.destination,
3006            event_start_time: input.event_start_time,
3007            event_end_time: input.event_end_time,
3008            state: "COMPLETED".to_string(),
3009            replay_start_time: now,
3010            replay_end_time: Some(now),
3011        };
3012        state.replays.insert(input.name, replay);
3013
3014        drop(state);
3015
3016        for (event, targets) in events_to_deliver {
3017            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3018            let event_json = json!({
3019                "version": "0",
3020                "id": event.event_id,
3021                "source": event.source,
3022                "account": req.account_id,
3023                "detail-type": event.detail_type,
3024                "detail": detail_value,
3025                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3026                "region": req.region,
3027                "resources": event.resources,
3028                "replay-name": arn,
3029            });
3030            let event_str = event_json.to_string();
3031
3032            for target in targets {
3033                self.deliver_replay_event_to_target(&target, &event, &event_json, &event_str);
3034            }
3035        }
3036
3037        Ok(AwsResponse::ok_json(json!({
3038            "ReplayArn": arn,
3039            "ReplayStartTime": now.timestamp() as f64,
3040            "State": "STARTING",
3041        })))
3042    }
3043
3044    fn deliver_replay_event_to_target(
3045        &self,
3046        target: &EventTarget,
3047        event: &PutEvent,
3048        event_json: &Value,
3049        event_str: &str,
3050    ) {
3051        let target_arn = &target.arn;
3052        let body_str = if let Some(ref transformer) = target.input_transformer {
3053            apply_input_transformer(transformer, event_json)
3054        } else if let Some(ref input) = target.input {
3055            input.clone()
3056        } else if let Some(ref input_path) = target.input_path {
3057            resolve_json_path(event_json, input_path)
3058                .map(|v| v.to_string())
3059                .unwrap_or_else(|| event_str.to_string())
3060        } else {
3061            event_str.to_string()
3062        };
3063
3064        if target_arn.contains(":sqs:") {
3065            let group_id = target
3066                .sqs_parameters
3067                .as_ref()
3068                .and_then(|p| p["MessageGroupId"].as_str())
3069                .map(|s| s.to_string());
3070            if group_id.is_some() {
3071                self.delivery.send_to_sqs_with_attrs(
3072                    target_arn,
3073                    &body_str,
3074                    &HashMap::new(),
3075                    group_id.as_deref(),
3076                    None,
3077                );
3078            } else {
3079                self.delivery
3080                    .send_to_sqs(target_arn, &body_str, &HashMap::new());
3081            }
3082        } else if target_arn.contains(":sns:") {
3083            self.delivery
3084                .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3085        } else if target_arn.contains(":lambda:") {
3086            let mut state = self.state.write();
3087            state
3088                .lambda_invocations
3089                .push(crate::state::LambdaInvocation {
3090                    function_arn: target_arn.clone(),
3091                    payload: body_str.clone(),
3092                    timestamp: Utc::now(),
3093                });
3094            drop(state);
3095            if let Some(ref ls) = self.lambda_state {
3096                ls.write().invocations.push(LambdaInvocation {
3097                    function_arn: target_arn.clone(),
3098                    payload: body_str.clone(),
3099                    timestamp: Utc::now(),
3100                    source: "aws:events".to_string(),
3101                });
3102            }
3103            invoke_lambda_async(
3104                &self.container_runtime,
3105                &self.lambda_state,
3106                target_arn,
3107                &body_str,
3108            );
3109        } else if target_arn.contains(":logs:") {
3110            let mut state = self.state.write();
3111            state.log_deliveries.push(crate::state::LogDelivery {
3112                log_group_arn: target_arn.clone(),
3113                payload: body_str.clone(),
3114                timestamp: Utc::now(),
3115            });
3116            drop(state);
3117            if let Some(ref log_state) = self.logs_state {
3118                deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3119            }
3120        } else if target_arn.contains(":states:") {
3121            self.delivery
3122                .start_stepfunctions_execution(target_arn, &body_str);
3123            let mut state = self.state.write();
3124            state
3125                .step_function_executions
3126                .push(crate::state::StepFunctionExecution {
3127                    state_machine_arn: target_arn.clone(),
3128                    payload: body_str.clone(),
3129                    timestamp: Utc::now(),
3130                });
3131        } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3132            let url = target_arn.clone();
3133            let payload = body_str.clone();
3134            tokio::spawn(async move {
3135                let client = reqwest::Client::new();
3136                let _ = client
3137                    .post(&url)
3138                    .header("Content-Type", "application/json")
3139                    .body(payload)
3140                    .send()
3141                    .await;
3142            });
3143        }
3144    }
3145
3146    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3147        let body = req.json_body();
3148        validate_required("ReplayName", &body["ReplayName"])?;
3149        let name = body["ReplayName"]
3150            .as_str()
3151            .ok_or_else(|| missing("ReplayName"))?;
3152        validate_string_length("replayName", name, 1, 64)?;
3153
3154        let state = self.state.read();
3155        let replay = state.replays.get(name).ok_or_else(|| {
3156            AwsServiceError::aws_error(
3157                StatusCode::BAD_REQUEST,
3158                "ResourceNotFoundException",
3159                format!("Replay {name} does not exist."),
3160            )
3161        })?;
3162
3163        let mut resp = json!({
3164            "Destination": replay.destination,
3165            "EventSourceArn": replay.event_source_arn,
3166            "EventStartTime": replay.event_start_time.timestamp() as f64,
3167            "EventEndTime": replay.event_end_time.timestamp() as f64,
3168            "ReplayArn": replay.arn,
3169            "ReplayName": replay.name,
3170            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3171            "State": replay.state,
3172        });
3173        if let Some(ref desc) = replay.description {
3174            resp["Description"] = json!(desc);
3175        }
3176        if let Some(ref end) = replay.replay_end_time {
3177            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3178        }
3179
3180        Ok(AwsResponse::ok_json(resp))
3181    }
3182
3183    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3184        let body = req.json_body();
3185        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3186        validate_optional_string_length(
3187            "eventSourceArn",
3188            body["EventSourceArn"].as_str(),
3189            1,
3190            1600,
3191        )?;
3192        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3193        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3194        let name_prefix = body["NamePrefix"].as_str();
3195        let source_arn = body["EventSourceArn"].as_str();
3196        let replay_state = body["State"].as_str();
3197
3198        // Validate at most one filter
3199        let filter_count = [
3200            name_prefix.is_some(),
3201            source_arn.is_some(),
3202            replay_state.is_some(),
3203        ]
3204        .iter()
3205        .filter(|&&x| x)
3206        .count();
3207        if filter_count > 1 {
3208            return Err(AwsServiceError::aws_error(
3209                StatusCode::BAD_REQUEST,
3210                "ValidationException",
3211                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3212            ));
3213        }
3214
3215        // Validate state
3216        if let Some(s) = replay_state {
3217            let valid = [
3218                "CANCELLED",
3219                "CANCELLING",
3220                "COMPLETED",
3221                "FAILED",
3222                "RUNNING",
3223                "STARTING",
3224            ];
3225            if !valid.contains(&s) {
3226                return Err(AwsServiceError::aws_error(
3227                    StatusCode::BAD_REQUEST,
3228                    "ValidationException",
3229                    format!(
3230                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3231                        s
3232                    ),
3233                ));
3234            }
3235        }
3236
3237        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3238
3239        let state = self.state.read();
3240        let all: Vec<Value> = state
3241            .replays
3242            .values()
3243            .filter(|r| {
3244                if let Some(prefix) = name_prefix {
3245                    r.name.starts_with(prefix)
3246                } else if let Some(arn) = source_arn {
3247                    r.event_source_arn == arn
3248                } else if let Some(s) = replay_state {
3249                    r.state == s
3250                } else {
3251                    true
3252                }
3253            })
3254            .map(|r| {
3255                let mut obj = json!({
3256                    "EventSourceArn": r.event_source_arn,
3257                    "EventStartTime": r.event_start_time.timestamp() as f64,
3258                    "EventEndTime": r.event_end_time.timestamp() as f64,
3259                    "ReplayName": r.name,
3260                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3261                    "State": r.state,
3262                });
3263                if let Some(ref end) = r.replay_end_time {
3264                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3265                }
3266                obj
3267            })
3268            .collect();
3269
3270        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3271        let mut resp = json!({ "Replays": replays });
3272        if let Some(token) = next_token {
3273            resp["NextToken"] = json!(token);
3274        }
3275
3276        Ok(AwsResponse::ok_json(resp))
3277    }
3278
3279    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3280        let body = req.json_body();
3281        validate_required("ReplayName", &body["ReplayName"])?;
3282        let name = body["ReplayName"]
3283            .as_str()
3284            .ok_or_else(|| missing("ReplayName"))?;
3285        validate_string_length("replayName", name, 1, 64)?;
3286
3287        let mut state = self.state.write();
3288        let replay = state.replays.get_mut(name).ok_or_else(|| {
3289            AwsServiceError::aws_error(
3290                StatusCode::BAD_REQUEST,
3291                "ResourceNotFoundException",
3292                format!("Replay {name} does not exist."),
3293            )
3294        })?;
3295
3296        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3297        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3298            return Err(AwsServiceError::aws_error(
3299                StatusCode::BAD_REQUEST,
3300                "IllegalStatusException",
3301                format!("Replay {name} is not in a valid state for this operation."),
3302            ));
3303        }
3304
3305        let arn = replay.arn.clone();
3306        replay.state = "CANCELLED".to_string();
3307
3308        Ok(AwsResponse::ok_json(json!({
3309            "ReplayArn": arn,
3310            "State": "CANCELLING",
3311        })))
3312    }
3313}
3314
3315// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3316
3317fn find_tags_mut<'a>(
3318    state: &'a mut crate::state::EventBridgeState,
3319    arn: &str,
3320) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3321    // Check buses
3322    for bus in state.buses.values_mut() {
3323        if bus.arn == arn {
3324            return Ok(&mut bus.tags);
3325        }
3326    }
3327    // Check rules
3328    for rule in state.rules.values_mut() {
3329        if rule.arn == arn {
3330            return Ok(&mut rule.tags);
3331        }
3332    }
3333
3334    // Parse ARN to give better error messages
3335    let error_msg = if arn.contains(":rule/") {
3336        // Extract rule name and bus from ARN
3337        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3338        if let Some(rule_path) = parts.first() {
3339            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3340                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3341            } else {
3342                format!("Rule {} does not exist on EventBus default.", rule_path)
3343            }
3344        } else {
3345            format!("Resource {arn} not found.")
3346        }
3347    } else {
3348        format!("Resource {arn} not found.")
3349    };
3350
3351    Err(AwsServiceError::aws_error(
3352        StatusCode::BAD_REQUEST,
3353        "ResourceNotFoundException",
3354        error_msg,
3355    ))
3356}
3357
3358fn find_tags<'a>(
3359    state: &'a crate::state::EventBridgeState,
3360    arn: &str,
3361) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3362    for bus in state.buses.values() {
3363        if bus.arn == arn {
3364            return Ok(&bus.tags);
3365        }
3366    }
3367    for rule in state.rules.values() {
3368        if rule.arn == arn {
3369            return Ok(&rule.tags);
3370        }
3371    }
3372
3373    let error_msg = if arn.contains(":rule/") {
3374        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3375        if let Some(rule_path) = parts.first() {
3376            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3377                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3378            } else {
3379                format!("Rule {} does not exist on EventBus default.", rule_path)
3380            }
3381        } else {
3382            format!("Resource {arn} not found.")
3383        }
3384    } else {
3385        format!("Resource {arn} not found.")
3386    };
3387
3388    Err(AwsServiceError::aws_error(
3389        StatusCode::BAD_REQUEST,
3390        "ResourceNotFoundException",
3391        error_msg,
3392    ))
3393}
3394
3395// ─── Event Pattern Validation ────────────────────────────────────────
3396
3397fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3398    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3399        AwsServiceError::aws_error(
3400            StatusCode::BAD_REQUEST,
3401            "InvalidEventPatternException",
3402            "Event pattern is not valid. Reason: Invalid JSON",
3403        )
3404    })?;
3405
3406    validate_pattern_values(&parsed, "")?;
3407    Ok(())
3408}
3409
3410fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3411    match value {
3412        Value::Object(obj) => {
3413            for (key, val) in obj {
3414                let new_path = if path.is_empty() {
3415                    key.clone()
3416                } else {
3417                    format!("{path}.{key}")
3418                };
3419                match val {
3420                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3421                    Value::Array(_) => {} // Arrays are fine at leaf level
3422                    _ => {
3423                        return Err(AwsServiceError::aws_error(
3424                            StatusCode::BAD_REQUEST,
3425                            "InvalidEventPatternException",
3426                            format!(
3427                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3428                                key
3429                            ),
3430                        ));
3431                    }
3432                }
3433            }
3434            Ok(())
3435        }
3436        _ => Ok(()),
3437    }
3438}
3439
3440// ─── Connection Auth Params Response Builder ────────────────────────
3441
3442fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3443    match auth_type {
3444        "API_KEY" => {
3445            let mut resp = json!({});
3446            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3447                resp["ApiKeyAuthParameters"] = json!({
3448                    "ApiKeyName": api_key["ApiKeyName"],
3449                });
3450            }
3451            resp
3452        }
3453        "BASIC" => {
3454            let mut resp = json!({});
3455            if let Some(basic) = params.get("BasicAuthParameters") {
3456                resp["BasicAuthParameters"] = json!({
3457                    "Username": basic["Username"],
3458                });
3459            }
3460            resp
3461        }
3462        "OAUTH_CLIENT_CREDENTIALS" => {
3463            let mut resp = json!({});
3464            if let Some(oauth) = params.get("OAuthParameters") {
3465                resp["OAuthParameters"] = json!({
3466                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3467                    "HttpMethod": oauth["HttpMethod"],
3468                    "ClientParameters": {
3469                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3470                    },
3471                });
3472            }
3473            resp
3474        }
3475        _ => params.clone(),
3476    }
3477}
3478
3479// ─── Event Pattern Matching ─────────────────────────────────────────
3480
3481/// Match an event against an EventBridge event pattern.
3482pub fn matches_pattern(
3483    pattern_json: Option<&str>,
3484    source: &str,
3485    detail_type: &str,
3486    detail: &str,
3487    account: &str,
3488    region: &str,
3489    resources: &[String],
3490) -> bool {
3491    let pattern_json = match pattern_json {
3492        Some(p) => p,
3493        None => return true,
3494    };
3495
3496    let pattern: Value = match serde_json::from_str(pattern_json) {
3497        Ok(v) => v,
3498        Err(_) => return false,
3499    };
3500
3501    let pattern_obj = match pattern.as_object() {
3502        Some(o) => o,
3503        None => return false,
3504    };
3505
3506    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3507    let event = json!({
3508        "source": source,
3509        "detail-type": detail_type,
3510        "detail": detail_value,
3511        "account": account,
3512        "region": region,
3513        "resources": resources,
3514    });
3515
3516    for (key, pattern_value) in pattern_obj {
3517        let event_value = &event[key];
3518        if !matches_value(pattern_value, event_value) {
3519            return false;
3520        }
3521    }
3522
3523    true
3524}
3525
3526fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3527    match pattern {
3528        Value::Object(obj) => {
3529            for (key, sub_pattern) in obj {
3530                let sub_value = &event_value[key];
3531                if !matches_value(sub_pattern, sub_value) {
3532                    return false;
3533                }
3534            }
3535            true
3536        }
3537        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3538        _ => false,
3539    }
3540}
3541
3542fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3543    match pattern_elem {
3544        Value::Object(obj) => {
3545            if let Some(prefix_val) = obj.get("prefix") {
3546                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3547                    return actual.starts_with(prefix);
3548                }
3549                return false;
3550            }
3551            if let Some(exists_val) = obj.get("exists") {
3552                let should_exist = exists_val.as_bool().unwrap_or(true);
3553                let does_exist = !event_value.is_null();
3554                return should_exist == does_exist;
3555            }
3556            if let Some(anything_but_val) = obj.get("anything-but") {
3557                return match anything_but_val {
3558                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3559                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3560                    Value::Number(_) => event_value != anything_but_val,
3561                    _ => true,
3562                };
3563            }
3564            if let Some(numeric_val) = obj.get("numeric") {
3565                return matches_numeric(numeric_val, event_value);
3566            }
3567            false
3568        }
3569        _ => values_equal(pattern_elem, event_value),
3570    }
3571}
3572
3573/// For each archive on `event_bus_name` whose event pattern matches the
3574/// event, append a clone of it to the archive's stored events and bump
3575/// the archive's counters.
3576#[allow(clippy::too_many_arguments)]
3577fn archive_matching_event(
3578    state: &mut crate::state::EventBridgeState,
3579    event: &PutEvent,
3580    event_bus_name: &str,
3581    source: &str,
3582    detail_type: &str,
3583    detail: &str,
3584    account_id: &str,
3585    region: &str,
3586    resources: &[String],
3587) {
3588    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3589    for akey in archive_keys {
3590        let (archive_bus, archive_pattern, archive_enabled) = {
3591            let a = &state.archives[&akey];
3592            (
3593                state.resolve_bus_name(&a.event_source_arn),
3594                a.event_pattern.clone(),
3595                a.state == "ENABLED",
3596            )
3597        };
3598        if archive_bus != event_bus_name || !archive_enabled {
3599            continue;
3600        }
3601        let pattern_matches = matches_pattern(
3602            archive_pattern.as_deref(),
3603            source,
3604            detail_type,
3605            detail,
3606            account_id,
3607            region,
3608            resources,
3609        );
3610        if !pattern_matches {
3611            continue;
3612        }
3613        if let Some(archive) = state.archives.get_mut(&akey) {
3614            archive.event_count += 1;
3615            archive.size_bytes += detail.len() as i64;
3616            archive.events.push(event.clone());
3617        }
3618    }
3619}
3620
3621/// Parsed + validated inputs for `StartReplay`.
3622struct StartReplayInput {
3623    name: String,
3624    description: Option<String>,
3625    event_source_arn: String,
3626    destination: Value,
3627    destination_arn: String,
3628    event_start_time: DateTime<Utc>,
3629    event_end_time: DateTime<Utc>,
3630}
3631
3632impl StartReplayInput {
3633    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3634        validate_required("ReplayName", &body["ReplayName"])?;
3635        let name = body["ReplayName"]
3636            .as_str()
3637            .ok_or_else(|| missing("ReplayName"))?
3638            .to_string();
3639        validate_string_length("replayName", &name, 1, 64)?;
3640        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3641        validate_required("EventSourceArn", &body["EventSourceArn"])?;
3642        let description = body["Description"].as_str().map(|s| s.to_string());
3643        let event_source_arn = body["EventSourceArn"]
3644            .as_str()
3645            .ok_or_else(|| missing("EventSourceArn"))?
3646            .to_string();
3647        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3648        validate_required("EventStartTime", &body["EventStartTime"])?;
3649        validate_required("EventEndTime", &body["EventEndTime"])?;
3650        validate_required("Destination", &body["Destination"])?;
3651        let destination = body["Destination"].clone();
3652
3653        let event_start_time = body["EventStartTime"]
3654            .as_f64()
3655            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3656            .unwrap_or_else(Utc::now);
3657        let event_end_time = body["EventEndTime"]
3658            .as_f64()
3659            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3660            .unwrap_or_else(Utc::now);
3661
3662        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3663        if !destination_arn.contains(":event-bus/") {
3664            return Err(AwsServiceError::aws_error(
3665                StatusCode::BAD_REQUEST,
3666                "ValidationException",
3667                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3668            ));
3669        }
3670
3671        Ok(Self {
3672            name,
3673            description,
3674            event_source_arn,
3675            destination,
3676            destination_arn,
3677            event_start_time,
3678            event_end_time,
3679        })
3680    }
3681}
3682
3683/// Walk the named archive, filter events into the replay window, then
3684/// fan out each event against rules on `bus_name` to collect its
3685/// matching targets. Returns only events that matched at least one
3686/// target.
3687#[allow(clippy::too_many_arguments)]
3688fn collect_replay_events_with_targets(
3689    state: &crate::state::EventBridgeState,
3690    archive_name: &str,
3691    bus_name: &str,
3692    event_start_time: DateTime<Utc>,
3693    event_end_time: DateTime<Utc>,
3694    account_id: &str,
3695    region: &str,
3696) -> Vec<(PutEvent, Vec<EventTarget>)> {
3697    let Some(archive) = state.archives.get(archive_name) else {
3698        return Vec::new();
3699    };
3700
3701    let replay_events: Vec<PutEvent> = archive
3702        .events
3703        .iter()
3704        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3705        .cloned()
3706        .collect();
3707
3708    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3709    for event in replay_events {
3710        let matching_targets: Vec<EventTarget> = state
3711            .rules
3712            .values()
3713            .filter(|r| {
3714                r.event_bus_name == bus_name
3715                    && r.state == "ENABLED"
3716                    && matches_pattern(
3717                        r.event_pattern.as_deref(),
3718                        &event.source,
3719                        &event.detail_type,
3720                        &event.detail,
3721                        account_id,
3722                        region,
3723                        &event.resources,
3724                    )
3725            })
3726            .flat_map(|r| r.targets.clone())
3727            .collect();
3728
3729        if !matching_targets.is_empty() {
3730            events_to_deliver.push((event, matching_targets));
3731        }
3732    }
3733    events_to_deliver
3734}
3735
3736fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3737    let arr = match numeric_arr.as_array() {
3738        Some(a) => a,
3739        None => return false,
3740    };
3741    let actual = match event_value.as_f64() {
3742        Some(n) => n,
3743        None => return false,
3744    };
3745    let mut i = 0;
3746    while i + 1 < arr.len() {
3747        let op = match arr[i].as_str() {
3748            Some(s) => s,
3749            None => return false,
3750        };
3751        let threshold = match arr[i + 1].as_f64() {
3752            Some(n) => n,
3753            None => return false,
3754        };
3755        let ok = match op {
3756            ">" => actual > threshold,
3757            ">=" => actual >= threshold,
3758            "<" => actual < threshold,
3759            "<=" => actual <= threshold,
3760            "=" => (actual - threshold).abs() < f64::EPSILON,
3761            _ => return false,
3762        };
3763        if !ok {
3764            return false;
3765        }
3766        i += 2;
3767    }
3768    true
3769}
3770
3771fn values_equal(a: &Value, b: &Value) -> bool {
3772    a == b
3773}
3774
3775/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3776fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3777    let path = path.strip_prefix('$').unwrap_or(path);
3778    let mut current = event;
3779    for segment in path.split('.') {
3780        if segment.is_empty() {
3781            continue;
3782        }
3783        current = current.get(segment)?;
3784    }
3785    Some(current.clone())
3786}
3787
3788/// Apply an EventBridge InputTransformer to an event.
3789fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3790    let input_paths_map = transformer
3791        .get("InputPathsMap")
3792        .and_then(|v| v.as_object())
3793        .cloned()
3794        .unwrap_or_default();
3795    let template = transformer
3796        .get("InputTemplate")
3797        .and_then(|v| v.as_str())
3798        .unwrap_or("")
3799        .to_string();
3800
3801    // Resolve all input paths
3802    let mut resolved: HashMap<String, Value> = HashMap::new();
3803    for (var_name, path_val) in &input_paths_map {
3804        if let Some(path_str) = path_val.as_str() {
3805            if let Some(val) = resolve_json_path(event, path_str) {
3806                resolved.insert(var_name.clone(), val);
3807            }
3808        }
3809    }
3810
3811    // Replace <varName> placeholders in template
3812    let mut result = template;
3813    for (var_name, val) in &resolved {
3814        let placeholder = format!("<{var_name}>");
3815        let replacement = match val {
3816            Value::String(s) => s.clone(),
3817            other => other.to_string(),
3818        };
3819        result = result.replace(&placeholder, &replacement);
3820    }
3821
3822    result
3823}
3824
3825fn missing(name: &str) -> AwsServiceError {
3826    AwsServiceError::aws_error(
3827        StatusCode::BAD_REQUEST,
3828        "ValidationException",
3829        format!("The request must contain the parameter {name}"),
3830    )
3831}
3832
/// Pull the function name out of a Lambda function ARN.
///
/// Works for unqualified (`arn:aws:lambda:region:account:function:NAME`)
/// and qualified (`...:function:NAME:alias`) ARNs alike. Any input whose
/// sixth colon-separated field is not `function`, or that has no seventh
/// field, is returned unchanged.
fn function_name_from_arn(arn: &str) -> &str {
    let mut fields = arn.split(':');
    // `nth(5)` consumes the first six fields, so `next()` yields the seventh.
    match (fields.nth(5), fields.next()) {
        (Some("function"), Some(name)) => name,
        _ => arn,
    }
}
3845
3846/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
3847/// This is fire-and-forget: EventBridge delivery is asynchronous.
3848pub fn invoke_lambda_async(
3849    container_runtime: &Option<Arc<ContainerRuntime>>,
3850    lambda_state: &Option<SharedLambdaState>,
3851    function_arn: &str,
3852    payload: &str,
3853) {
3854    let runtime = match container_runtime {
3855        Some(rt) => rt.clone(),
3856        None => return,
3857    };
3858    let lambda_state = match lambda_state {
3859        Some(ls) => ls.clone(),
3860        None => return,
3861    };
3862    let func_name = function_name_from_arn(function_arn).to_string();
3863    let payload = payload.as_bytes().to_vec();
3864
3865    tokio::spawn(async move {
3866        let func = {
3867            let state = lambda_state.read();
3868            state.functions.get(&func_name).cloned()
3869        };
3870        let func = match func {
3871            Some(f) => f,
3872            None => {
3873                tracing::warn!(
3874                    function = %func_name,
3875                    "EventBridge Lambda target not found, skipping invocation"
3876                );
3877                return;
3878            }
3879        };
3880        match runtime.invoke(&func, &payload).await {
3881            Ok(_) => {
3882                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3883            }
3884            Err(e) => {
3885                tracing::warn!(
3886                    function = %func_name,
3887                    error = %e,
3888                    "EventBridge Lambda invocation failed"
3889                );
3890            }
3891        }
3892    });
3893}
3894
3895/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
3896/// to the appropriate log group and stream.
3897pub fn deliver_to_logs(
3898    logs_state: &SharedLogsState,
3899    log_group_arn: &str,
3900    payload: &str,
3901    timestamp: chrono::DateTime<chrono::Utc>,
3902) {
3903    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
3904    // or just the name if it's not an ARN
3905    let group_name = if log_group_arn.contains(":log-group:") {
3906        log_group_arn
3907            .split(":log-group:")
3908            .nth(1)
3909            .unwrap_or(log_group_arn)
3910            .trim_end_matches(":*")
3911    } else {
3912        log_group_arn
3913    };
3914
3915    let stream_name = "events".to_string();
3916    let ts_millis = timestamp.timestamp_millis();
3917
3918    let mut state = logs_state.write();
3919    let region = state.region.clone();
3920    let account_id = state.account_id.clone();
3921
3922    // Auto-create log group and stream if they don't exist
3923    let group = state
3924        .log_groups
3925        .entry(group_name.to_string())
3926        .or_insert_with(|| fakecloud_logs::state::LogGroup {
3927            name: group_name.to_string(),
3928            arn: Arn::new(
3929                "logs",
3930                &region,
3931                &account_id,
3932                &format!("log-group:{group_name}"),
3933            )
3934            .to_string(),
3935            creation_time: ts_millis,
3936            retention_in_days: None,
3937            kms_key_id: None,
3938            tags: HashMap::new(),
3939            log_streams: HashMap::new(),
3940            stored_bytes: 0,
3941            subscription_filters: Vec::new(),
3942            data_protection_policy: None,
3943            index_policies: Vec::new(),
3944            transformer: None,
3945            deletion_protection: false,
3946        });
3947
3948    let stream = group
3949        .log_streams
3950        .entry(stream_name.clone())
3951        .or_insert_with(|| fakecloud_logs::state::LogStream {
3952            name: stream_name,
3953            arn: format!("{}:log-stream:events", group.arn),
3954            creation_time: ts_millis,
3955            first_event_timestamp: None,
3956            last_event_timestamp: None,
3957            last_ingestion_time: None,
3958            upload_sequence_token: "1".to_string(),
3959            events: Vec::new(),
3960        });
3961
3962    stream.events.push(fakecloud_logs::state::LogEvent {
3963        timestamp: ts_millis,
3964        message: payload.to_string(),
3965        ingestion_time: ts_millis,
3966    });
3967    stream.last_event_timestamp = Some(ts_millis);
3968    stream.last_ingestion_time = Some(ts_millis);
3969    if stream.first_event_timestamp.is_none() {
3970        stream.first_event_timestamp = Some(ts_millis);
3971    }
3972}
3973
3974/// Apply connection auth parameters to an outgoing HTTP request.
3975fn apply_connection_auth(
3976    mut builder: reqwest::RequestBuilder,
3977    conn: &Connection,
3978) -> reqwest::RequestBuilder {
3979    match conn.authorization_type.as_str() {
3980        "API_KEY" => {
3981            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3982                if let (Some(name), Some(value)) = (
3983                    params["ApiKeyName"].as_str(),
3984                    params["ApiKeyValue"].as_str(),
3985                ) {
3986                    builder = builder.header(name, value);
3987                }
3988            }
3989        }
3990        "BASIC" => {
3991            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3992                if let (Some(user), Some(pass)) =
3993                    (params["Username"].as_str(), params["Password"].as_str())
3994                {
3995                    builder = builder.basic_auth(user, Some(pass));
3996                }
3997            }
3998        }
3999        "OAUTH_CLIENT_CREDENTIALS" => {
4000            // For OAuth, in a real implementation we'd exchange credentials for a token.
4001            // Here we pass client credentials as basic auth as a reasonable approximation.
4002            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4003                if let (Some(client_id), Some(client_secret)) = (
4004                    params["ClientParameters"]["ClientID"].as_str(),
4005                    params["ClientParameters"]["ClientSecret"].as_str(),
4006                ) {
4007                    builder = builder.basic_auth(client_id, Some(client_secret));
4008                }
4009            }
4010        }
4011        _ => {}
4012    }
4013    builder
4014}
4015
4016#[cfg(test)]
4017mod tests {
4018    use super::*;
4019
4020    /// Test helper that calls matches_pattern with default account/region/resources
4021    fn test_matches(
4022        pattern_json: Option<&str>,
4023        source: &str,
4024        detail_type: &str,
4025        detail: &str,
4026    ) -> bool {
4027        matches_pattern(
4028            pattern_json,
4029            source,
4030            detail_type,
4031            detail,
4032            "123456789012",
4033            "us-east-1",
4034            &[],
4035        )
4036    }
4037
4038    #[test]
4039    fn pattern_matches_source() {
4040        assert!(test_matches(
4041            Some(r#"{"source": ["my.app"]}"#),
4042            "my.app",
4043            "OrderPlaced",
4044            "{}"
4045        ));
4046        assert!(!test_matches(
4047            Some(r#"{"source": ["other.app"]}"#),
4048            "my.app",
4049            "OrderPlaced",
4050            "{}"
4051        ));
4052    }
4053
4054    #[test]
4055    fn pattern_matches_detail_type() {
4056        assert!(test_matches(
4057            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4058            "my.app",
4059            "OrderPlaced",
4060            "{}"
4061        ));
4062        assert!(!test_matches(
4063            Some(r#"{"detail-type": ["OrderShipped"]}"#),
4064            "my.app",
4065            "OrderPlaced",
4066            "{}"
4067        ));
4068    }
4069
4070    #[test]
4071    fn pattern_matches_detail_field() {
4072        assert!(test_matches(
4073            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4074            "my.app",
4075            "StatusChange",
4076            r#"{"status": "ACTIVE"}"#
4077        ));
4078        assert!(!test_matches(
4079            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4080            "my.app",
4081            "StatusChange",
4082            r#"{"status": "INACTIVE"}"#
4083        ));
4084    }
4085
4086    #[test]
4087    fn no_pattern_matches_everything() {
4088        assert!(test_matches(None, "any", "any", "{}"));
4089    }
4090
4091    #[test]
4092    fn combined_pattern() {
4093        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4094        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4095        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4096        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4097    }
4098
4099    #[test]
4100    fn nested_detail_pattern() {
4101        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4102        assert!(test_matches(
4103            Some(pattern),
4104            "my.app",
4105            "OrderEvent",
4106            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4107        ));
4108        assert!(!test_matches(
4109            Some(pattern),
4110            "my.app",
4111            "OrderEvent",
4112            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4113        ));
4114        assert!(!test_matches(
4115            Some(pattern),
4116            "my.app",
4117            "OrderEvent",
4118            r#"{"order": {"id": "123"}}"#
4119        ));
4120    }
4121
4122    #[test]
4123    fn deeply_nested_detail_pattern() {
4124        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4125        assert!(test_matches(
4126            Some(pattern),
4127            "src",
4128            "type",
4129            r#"{"a": {"b": {"c": "deep"}}}"#
4130        ));
4131        assert!(!test_matches(
4132            Some(pattern),
4133            "src",
4134            "type",
4135            r#"{"a": {"b": {"c": "shallow"}}}"#
4136        ));
4137    }
4138
4139    #[test]
4140    fn prefix_matcher() {
4141        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4142        assert!(test_matches(
4143            Some(pattern),
4144            "com.myapp.orders",
4145            "OrderPlaced",
4146            "{}"
4147        ));
4148        assert!(test_matches(
4149            Some(pattern),
4150            "com.myapp",
4151            "OrderPlaced",
4152            "{}"
4153        ));
4154        assert!(!test_matches(
4155            Some(pattern),
4156            "com.other",
4157            "OrderPlaced",
4158            "{}"
4159        ));
4160    }
4161
4162    #[test]
4163    fn prefix_matcher_in_detail() {
4164        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4165        assert!(test_matches(
4166            Some(pattern),
4167            "src",
4168            "type",
4169            r#"{"region": "us-east-1"}"#
4170        ));
4171        assert!(!test_matches(
4172            Some(pattern),
4173            "src",
4174            "type",
4175            r#"{"region": "eu-west-1"}"#
4176        ));
4177    }
4178
4179    #[test]
4180    fn exists_matcher() {
4181        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4182        assert!(test_matches(
4183            Some(pattern),
4184            "src",
4185            "type",
4186            r#"{"error": "something broke"}"#
4187        ));
4188        assert!(!test_matches(
4189            Some(pattern),
4190            "src",
4191            "type",
4192            r#"{"status": "ok"}"#
4193        ));
4194
4195        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4196        assert!(test_matches(
4197            Some(pattern),
4198            "src",
4199            "type",
4200            r#"{"status": "ok"}"#
4201        ));
4202        assert!(!test_matches(
4203            Some(pattern),
4204            "src",
4205            "type",
4206            r#"{"error": "something broke"}"#
4207        ));
4208    }
4209
4210    #[test]
4211    fn anything_but_matcher() {
4212        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4213        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4214        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4215
4216        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4217        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4218        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4219        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4220    }
4221
4222    #[test]
4223    fn anything_but_in_detail() {
4224        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4225        assert!(test_matches(
4226            Some(pattern),
4227            "src",
4228            "type",
4229            r#"{"env": "staging"}"#
4230        ));
4231        assert!(!test_matches(
4232            Some(pattern),
4233            "src",
4234            "type",
4235            r#"{"env": "prod"}"#
4236        ));
4237    }
4238
4239    #[test]
4240    fn numeric_greater_than() {
4241        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4242        assert!(test_matches(
4243            Some(pattern),
4244            "src",
4245            "type",
4246            r#"{"count": 150}"#
4247        ));
4248        assert!(!test_matches(
4249            Some(pattern),
4250            "src",
4251            "type",
4252            r#"{"count": 100}"#
4253        ));
4254        assert!(!test_matches(
4255            Some(pattern),
4256            "src",
4257            "type",
4258            r#"{"count": 50}"#
4259        ));
4260    }
4261
4262    #[test]
4263    fn numeric_less_than() {
4264        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4265        assert!(test_matches(
4266            Some(pattern),
4267            "src",
4268            "type",
4269            r#"{"count": 5}"#
4270        ));
4271        assert!(!test_matches(
4272            Some(pattern),
4273            "src",
4274            "type",
4275            r#"{"count": 10}"#
4276        ));
4277        assert!(!test_matches(
4278            Some(pattern),
4279            "src",
4280            "type",
4281            r#"{"count": 15}"#
4282        ));
4283    }
4284
4285    #[test]
4286    fn numeric_range() {
4287        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4288        assert!(test_matches(
4289            Some(pattern),
4290            "src",
4291            "type",
4292            r#"{"count": 50}"#
4293        ));
4294        assert!(test_matches(
4295            Some(pattern),
4296            "src",
4297            "type",
4298            r#"{"count": 100}"#
4299        ));
4300        assert!(!test_matches(
4301            Some(pattern),
4302            "src",
4303            "type",
4304            r#"{"count": 200}"#
4305        ));
4306        assert!(!test_matches(
4307            Some(pattern),
4308            "src",
4309            "type",
4310            r#"{"count": 49}"#
4311        ));
4312    }
4313
4314    #[test]
4315    fn mixed_matchers_and_literals() {
4316        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4317        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4318        assert!(test_matches(
4319            Some(pattern),
4320            "com.myapp.orders",
4321            "Event",
4322            "{}"
4323        ));
4324        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4325    }
4326
4327    // ---- list_connections / list_api_destinations filtering & pagination ----
4328
4329    use crate::state::EventBridgeState;
4330    use fakecloud_core::delivery::DeliveryBus;
4331    use parking_lot::RwLock;
4332
4333    fn make_service() -> EventBridgeService {
4334        let state = Arc::new(RwLock::new(EventBridgeState::new(
4335            "123456789012",
4336            "us-east-1",
4337        )));
4338        let delivery = Arc::new(DeliveryBus::new());
4339        EventBridgeService::new(state, delivery)
4340    }
4341
4342    fn make_request(action: &str, body: Value) -> AwsRequest {
4343        AwsRequest {
4344            service: "events".to_string(),
4345            action: action.to_string(),
4346            region: "us-east-1".to_string(),
4347            account_id: "123456789012".to_string(),
4348            request_id: "test-id".to_string(),
4349            headers: http::HeaderMap::new(),
4350            query_params: HashMap::new(),
4351            body: serde_json::to_vec(&body).unwrap().into(),
4352            path_segments: vec![],
4353            raw_path: "/".to_string(),
4354            raw_query: String::new(),
4355            method: http::Method::POST,
4356            is_query_protocol: false,
4357            access_key_id: None,
4358        }
4359    }
4360
4361    fn create_connection(svc: &EventBridgeService, name: &str) {
4362        let req = make_request(
4363            "CreateConnection",
4364            json!({
4365                "Name": name,
4366                "AuthorizationType": "API_KEY",
4367                "AuthParameters": {
4368                    "ApiKeyAuthParameters": {
4369                        "ApiKeyName": "x-api-key",
4370                        "ApiKeyValue": "secret"
4371                    }
4372                }
4373            }),
4374        );
4375        svc.create_connection(&req).unwrap();
4376    }
4377
4378    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4379        let conn_arn_field = {
4380            let state = svc.state.read();
4381            state.connections.get(conn_name).unwrap().arn.clone()
4382        };
4383        let req = make_request(
4384            "CreateApiDestination",
4385            json!({
4386                "Name": name,
4387                "ConnectionArn": conn_arn_field,
4388                "InvocationEndpoint": "https://example.com",
4389                "HttpMethod": "POST"
4390            }),
4391        );
4392        svc.create_api_destination(&req).unwrap();
4393    }
4394
4395    // -- ListConnections tests --
4396
4397    #[test]
4398    fn list_connections_returns_all_by_default() {
4399        let svc = make_service();
4400        create_connection(&svc, "conn-alpha");
4401        create_connection(&svc, "conn-beta");
4402        create_connection(&svc, "conn-gamma");
4403
4404        let req = make_request("ListConnections", json!({}));
4405        let resp = svc.list_connections(&req).unwrap();
4406        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4407        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4408        assert!(body["NextToken"].is_null());
4409    }
4410
4411    #[test]
4412    fn list_connections_name_prefix_filter() {
4413        let svc = make_service();
4414        create_connection(&svc, "prod-conn-1");
4415        create_connection(&svc, "prod-conn-2");
4416        create_connection(&svc, "dev-conn-1");
4417
4418        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4419        let resp = svc.list_connections(&req).unwrap();
4420        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4421        let names: Vec<&str> = body["Connections"]
4422            .as_array()
4423            .unwrap()
4424            .iter()
4425            .map(|c| c["Name"].as_str().unwrap())
4426            .collect();
4427        assert_eq!(names.len(), 2);
4428        assert!(names.iter().all(|n| n.starts_with("prod-")));
4429    }
4430
4431    #[test]
4432    fn list_connections_state_filter() {
4433        let svc = make_service();
4434        create_connection(&svc, "conn-a");
4435        create_connection(&svc, "conn-b");
4436
4437        // All connections start as AUTHORIZED; change one
4438        {
4439            let mut state = svc.state.write();
4440            state
4441                .connections
4442                .get_mut("conn-b")
4443                .unwrap()
4444                .connection_state = "DEAUTHORIZED".to_string();
4445        }
4446
4447        let req = make_request(
4448            "ListConnections",
4449            json!({ "ConnectionState": "AUTHORIZED" }),
4450        );
4451        let resp = svc.list_connections(&req).unwrap();
4452        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4453        let conns = body["Connections"].as_array().unwrap();
4454        assert_eq!(conns.len(), 1);
4455        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4456    }
4457
4458    #[test]
4459    fn list_connections_pagination() {
4460        let svc = make_service();
4461        for i in 0..5 {
4462            create_connection(&svc, &format!("conn-{i:02}"));
4463        }
4464
4465        // First page: limit 2
4466        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4467        let resp = svc.list_connections(&req).unwrap();
4468        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4469        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4470        let token = body["NextToken"].as_str().unwrap();
4471        assert_eq!(token, "2");
4472
4473        // Second page
4474        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4475        let resp = svc.list_connections(&req).unwrap();
4476        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4477        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4478        let token = body["NextToken"].as_str().unwrap();
4479        assert_eq!(token, "4");
4480
4481        // Third page (only 1 remaining)
4482        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4483        let resp = svc.list_connections(&req).unwrap();
4484        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4485        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4486        assert!(body["NextToken"].is_null());
4487    }
4488
4489    #[test]
4490    fn list_connections_pagination_with_filter() {
4491        let svc = make_service();
4492        for i in 0..4 {
4493            create_connection(&svc, &format!("prod-{i:02}"));
4494        }
4495        create_connection(&svc, "dev-00");
4496
4497        let req = make_request(
4498            "ListConnections",
4499            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4500        );
4501        let resp = svc.list_connections(&req).unwrap();
4502        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4503        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4504        assert!(body["NextToken"].as_str().is_some());
4505    }
4506
4507    // -- ListApiDestinations tests --
4508
4509    #[test]
4510    fn list_api_destinations_returns_all_by_default() {
4511        let svc = make_service();
4512        create_connection(&svc, "my-conn");
4513        create_api_destination(&svc, "dest-alpha", "my-conn");
4514        create_api_destination(&svc, "dest-beta", "my-conn");
4515
4516        let req = make_request("ListApiDestinations", json!({}));
4517        let resp = svc.list_api_destinations(&req).unwrap();
4518        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4519        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4520        assert!(body["NextToken"].is_null());
4521    }
4522
4523    #[test]
4524    fn list_api_destinations_name_prefix_filter() {
4525        let svc = make_service();
4526        create_connection(&svc, "my-conn");
4527        create_api_destination(&svc, "prod-dest-1", "my-conn");
4528        create_api_destination(&svc, "prod-dest-2", "my-conn");
4529        create_api_destination(&svc, "dev-dest-1", "my-conn");
4530
4531        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4532        let resp = svc.list_api_destinations(&req).unwrap();
4533        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4534        let names: Vec<&str> = body["ApiDestinations"]
4535            .as_array()
4536            .unwrap()
4537            .iter()
4538            .map(|d| d["Name"].as_str().unwrap())
4539            .collect();
4540        assert_eq!(names.len(), 2);
4541        assert!(names.iter().all(|n| n.starts_with("prod-")));
4542    }
4543
4544    #[test]
4545    fn list_api_destinations_connection_arn_filter() {
4546        let svc = make_service();
4547        create_connection(&svc, "conn-a");
4548        create_connection(&svc, "conn-b");
4549        create_api_destination(&svc, "dest-1", "conn-a");
4550        create_api_destination(&svc, "dest-2", "conn-b");
4551        create_api_destination(&svc, "dest-3", "conn-a");
4552
4553        let conn_a_arn = {
4554            let state = svc.state.read();
4555            state.connections.get("conn-a").unwrap().arn.clone()
4556        };
4557
4558        let req = make_request(
4559            "ListApiDestinations",
4560            json!({ "ConnectionArn": conn_a_arn }),
4561        );
4562        let resp = svc.list_api_destinations(&req).unwrap();
4563        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4564        let names: Vec<&str> = body["ApiDestinations"]
4565            .as_array()
4566            .unwrap()
4567            .iter()
4568            .map(|d| d["Name"].as_str().unwrap())
4569            .collect();
4570        assert_eq!(names.len(), 2);
4571        assert!(names.contains(&"dest-1"));
4572        assert!(names.contains(&"dest-3"));
4573    }
4574
4575    #[test]
4576    fn list_api_destinations_pagination() {
4577        let svc = make_service();
4578        create_connection(&svc, "my-conn");
4579        for i in 0..5 {
4580            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4581        }
4582
4583        // First page
4584        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4585        let resp = svc.list_api_destinations(&req).unwrap();
4586        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4587        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4588        let token = body["NextToken"].as_str().unwrap();
4589        assert_eq!(token, "2");
4590
4591        // Second page
4592        let req = make_request(
4593            "ListApiDestinations",
4594            json!({ "Limit": 2, "NextToken": token }),
4595        );
4596        let resp = svc.list_api_destinations(&req).unwrap();
4597        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4598        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4599        let token = body["NextToken"].as_str().unwrap();
4600        assert_eq!(token, "4");
4601
4602        // Last page
4603        let req = make_request(
4604            "ListApiDestinations",
4605            json!({ "Limit": 2, "NextToken": token }),
4606        );
4607        let resp = svc.list_api_destinations(&req).unwrap();
4608        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4609        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4610        assert!(body["NextToken"].is_null());
4611    }
4612
4613    // -- ListEventBuses pagination tests --
4614
4615    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4616        let req = make_request("CreateEventBus", json!({ "Name": name }));
4617        svc.create_event_bus(&req).unwrap();
4618    }
4619
4620    #[test]
4621    fn list_event_buses_pagination() {
4622        let svc = make_service();
4623        // "default" bus already exists, create 4 more
4624        for i in 0..4 {
4625            create_event_bus(&svc, &format!("bus-{i:02}"));
4626        }
4627
4628        // First page: limit 2
4629        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4630        let resp = svc.list_event_buses(&req).unwrap();
4631        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4632        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4633        let token = body["NextToken"].as_str().unwrap();
4634        assert_eq!(token, "2");
4635
4636        // Second page
4637        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4638        let resp = svc.list_event_buses(&req).unwrap();
4639        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4640        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4641        let token = body["NextToken"].as_str().unwrap();
4642        assert_eq!(token, "4");
4643
4644        // Third page (only 1 remaining)
4645        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4646        let resp = svc.list_event_buses(&req).unwrap();
4647        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4648        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4649        assert!(body["NextToken"].is_null());
4650    }
4651
4652    #[test]
4653    fn list_event_buses_no_pagination_returns_all() {
4654        let svc = make_service();
4655        create_event_bus(&svc, "bus-alpha");
4656        create_event_bus(&svc, "bus-beta");
4657
4658        let req = make_request("ListEventBuses", json!({}));
4659        let resp = svc.list_event_buses(&req).unwrap();
4660        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4661        // default + 2 custom = 3
4662        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4663        assert!(body["NextToken"].is_null());
4664    }
4665
4666    // -- PutEvents EndpointId tests --
4667
4668    #[test]
4669    fn put_events_never_includes_endpoint_id_in_response() {
4670        let svc = make_service();
4671        // Even when EndpointId is provided in the request, it must not appear in the response
4672        let req = make_request(
4673            "PutEvents",
4674            json!({
4675                "EndpointId": "my-endpoint.abc123",
4676                "Entries": [{
4677                    "Source": "my.source",
4678                    "DetailType": "MyType",
4679                    "Detail": "{}",
4680                    "EventBusName": "default"
4681                }]
4682            }),
4683        );
4684        let resp = svc.put_events(&req).unwrap();
4685        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4686        assert!(
4687            !body.as_object().unwrap().contains_key("EndpointId"),
4688            "EndpointId should never be in the PutEvents response"
4689        );
4690        assert_eq!(body["FailedEntryCount"], 0);
4691    }
4692
4693    // -- ListArchives pagination tests --
4694
4695    fn create_archive(svc: &EventBridgeService, name: &str) {
4696        let req = make_request(
4697            "CreateArchive",
4698            json!({
4699                "ArchiveName": name,
4700                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4701            }),
4702        );
4703        svc.create_archive(&req).unwrap();
4704    }
4705
/// Five archives listed with `Limit: 2` come back as pages of 2, 2 and 1
/// entries, with `NextToken` values "2" and "4" and no token on the last page.
#[test]
fn list_archives_pagination() {
    let svc = make_service();
    (0..5).for_each(|i| create_archive(&svc, &format!("archive-{i:02}")));

    // (expected page length, expected NextToken) for each successive page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;

    for (page_len, next_token) in expected_pages {
        // The first request carries no NextToken; later ones echo the previous token.
        let payload = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListArchives", payload);
        let reply = svc.list_archives(&request).unwrap();
        let page: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
        assert_eq!(page["Archives"].as_array().unwrap().len(), page_len);

        match next_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            // Final page: only one archive remained, so no token is returned.
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4736
4737    // -- ListReplays pagination tests --
4738
4739    fn create_replay(svc: &EventBridgeService, name: &str) {
4740        // Need an archive first for the replay's event source
4741        let archive_arn = {
4742            let state = svc.state.read();
4743            if state.archives.contains_key("replay-archive") {
4744                state.archives["replay-archive"].arn.clone()
4745            } else {
4746                drop(state);
4747                create_archive(svc, "replay-archive");
4748                svc.state.read().archives["replay-archive"].arn.clone()
4749            }
4750        };
4751        let req = make_request(
4752            "StartReplay",
4753            json!({
4754                "ReplayName": name,
4755                "EventSourceArn": archive_arn,
4756                "EventStartTime": 1000000.0,
4757                "EventEndTime": 2000000.0,
4758                "Destination": {
4759                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4760                }
4761            }),
4762        );
4763        svc.start_replay(&req).unwrap();
4764    }
4765
/// Five replays listed with `Limit: 2` come back as pages of 2, 2 and 1
/// entries, with `NextToken` values "2" and "4" and no token on the last page.
#[test]
fn list_replays_pagination() {
    let svc = make_service();
    (0..5).for_each(|i| create_replay(&svc, &format!("replay-{i:02}")));

    // (expected page length, expected NextToken) for each successive page.
    let expected_pages = [(2, Some("2")), (2, Some("4")), (1, None)];
    let mut cursor: Option<String> = None;

    for (page_len, next_token) in expected_pages {
        // The first request carries no NextToken; later ones echo the previous token.
        let payload = match &cursor {
            Some(token) => json!({ "Limit": 2, "NextToken": token }),
            None => json!({ "Limit": 2 }),
        };
        let request = make_request("ListReplays", payload);
        let reply = svc.list_replays(&request).unwrap();
        let page: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
        assert_eq!(page["Replays"].as_array().unwrap().len(), page_len);

        match next_token {
            Some(expected) => {
                let token = page["NextToken"].as_str().unwrap();
                assert_eq!(token, expected);
                cursor = Some(token.to_string());
            }
            // Final page: only one replay remained, so no token is returned.
            None => assert!(page["NextToken"].is_null()),
        }
    }
}
4796
/// A `NextToken` that is not numeric makes `ListEventBuses` return an error.
#[test]
fn list_event_buses_invalid_next_token_returns_error() {
    let svc = make_service();
    let payload = json!({ "NextToken": "not-a-number" });
    let request = make_request("ListEventBuses", payload);

    let outcome = svc.list_event_buses(&request);
    assert!(
        outcome.is_err(),
        "non-numeric NextToken should return an error"
    );
}
4808
4809    // ---- TestEventPattern tests ----
4810
/// A pattern on `source` reports `Result: true` for an event with that source.
#[test]
fn test_event_pattern_match() {
    let pattern = r#"{"source": ["my.app"]}"#;
    let event = r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#;

    let svc = make_service();
    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let reply = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(verdict["Result"], true);
}
4825
/// A pattern on a different `source` reports `Result: false`.
#[test]
fn test_event_pattern_no_match() {
    let pattern = r#"{"source": ["other.app"]}"#;
    let event = r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#;

    let svc = make_service();
    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let reply = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(verdict["Result"], false);
}
4840
/// A pattern on a nested `detail` field reports `Result: true` when the
/// event's detail carries the listed value.
#[test]
fn test_event_pattern_detail_match() {
    let pattern = r#"{"detail": {"status": ["PLACED"]}}"#;
    let event = r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#;

    let svc = make_service();
    let request = make_request(
        "TestEventPattern",
        json!({ "EventPattern": pattern, "Event": event }),
    );
    let reply = svc.test_event_pattern(&request).unwrap();
    let verdict: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(verdict["Result"], true);
}
4855
4856    // ---- UpdateEventBus tests ----
4857
/// `UpdateEventBus` echoes the bus name, and the new description is visible
/// through a following `DescribeEventBus`.
#[test]
fn update_event_bus_description() {
    let svc = make_service();
    create_event_bus(&svc, "my-bus");

    let update = make_request(
        "UpdateEventBus",
        json!({ "Name": "my-bus", "Description": "Updated desc" }),
    );
    let update_reply = svc.update_event_bus(&update).unwrap();
    let updated: Value = serde_json::from_slice(update_reply.body.expect_bytes()).unwrap();
    assert_eq!(updated["Name"], "my-bus");

    // Read the bus back to confirm the description was stored.
    let describe = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
    let describe_reply = svc.describe_event_bus(&describe).unwrap();
    let described: Value = serde_json::from_slice(describe_reply.body.expect_bytes()).unwrap();
    assert_eq!(described["Description"], "Updated desc");
}
4877
/// Updating a bus that was never created returns an error.
#[test]
fn update_event_bus_not_found() {
    let svc = make_service();
    let payload = json!({ "Name": "ghost-bus", "Description": "nope" });
    let request = make_request("UpdateEventBus", payload);

    let outcome = svc.update_event_bus(&request);
    assert!(outcome.is_err());
}
4887
4888    // ---- Endpoint CRUD tests ----
4889
4890    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4891        let req = make_request(
4892            "CreateEndpoint",
4893            json!({
4894                "Name": name,
4895                "RoutingConfig": {
4896                    "FailoverConfig": {
4897                        "Primary": { "HealthCheck": "" },
4898                        "Secondary": { "Route": "us-west-2" }
4899                    }
4900                },
4901                "EventBuses": [
4902                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4903                ]
4904            }),
4905        );
4906        svc.create_endpoint(&req).unwrap();
4907    }
4908
/// An endpoint can be created, described (name, ACTIVE state, id containing
/// the name) and deleted; describing it afterwards fails.
#[test]
fn endpoint_create_describe_delete() {
    let svc = make_service();
    create_endpoint_helper(&svc, "my-endpoint");

    // Same DescribeEndpoint call is issued before and after the delete.
    let describe = || {
        let request = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
        svc.describe_endpoint(&request)
    };

    // Describe
    let reply = describe().unwrap();
    let described: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(described["Name"], "my-endpoint");
    assert_eq!(described["State"], "ACTIVE");
    let endpoint_id = described["EndpointId"].as_str().unwrap();
    assert!(endpoint_id.contains("my-endpoint"));

    // Delete
    let delete = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
    svc.delete_endpoint(&delete).unwrap();

    // Verify gone
    assert!(describe().is_err());
}
4930
/// Two created endpoints are both listed, and an `UpdateEndpoint` description
/// change is visible through `DescribeEndpoint`.
#[test]
fn endpoint_list_and_update() {
    let svc = make_service();
    for name in ["ep-alpha", "ep-beta"] {
        create_endpoint_helper(&svc, name);
    }

    // List all
    let list = make_request("ListEndpoints", json!({}));
    let list_reply = svc.list_endpoints(&list).unwrap();
    let listed: Value = serde_json::from_slice(list_reply.body.expect_bytes()).unwrap();
    assert_eq!(listed["Endpoints"].as_array().unwrap().len(), 2);

    // Update
    let update = make_request(
        "UpdateEndpoint",
        json!({ "Name": "ep-alpha", "Description": "updated" }),
    );
    let update_reply = svc.update_endpoint(&update).unwrap();
    let updated: Value = serde_json::from_slice(update_reply.body.expect_bytes()).unwrap();
    assert_eq!(updated["Name"], "ep-alpha");

    // Verify description
    let describe = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
    let describe_reply = svc.describe_endpoint(&describe).unwrap();
    let described: Value = serde_json::from_slice(describe_reply.body.expect_bytes()).unwrap();
    assert_eq!(described["Description"], "updated");
}
4958
/// Creating a second endpoint with an already-used name returns an error.
#[test]
fn endpoint_duplicate_fails() {
    let svc = make_service();
    create_endpoint_helper(&svc, "dup-ep");

    let payload = json!({
        "Name": "dup-ep",
        "RoutingConfig": {},
        "EventBuses": []
    });
    let duplicate = make_request("CreateEndpoint", payload);
    let outcome = svc.create_endpoint(&duplicate);
    assert!(outcome.is_err());
}
4973
4974    // ---- DeauthorizeConnection tests ----
4975
/// `DeauthorizeConnection` reports the DEAUTHORIZING state and the connection
/// ARN, and `DescribeConnection` shows the same state afterwards.
#[test]
fn deauthorize_connection_sets_state() {
    let svc = make_service();
    create_connection(&svc, "deauth-conn");

    let deauthorize = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
    let deauth_reply = svc.deauthorize_connection(&deauthorize).unwrap();
    let deauthorized: Value = serde_json::from_slice(deauth_reply.body.expect_bytes()).unwrap();
    assert_eq!(deauthorized["ConnectionState"], "DEAUTHORIZING");
    let connection_arn = deauthorized["ConnectionArn"].as_str().unwrap();
    assert!(connection_arn.contains("deauth-conn"));

    // Verify via describe
    let describe = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
    let describe_reply = svc.describe_connection(&describe).unwrap();
    let described: Value = serde_json::from_slice(describe_reply.body.expect_bytes()).unwrap();
    assert_eq!(described["ConnectionState"], "DEAUTHORIZING");
}
4996
/// Deauthorizing a connection that was never created returns an error.
#[test]
fn deauthorize_connection_not_found() {
    let svc = make_service();
    let payload = json!({ "Name": "ghost-conn" });
    let request = make_request("DeauthorizeConnection", payload);

    let outcome = svc.deauthorize_connection(&request);
    assert!(outcome.is_err());
}
5003
5004    // ---- Partner event source tests ----
5005
/// Walks one partner event source through create, describe, the list and
/// event-source views, and delete, then checks that describing it fails.
#[test]
fn partner_event_source_crud() {
    let svc = make_service();

    // The same describe request is needed right after create and after delete.
    let describe_request = || {
        make_request(
            "DescribePartnerEventSource",
            json!({ "Name": "partner/test" }),
        )
    };

    // Create
    let create = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create).unwrap();

    // Describe
    let reply = svc
        .describe_partner_event_source(&describe_request())
        .unwrap();
    let described: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(described["Name"], "partner/test");

    // List
    let list = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
    let reply = svc.list_partner_event_sources(&list).unwrap();
    let listed: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(listed["PartnerEventSources"].as_array().unwrap().len(), 1);

    // ListPartnerEventSourceAccounts
    let list_accounts = make_request(
        "ListPartnerEventSourceAccounts",
        json!({ "EventSourceName": "partner/test" }),
    );
    let reply = svc
        .list_partner_event_source_accounts(&list_accounts)
        .unwrap();
    let accounts: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    let account_count = accounts["PartnerEventSourceAccounts"]
        .as_array()
        .unwrap()
        .len();
    assert_eq!(account_count, 1);

    // DescribeEventSource
    let describe_source = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
    let reply = svc.describe_event_source(&describe_source).unwrap();
    let source: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(source["Name"], "partner/test");
    assert_eq!(source["State"], "ACTIVE");

    // ListEventSources
    let list_sources = make_request("ListEventSources", json!({}));
    let reply = svc.list_event_sources(&list_sources).unwrap();
    let sources: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(sources["EventSources"].as_array().unwrap().len(), 1);

    // Delete
    let delete = make_request(
        "DeletePartnerEventSource",
        json!({ "Name": "partner/test", "Account": "123456789012" }),
    );
    svc.delete_partner_event_source(&delete).unwrap();

    // Verify gone
    let after_delete = svc.describe_partner_event_source(&describe_request());
    assert!(after_delete.is_err());
}
5071
/// Deactivating then activating a partner event source flips its stored state
/// to INACTIVE and back to ACTIVE; both calls fail for an unknown name.
#[test]
fn activate_deactivate_event_source() {
    let svc = make_service();

    // Create a partner event source first
    let create = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create).unwrap();

    // Deactivate it
    let deactivate = make_request(
        "DeactivateEventSource",
        json!({ "Name": "aws.partner/test" }),
    );
    svc.deactivate_event_source(&deactivate).unwrap();
    // The read guard is a temporary, released once the assertion completes.
    assert_eq!(
        svc.state.read().partner_event_sources["aws.partner/test"].state,
        "INACTIVE"
    );

    // Activate it
    let activate = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
    svc.activate_event_source(&activate).unwrap();
    assert_eq!(
        svc.state.read().partner_event_sources["aws.partner/test"].state,
        "ACTIVE"
    );

    // Not-found returns error
    let activate_missing = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
    let outcome = svc.activate_event_source(&activate_missing);
    assert!(outcome.is_err());

    let deactivate_missing =
        make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
    let outcome = svc.deactivate_event_source(&deactivate_missing);
    assert!(outcome.is_err());
}
5115
/// `DeletePartnerEventSource` fails for a mismatched account and leaves the
/// source in place, succeeds for the matching account, and fails again once
/// the source is gone.
#[test]
fn delete_partner_event_source_verifies_account() {
    let svc = make_service();

    // Issues a delete for the test source on behalf of `account`.
    let delete_as = |account: &str| {
        let request = make_request(
            "DeletePartnerEventSource",
            json!({ "Name": "aws.partner/test", "Account": account }),
        );
        svc.delete_partner_event_source(&request)
    };
    // Whether the test source is currently present in the service state.
    let source_exists = || {
        svc.state
            .read()
            .partner_event_sources
            .contains_key("aws.partner/test")
    };

    // Create a partner event source
    let create = make_request(
        "CreatePartnerEventSource",
        json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
    );
    svc.create_partner_event_source(&create).unwrap();

    // Deleting with wrong account fails
    assert!(delete_as("999999999999").is_err());
    // Source still exists
    assert!(source_exists());

    // Deleting with correct account succeeds
    delete_as("123456789012").unwrap();
    assert!(!source_exists());

    // Deleting non-existent source returns error
    assert!(delete_as("123456789012").is_err());
}
5159
/// A single valid `PutPartnerEvents` entry yields no failures and one result
/// entry with a string `EventId`.
#[test]
fn put_partner_events() {
    let svc = make_service();
    let entries = json!([
        { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
    ]);
    let request = make_request("PutPartnerEvents", json!({ "Entries": entries }));

    let reply = svc.put_partner_events(&request).unwrap();
    let outcome: Value = serde_json::from_slice(reply.body.expect_bytes()).unwrap();
    assert_eq!(outcome["FailedEntryCount"], 0);
    assert_eq!(outcome["Entries"].as_array().unwrap().len(), 1);
    let event_id = outcome["Entries"][0]["EventId"].as_str();
    assert!(event_id.is_some());
}
5177
5178    // ---- Archive + Replay delivery tests ----
5179
5180    /// Helper: create a service with a mock SQS delivery that records messages.
5181    #[allow(clippy::type_complexity)]
5182    fn make_service_with_sqs_recorder() -> (
5183        EventBridgeService,
5184        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5185    ) {
5186        use fakecloud_core::delivery::SqsDelivery;
5187
5188        struct RecordingSqsDelivery {
5189            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5190        }
5191
5192        impl SqsDelivery for RecordingSqsDelivery {
5193            fn deliver_to_queue(
5194                &self,
5195                queue_arn: &str,
5196                message_body: &str,
5197                _attributes: &HashMap<String, String>,
5198            ) {
5199                self.messages
5200                    .lock()
5201                    .push((queue_arn.to_string(), message_body.to_string()));
5202            }
5203        }
5204
5205        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5206            Arc::new(parking_lot::Mutex::new(Vec::new()));
5207        let state = Arc::new(RwLock::new(EventBridgeState::new(
5208            "123456789012",
5209            "us-east-1",
5210        )));
5211        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5212            messages: messages.clone(),
5213        })));
5214        let svc = EventBridgeService::new(state, delivery);
5215        (svc, messages)
5216    }
5217
/// Events put on the default bus are archived, and a `StartReplay` over that
/// archive delivers them again to the rule's SQS target, tagged with
/// `replay-name`, leaving the replay in the COMPLETED state.
#[test]
fn start_replay_delivers_archived_events_to_sqs_target() {
    let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
    let (svc, messages) = make_service_with_sqs_recorder();

    // Create a rule with an SQS target
    let put_rule = make_request(
        "PutRule",
        json!({
            "Name": "replay-test-rule",
            "EventPattern": r#"{"source": ["my.app"]}"#,
            "State": "ENABLED"
        }),
    );
    svc.put_rule(&put_rule).unwrap();

    let put_targets = make_request(
        "PutTargets",
        json!({
            "Rule": "replay-test-rule",
            "Targets": [{
                "Id": "sqs-target",
                "Arn": queue_arn
            }]
        }),
    );
    svc.put_targets(&put_targets).unwrap();

    // Create an archive on the default bus
    let create_archive_req = make_request(
        "CreateArchive",
        json!({
            "ArchiveName": "test-archive",
            "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
        }),
    );
    svc.create_archive(&create_archive_req).unwrap();

    // PutEvents: these should get archived and delivered
    let entries = json!([
        {
            "Source": "my.app",
            "DetailType": "OrderCreated",
            "Detail": "{\"orderId\": \"1\"}",
            "EventBusName": "default"
        },
        {
            "Source": "my.app",
            "DetailType": "OrderShipped",
            "Detail": "{\"orderId\": \"2\"}",
            "EventBusName": "default"
        }
    ]);
    let put_events = make_request("PutEvents", json!({ "Entries": entries }));
    let put_reply = svc.put_events(&put_events).unwrap();
    let put_outcome: Value = serde_json::from_slice(put_reply.body.expect_bytes()).unwrap();
    assert_eq!(put_outcome["FailedEntryCount"], 0);

    // Verify archive has 2 events; the read guard is scoped to this block.
    {
        let guard = svc.state.read();
        let archive = guard.archives.get("test-archive").unwrap();
        assert_eq!(archive.events.len(), 2);
        assert_eq!(archive.event_count, 2);
    }

    // Clear recorded messages from PutEvents delivery
    messages.lock().clear();

    // StartReplay: should re-deliver the archived events
    let archive_arn = svc
        .state
        .read()
        .archives
        .get("test-archive")
        .unwrap()
        .arn
        .clone();

    // Use a wide time range to capture all events
    let window_start = 0.0_f64;
    let window_end = (chrono::Utc::now().timestamp() + 3600) as f64;

    let start_replay = make_request(
        "StartReplay",
        json!({
            "ReplayName": "my-replay",
            "EventSourceArn": archive_arn,
            "Destination": {
                "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
            },
            "EventStartTime": window_start,
            "EventEndTime": window_end
        }),
    );
    let replay_reply = svc.start_replay(&start_replay).unwrap();
    let replay_outcome: Value = serde_json::from_slice(replay_reply.body.expect_bytes()).unwrap();
    assert_eq!(replay_outcome["State"], "STARTING");

    // Verify the replay delivered events to SQS
    let delivered = messages.lock();
    assert_eq!(
        delivered.len(),
        2,
        "expected 2 replayed events delivered to SQS"
    );
    for (arn, msg) in delivered.iter() {
        assert_eq!(arn, queue_arn);
        let replayed: Value = serde_json::from_str(msg).unwrap();
        assert_eq!(replayed["source"], "my.app");
        // Replayed events should include replay-name
        assert!(replayed["replay-name"].as_str().is_some());
    }

    // Verify replay is marked as COMPLETED
    let guard = svc.state.read();
    let replay = guard.replays.get("my-replay").unwrap();
    assert_eq!(replay.state, "COMPLETED");
}
5337
/// For an `API_KEY` connection, `apply_connection_auth` adds a header named
/// after `ApiKeyName` carrying `ApiKeyValue`.
#[test]
fn apply_connection_auth_api_key() {
    let auth_parameters = json!({
        "ApiKeyAuthParameters": {
            "ApiKeyName": "x-api-key",
            "ApiKeyValue": "my-secret"
        }
    });
    let conn = Connection {
        name: "test-conn".to_string(),
        arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
        description: None,
        authorization_type: "API_KEY".to_string(),
        auth_parameters,
        connection_state: "AUTHORIZED".to_string(),
        secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    let unauthenticated = reqwest::Client::new()
        .post("http://localhost:12345/test")
        .header("Content-Type", "application/json");
    let authenticated = apply_connection_auth(unauthenticated, &conn);

    // Build the request (nothing is sent) and inspect the header that was applied.
    let request = authenticated.body("{}").build().unwrap();
    let api_key = request
        .headers()
        .get("x-api-key")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(api_key, "my-secret");
}
5376
/// For a `BASIC` connection, `apply_connection_auth` sets an `authorization`
/// header that starts with "Basic ".
#[test]
fn apply_connection_auth_basic() {
    let auth_parameters = json!({
        "BasicAuthParameters": {
            "Username": "user",
            "Password": "pass"
        }
    });
    let conn = Connection {
        name: "basic-conn".to_string(),
        arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
        description: None,
        authorization_type: "BASIC".to_string(),
        auth_parameters,
        connection_state: "AUTHORIZED".to_string(),
        secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
        creation_time: Utc::now(),
        last_modified_time: Utc::now(),
        last_authorized_time: Utc::now(),
    };

    let unauthenticated = reqwest::Client::new().post("http://localhost:12345/test");
    let authenticated = apply_connection_auth(unauthenticated, &conn);

    // Build the request (nothing is sent) and inspect the header that was applied.
    let request = authenticated.body("{}").build().unwrap();
    let auth_header = request
        .headers()
        .get("authorization")
        .unwrap()
        .to_str()
        .unwrap();
    assert!(
        auth_header.starts_with("Basic "),
        "Expected Basic auth header, got: {auth_header}"
    );
}
5413
5414    #[tokio::test]
5415    async fn put_events_with_api_destination_target_resolves_destination() {
5416        // This test verifies that the PutEvents code path correctly identifies
5417        // api-destination ARN targets and resolves the destination metadata.
5418        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5419        let state = Arc::new(RwLock::new(EventBridgeState::new(
5420            "123456789012",
5421            "us-east-1",
5422        )));
5423        let delivery = Arc::new(DeliveryBus::new());
5424        let svc = EventBridgeService::new(state, delivery);
5425
5426        // Create connection and api destination
5427        create_connection(&svc, "my-conn");
5428        let conn_arn = {
5429            let state = svc.state.read();
5430            state.connections.get("my-conn").unwrap().arn.clone()
5431        };
5432        let req = make_request(
5433            "CreateApiDestination",
5434            json!({
5435                "Name": "my-dest",
5436                "ConnectionArn": conn_arn,
5437                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5438                "HttpMethod": "POST"
5439            }),
5440        );
5441        svc.create_api_destination(&req).unwrap();
5442
5443        let dest_arn = {
5444            let state = svc.state.read();
5445            state.api_destinations.get("my-dest").unwrap().arn.clone()
5446        };
5447
5448        // Create a rule that targets the api-destination
5449        let req = make_request(
5450            "PutRule",
5451            json!({
5452                "Name": "api-dest-rule",
5453                "EventPattern": r#"{"source":["test.app"]}"#,
5454                "State": "ENABLED"
5455            }),
5456        );
5457        svc.put_rule(&req).unwrap();
5458
5459        let req = make_request(
5460            "PutTargets",
5461            json!({
5462                "Rule": "api-dest-rule",
5463                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5464            }),
5465        );
5466        svc.put_targets(&req).unwrap();
5467
5468        // PutEvents - should match the rule and attempt delivery to ApiDestination
5469        let req = make_request(
5470            "PutEvents",
5471            json!({
5472                "Entries": [{
5473                    "Source": "test.app",
5474                    "DetailType": "TestEvent",
5475                    "Detail": r#"{"key":"value"}"#
5476                }]
5477            }),
5478        );
5479        let resp = svc.put_events(&req).unwrap();
5480        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5481        assert_eq!(body["FailedEntryCount"], 0);
5482        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5483        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5484    }
5485
/// `function_name_from_arn` yields the bare function name for unqualified and
/// qualified Lambda ARNs, and returns a plain name unchanged.
#[test]
fn test_function_name_from_arn() {
    // (input, expected function name)
    let cases = [
        // Unqualified ARN
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func",
            "my-func",
        ),
        // Qualified ARN with alias
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod",
            "my-func",
        ),
        // Qualified ARN with version
        (
            "arn:aws:lambda:us-east-1:123456789012:function:my-func:42",
            "my-func",
        ),
        // Plain function name (not an ARN)
        ("my-func", "my-func"),
    ];

    for (input, expected) in cases {
        assert_eq!(super::function_name_from_arn(input), expected);
    }
}
5510}