Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::delivery::DeliveryBus;
11use fakecloud_core::pagination::paginate;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_lambda::runtime::ContainerRuntime;
16use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
17use fakecloud_logs::state::SharedLogsState;
18
19use crate::state::{
20    ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
21    PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
22};
23
24/// Validate a single `PutEvents` entry's required fields (`Source`,
25/// `DetailType`, `Detail`) and that `Detail` is a well-formed JSON
26/// object. Returns the JSON error body AWS surfaces in the matching
27/// `Entries[]` slot on failure.
28fn validate_put_events_entry(source: &str, detail_type: &str, detail: &str) -> Result<(), Value> {
29    if source.is_empty() {
30        return Err(json!({
31            "ErrorCode": "InvalidArgument",
32            "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
33        }));
34    }
35    if detail_type.is_empty() {
36        return Err(json!({
37            "ErrorCode": "InvalidArgument",
38            "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
39        }));
40    }
41    if detail.is_empty() {
42        return Err(json!({
43            "ErrorCode": "InvalidArgument",
44            "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
45        }));
46    }
47    if serde_json::from_str::<Value>(detail).is_err() {
48        return Err(json!({
49            "ErrorCode": "MalformedDetail",
50            "ErrorMessage": "Detail is malformed.",
51        }));
52    }
53    Ok(())
54}
55
56/// Parse an entry's `Time` field, tolerating the three formats AWS
57/// accepts (RFC 3339 string, fractional seconds as a float, integer
58/// seconds). Falls back to "now" if the field is absent or
59/// unparseable, which matches the real service.
60fn parse_put_events_time(raw: &Value) -> DateTime<Utc> {
61    if let Some(s) = raw.as_str() {
62        return DateTime::parse_from_rfc3339(s)
63            .map(|dt| dt.with_timezone(&Utc))
64            .unwrap_or_else(|_| Utc::now());
65    }
66    if let Some(ts) = raw.as_f64() {
67        return DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
68            .unwrap_or_else(Utc::now);
69    }
70    if let Some(ts) = raw.as_i64() {
71        return DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now);
72    }
73    Utc::now()
74}
75
76pub struct EventBridgeService {
77    state: SharedEventBridgeState,
78    delivery: Arc<DeliveryBus>,
79    lambda_state: Option<SharedLambdaState>,
80    logs_state: Option<SharedLogsState>,
81    container_runtime: Option<Arc<ContainerRuntime>>,
82}
83
84impl EventBridgeService {
85    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
86        Self {
87            state,
88            delivery,
89            lambda_state: None,
90            logs_state: None,
91            container_runtime: None,
92        }
93    }
94
95    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
96        self.lambda_state = Some(lambda_state);
97        self
98    }
99
100    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
101        self.logs_state = Some(logs_state);
102        self
103    }
104
105    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
106        self.container_runtime = Some(runtime);
107        self
108    }
109}
110
111#[async_trait]
112impl AwsService for EventBridgeService {
113    fn service_name(&self) -> &str {
114        "events"
115    }
116
117    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
118        match req.action.as_str() {
119            "CreateEventBus" => self.create_event_bus(&req),
120            "DeleteEventBus" => self.delete_event_bus(&req),
121            "ListEventBuses" => self.list_event_buses(&req),
122            "DescribeEventBus" => self.describe_event_bus(&req),
123            "PutRule" => self.put_rule(&req),
124            "DeleteRule" => self.delete_rule(&req),
125            "ListRules" => self.list_rules(&req),
126            "DescribeRule" => self.describe_rule(&req),
127            "EnableRule" => self.enable_rule(&req),
128            "DisableRule" => self.disable_rule(&req),
129            "PutTargets" => self.put_targets(&req),
130            "RemoveTargets" => self.remove_targets(&req),
131            "ListTargetsByRule" => self.list_targets_by_rule(&req),
132            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
133            "PutEvents" => self.put_events(&req),
134            "PutPermission" => self.put_permission(&req),
135            "RemovePermission" => self.remove_permission(&req),
136            "TagResource" => self.tag_resource(&req),
137            "UntagResource" => self.untag_resource(&req),
138            "ListTagsForResource" => self.list_tags_for_resource(&req),
139            "CreateArchive" => self.create_archive(&req),
140            "DescribeArchive" => self.describe_archive(&req),
141            "ListArchives" => self.list_archives(&req),
142            "UpdateArchive" => self.update_archive(&req),
143            "DeleteArchive" => self.delete_archive(&req),
144            "CreateConnection" => self.create_connection(&req),
145            "DescribeConnection" => self.describe_connection(&req),
146            "ListConnections" => self.list_connections(&req),
147            "UpdateConnection" => self.update_connection(&req),
148            "DeleteConnection" => self.delete_connection(&req),
149            "CreateApiDestination" => self.create_api_destination(&req),
150            "DescribeApiDestination" => self.describe_api_destination(&req),
151            "ListApiDestinations" => self.list_api_destinations(&req),
152            "UpdateApiDestination" => self.update_api_destination(&req),
153            "DeleteApiDestination" => self.delete_api_destination(&req),
154            "StartReplay" => self.start_replay(&req),
155            "DescribeReplay" => self.describe_replay(&req),
156            "ListReplays" => self.list_replays(&req),
157            "CancelReplay" => self.cancel_replay(&req),
158            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
159            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
160            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
161            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
162            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
163            "ActivateEventSource" => self.activate_event_source(&req),
164            "DeactivateEventSource" => self.deactivate_event_source(&req),
165            "DescribeEventSource" => self.describe_event_source(&req),
166            "ListEventSources" => self.list_event_sources(&req),
167            "PutPartnerEvents" => self.put_partner_events(&req),
168            "TestEventPattern" => self.test_event_pattern(&req),
169            "UpdateEventBus" => self.update_event_bus(&req),
170            "CreateEndpoint" => self.create_endpoint(&req),
171            "DeleteEndpoint" => self.delete_endpoint(&req),
172            "DescribeEndpoint" => self.describe_endpoint(&req),
173            "ListEndpoints" => self.list_endpoints(&req),
174            "UpdateEndpoint" => self.update_endpoint(&req),
175            "DeauthorizeConnection" => self.deauthorize_connection(&req),
176            _ => Err(AwsServiceError::action_not_implemented(
177                "events",
178                &req.action,
179            )),
180        }
181    }
182
183    fn supported_actions(&self) -> &[&str] {
184        &[
185            "CreateEventBus",
186            "DeleteEventBus",
187            "ListEventBuses",
188            "DescribeEventBus",
189            "PutRule",
190            "DeleteRule",
191            "ListRules",
192            "DescribeRule",
193            "EnableRule",
194            "DisableRule",
195            "PutTargets",
196            "RemoveTargets",
197            "ListTargetsByRule",
198            "ListRuleNamesByTarget",
199            "PutEvents",
200            "PutPermission",
201            "RemovePermission",
202            "TagResource",
203            "UntagResource",
204            "ListTagsForResource",
205            "CreateArchive",
206            "DescribeArchive",
207            "ListArchives",
208            "UpdateArchive",
209            "DeleteArchive",
210            "CreateConnection",
211            "DescribeConnection",
212            "ListConnections",
213            "UpdateConnection",
214            "DeleteConnection",
215            "CreateApiDestination",
216            "DescribeApiDestination",
217            "ListApiDestinations",
218            "UpdateApiDestination",
219            "DeleteApiDestination",
220            "StartReplay",
221            "DescribeReplay",
222            "ListReplays",
223            "CancelReplay",
224            "CreatePartnerEventSource",
225            "DeletePartnerEventSource",
226            "DescribePartnerEventSource",
227            "ListPartnerEventSources",
228            "ListPartnerEventSourceAccounts",
229            "ActivateEventSource",
230            "DeactivateEventSource",
231            "DescribeEventSource",
232            "ListEventSources",
233            "PutPartnerEvents",
234            "TestEventPattern",
235            "UpdateEventBus",
236            "CreateEndpoint",
237            "DeleteEndpoint",
238            "DescribeEndpoint",
239            "ListEndpoints",
240            "UpdateEndpoint",
241            "DeauthorizeConnection",
242        ]
243    }
244}
245
246fn parse_tags(body: &Value) -> HashMap<String, String> {
247    let mut tags = HashMap::new();
248    if let Some(arr) = body["Tags"].as_array() {
249        for tag in arr {
250            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
251                tags.insert(key.to_string(), val.to_string());
252            }
253        }
254    }
255    tags
256}
257
258fn parse_target(target: &Value) -> EventTarget {
259    EventTarget {
260        id: target["Id"].as_str().unwrap_or("").to_string(),
261        arn: target["Arn"].as_str().unwrap_or("").to_string(),
262        input: target["Input"].as_str().map(|s| s.to_string()),
263        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
264        input_transformer: target.get("InputTransformer").cloned(),
265        sqs_parameters: target.get("SqsParameters").cloned(),
266    }
267}
268
269fn target_to_json(t: &EventTarget) -> Value {
270    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
271    if let Some(ref input) = t.input {
272        obj["Input"] = json!(input);
273    }
274    if let Some(ref input_path) = t.input_path {
275        obj["InputPath"] = json!(input_path);
276    }
277    if let Some(ref it) = t.input_transformer {
278        obj["InputTransformer"] = it.clone();
279    }
280    if let Some(ref sp) = t.sqs_parameters {
281        obj["SqsParameters"] = sp.clone();
282    }
283    obj
284}
285
286// ─── Event Bus Operations ───────────────────────────────────────────
287impl EventBridgeService {
288    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
289        let body = req.json_body();
290        validate_required("Name", &body["Name"])?;
291        let name = body["Name"]
292            .as_str()
293            .ok_or_else(|| missing("Name"))?
294            .to_string();
295        validate_string_length("name", &name, 1, 256)?;
296        validate_optional_string_length(
297            "eventSourceName",
298            body["EventSourceName"].as_str(),
299            1,
300            256,
301        )?;
302        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
303        validate_optional_string_length(
304            "kmsKeyIdentifier",
305            body["KmsKeyIdentifier"].as_str(),
306            0,
307            2048,
308        )?;
309
310        // Validate name doesn't contain '/' (unless partner bus)
311        if name.contains('/') && !name.starts_with("aws.partner/") {
312            return Err(AwsServiceError::aws_error(
313                StatusCode::BAD_REQUEST,
314                "ValidationException",
315                "Event bus name must not contain '/'.",
316            ));
317        }
318
319        // Partner event bus validation
320        if name.starts_with("aws.partner/") {
321            let event_source = body["EventSourceName"].as_str().unwrap_or("");
322            let state_r = self.state.read();
323            let has_source = state_r.partner_event_sources.contains_key(event_source);
324            drop(state_r);
325            if !has_source {
326                return Err(AwsServiceError::aws_error(
327                    StatusCode::BAD_REQUEST,
328                    "ResourceNotFoundException",
329                    format!("Event source {event_source} does not exist."),
330                ));
331            }
332        }
333
334        let mut state = self.state.write();
335
336        if state.buses.contains_key(&name) {
337            return Err(AwsServiceError::aws_error(
338                StatusCode::BAD_REQUEST,
339                "ResourceAlreadyExistsException",
340                format!("Event bus {name} already exists."),
341            ));
342        }
343
344        let arn = format!(
345            "arn:aws:events:{}:{}:event-bus/{}",
346            req.region, state.account_id, name
347        );
348        let now = Utc::now();
349        let description = body["Description"].as_str().map(|s| s.to_string());
350        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
351        let dead_letter_config = body.get("DeadLetterConfig").cloned();
352
353        let tags = parse_tags(&body);
354
355        let bus = EventBus {
356            name: name.clone(),
357            arn: arn.clone(),
358            tags,
359            policy: None,
360            description,
361            kms_key_identifier,
362            dead_letter_config,
363            creation_time: now,
364            last_modified_time: now,
365        };
366        state.buses.insert(name, bus);
367
368        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
369    }
370
371    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
372        let body = req.json_body();
373        validate_required("Name", &body["Name"])?;
374        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
375        validate_string_length("name", name, 1, 256)?;
376
377        if name == "default" {
378            return Err(AwsServiceError::aws_error(
379                StatusCode::BAD_REQUEST,
380                "ValidationException",
381                format!("Cannot delete event bus {name}."),
382            ));
383        }
384
385        let mut state = self.state.write();
386        state.buses.remove(name);
387        state.rules.retain(|k, _| k.0 != name);
388
389        Ok(AwsResponse::ok_json(json!({})))
390    }
391
392    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
393        let body = req.json_body();
394        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
395        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
396        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
397        let name_prefix = body["NamePrefix"].as_str();
398        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
399        if let Some(t) = body["NextToken"].as_str() {
400            t.parse::<usize>().map_err(|_| {
401                AwsServiceError::aws_error(
402                    StatusCode::BAD_REQUEST,
403                    "InvalidNextTokenException",
404                    format!("Invalid NextToken value: '{t}'"),
405                )
406            })?;
407        }
408
409        let state = self.state.read();
410        let filtered: Vec<&_> = state
411            .buses
412            .values()
413            .filter(|b| match name_prefix {
414                Some(prefix) => b.name.starts_with(prefix),
415                None => true,
416            })
417            .collect();
418
419        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
420        let buses: Vec<Value> = page
421            .iter()
422            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
423            .collect();
424        let mut resp = json!({ "EventBuses": buses });
425        if let Some(token) = next_token {
426            resp["NextToken"] = json!(token);
427        }
428
429        Ok(AwsResponse::ok_json(resp))
430    }
431
432    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
433        let body = req.json_body();
434        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
435        let name = body["Name"].as_str().unwrap_or("default");
436
437        let state = self.state.read();
438        let bus = state.buses.get(name).ok_or_else(|| {
439            AwsServiceError::aws_error(
440                StatusCode::BAD_REQUEST,
441                "ResourceNotFoundException",
442                format!("Event bus {name} does not exist."),
443            )
444        })?;
445
446        let mut resp = json!({
447            "Name": bus.name,
448            "Arn": bus.arn,
449            "CreationTime": bus.creation_time.timestamp() as f64,
450            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
451        });
452
453        if let Some(ref policy) = bus.policy {
454            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
455        }
456        if let Some(ref desc) = bus.description {
457            resp["Description"] = json!(desc);
458        }
459        if let Some(ref kms) = bus.kms_key_identifier {
460            resp["KmsKeyIdentifier"] = json!(kms);
461        }
462        if let Some(ref dlc) = bus.dead_letter_config {
463            resp["DeadLetterConfig"] = dlc.clone();
464        }
465
466        Ok(AwsResponse::ok_json(resp))
467    }
468
469    // ─── Permission Operations ──────────────────────────────────────────
470
471    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
472        let body = req.json_body();
473        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
474        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
475        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
476        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
477        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
478
479        let mut state = self.state.write();
480
481        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
482            AwsServiceError::aws_error(
483                StatusCode::BAD_REQUEST,
484                "ResourceNotFoundException",
485                format!("Event bus {event_bus_name} does not exist."),
486            )
487        })?;
488
489        // Check if Policy is provided (new-style)
490        if let Some(policy_str) = body["Policy"].as_str() {
491            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
492                bus.policy = Some(policy);
493                return Ok(AwsResponse::ok_json(json!({})));
494            }
495        }
496
497        // Old-style: Action, Principal, StatementId
498        let action = body["Action"].as_str().unwrap_or("");
499        let principal = body["Principal"].as_str().unwrap_or("");
500        let statement_id = body["StatementId"].as_str().unwrap_or("");
501
502        // Validate action
503        if action != "events:PutEvents" {
504            return Err(AwsServiceError::aws_error(
505                StatusCode::BAD_REQUEST,
506                "ValidationException",
507                "Provided value in parameter 'action' is not supported.",
508            ));
509        }
510
511        let statement = json!({
512            "Sid": statement_id,
513            "Effect": "Allow",
514            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
515            "Action": action,
516            "Resource": bus.arn,
517        });
518
519        let policy = bus.policy.get_or_insert_with(|| {
520            json!({
521                "Version": "2012-10-17",
522                "Statement": [],
523            })
524        });
525
526        if let Some(stmts) = policy["Statement"].as_array_mut() {
527            stmts.push(statement);
528        }
529
530        Ok(AwsResponse::ok_json(json!({})))
531    }
532
533    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
534        let body = req.json_body();
535        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
536        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
537        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
538        let statement_id = body["StatementId"].as_str().unwrap_or("");
539        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
540
541        let mut state = self.state.write();
542
543        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
544            AwsServiceError::aws_error(
545                StatusCode::BAD_REQUEST,
546                "ResourceNotFoundException",
547                format!("Event bus {event_bus_name} does not exist."),
548            )
549        })?;
550
551        if remove_all {
552            bus.policy = None;
553            return Ok(AwsResponse::ok_json(json!({})));
554        }
555
556        let policy = bus.policy.as_mut().ok_or_else(|| {
557            AwsServiceError::aws_error(
558                StatusCode::BAD_REQUEST,
559                "ResourceNotFoundException",
560                "EventBus does not have a policy.",
561            )
562        })?;
563
564        if let Some(stmts) = policy["Statement"].as_array_mut() {
565            let before = stmts.len();
566            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
567            if stmts.len() == before {
568                return Err(AwsServiceError::aws_error(
569                    StatusCode::BAD_REQUEST,
570                    "ResourceNotFoundException",
571                    "Statement with the provided id does not exist.",
572                ));
573            }
574            if stmts.is_empty() {
575                bus.policy = None;
576            }
577        }
578
579        Ok(AwsResponse::ok_json(json!({})))
580    }
581
582    // ─── Rule Operations ────────────────────────────────────────────────
583
584    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
585        let body = req.json_body();
586        validate_required("Name", &body["Name"])?;
587        let name = body["Name"]
588            .as_str()
589            .ok_or_else(|| missing("Name"))?
590            .to_string();
591        validate_string_length("name", &name, 1, 64)?;
592        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
593        validate_optional_string_length(
594            "scheduleExpression",
595            body["ScheduleExpression"].as_str(),
596            0,
597            256,
598        )?;
599        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
600        validate_optional_enum(
601            "state",
602            body["State"].as_str(),
603            &[
604                "ENABLED",
605                "DISABLED",
606                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
607            ],
608        )?;
609        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
610        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
611
612        let raw_bus = body["EventBusName"]
613            .as_str()
614            .unwrap_or("default")
615            .to_string();
616
617        let mut state = self.state.write();
618        let event_bus_name = state.resolve_bus_name(&raw_bus);
619
620        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
621            if s.is_empty() {
622                None
623            } else {
624                Some(s.to_string())
625            }
626        });
627        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
628            if s.is_empty() {
629                None
630            } else {
631                Some(s.to_string())
632            }
633        });
634        let description = body["Description"].as_str().map(|s| s.to_string());
635        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
636        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
637
638        // Validate: schedule expressions only on default bus
639        if schedule_expression.is_some() && event_bus_name != "default" {
640            return Err(AwsServiceError::aws_error(
641                StatusCode::BAD_REQUEST,
642                "ValidationException",
643                "ScheduleExpression is supported only on the default event bus.",
644            ));
645        }
646
647        if !state.buses.contains_key(&event_bus_name) {
648            return Err(AwsServiceError::aws_error(
649                StatusCode::BAD_REQUEST,
650                "ResourceNotFoundException",
651                format!("Event bus {event_bus_name} does not exist."),
652            ));
653        }
654
655        let arn = if event_bus_name == "default" {
656            format!(
657                "arn:aws:events:{}:{}:rule/{}",
658                req.region, state.account_id, name
659            )
660        } else {
661            format!(
662                "arn:aws:events:{}:{}:rule/{}/{}",
663                req.region, state.account_id, event_bus_name, name
664            )
665        };
666
667        let key = (event_bus_name.clone(), name.clone());
668        let targets = state
669            .rules
670            .get(&key)
671            .map(|r| r.targets.clone())
672            .unwrap_or_default();
673
674        let tags = parse_tags(&body);
675
676        let rule = EventRule {
677            name: name.clone(),
678            arn: arn.clone(),
679            event_bus_name,
680            event_pattern,
681            schedule_expression,
682            state: rule_state,
683            description,
684            role_arn,
685            managed_by: None,
686            created_by: None,
687            targets,
688            tags,
689            last_fired: None,
690        };
691
692        state.rules.insert(key, rule);
693        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
694    }
695
696    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
697        let body = req.json_body();
698        validate_required("Name", &body["Name"])?;
699        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
700        validate_string_length("name", name, 1, 64)?;
701        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
702        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
703
704        let mut state = self.state.write();
705        let bus_name = state.resolve_bus_name(event_bus_name);
706        let key = (bus_name, name.to_string());
707
708        // Check if rule has targets
709        if let Some(rule) = state.rules.get(&key) {
710            if !rule.targets.is_empty() {
711                return Err(AwsServiceError::aws_error(
712                    StatusCode::BAD_REQUEST,
713                    "ValidationException",
714                    "Rule can't be deleted since it has targets.",
715                ));
716            }
717        }
718
719        state.rules.remove(&key);
720        Ok(AwsResponse::ok_json(json!({})))
721    }
722
723    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
724        let body = req.json_body();
725        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
726        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
727        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
728        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
729        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
730        let name_prefix = body["NamePrefix"].as_str();
731        let limit = body["Limit"].as_u64().map(|n| n as usize);
732        let next_token = body["NextToken"].as_str();
733
734        let state = self.state.read();
735        let bus_name = state.resolve_bus_name(event_bus_name);
736
737        let mut rules: Vec<&EventRule> = state
738            .rules
739            .values()
740            .filter(|r| r.event_bus_name == bus_name)
741            .filter(|r| match name_prefix {
742                Some(prefix) => r.name.starts_with(prefix),
743                None => true,
744            })
745            .collect();
746        rules.sort_by(|a, b| a.name.cmp(&b.name));
747
748        // Pagination
749        let start = next_token
750            .and_then(|t| t.parse::<usize>().ok())
751            .unwrap_or(0)
752            .min(rules.len());
753        let rules_slice = &rules[start..];
754
755        let (page, new_next_token) = if let Some(lim) = limit {
756            if rules_slice.len() > lim {
757                (&rules_slice[..lim], Some((start + lim).to_string()))
758            } else {
759                (rules_slice, None)
760            }
761        } else {
762            (rules_slice, None)
763        };
764
765        let rules_json: Vec<Value> = page
766            .iter()
767            .map(|r| {
768                let mut obj = json!({
769                    "Name": r.name,
770                    "Arn": r.arn,
771                    "EventBusName": r.event_bus_name,
772                    "State": r.state,
773                });
774                if let Some(ref desc) = r.description {
775                    obj["Description"] = json!(desc);
776                }
777                if let Some(ref ep) = r.event_pattern {
778                    obj["EventPattern"] = json!(ep);
779                }
780                if let Some(ref se) = r.schedule_expression {
781                    obj["ScheduleExpression"] = json!(se);
782                }
783                if let Some(ref mb) = r.managed_by {
784                    obj["ManagedBy"] = json!(mb);
785                }
786                obj
787            })
788            .collect();
789
790        let mut resp = json!({ "Rules": rules_json });
791        if let Some(token) = new_next_token {
792            resp["NextToken"] = json!(token);
793        }
794
795        Ok(AwsResponse::ok_json(resp))
796    }
797
798    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799        let body = req.json_body();
800        validate_required("Name", &body["Name"])?;
801        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802        validate_string_length("name", name, 1, 64)?;
803        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806        let state = self.state.read();
807        let bus_name = state.resolve_bus_name(event_bus_name);
808        let key = (bus_name.clone(), name.to_string());
809
810        let rule = state.rules.get(&key).ok_or_else(|| {
811            AwsServiceError::aws_error(
812                StatusCode::BAD_REQUEST,
813                "ResourceNotFoundException",
814                format!("Rule {name} does not exist."),
815            )
816        })?;
817
818        let mut resp = json!({
819            "Name": rule.name,
820            "Arn": rule.arn,
821            "EventBusName": rule.event_bus_name,
822            "State": rule.state,
823        });
824
825        if let Some(ref desc) = rule.description {
826            resp["Description"] = json!(desc);
827        }
828        if let Some(ref ep) = rule.event_pattern {
829            resp["EventPattern"] = json!(ep);
830        }
831        if let Some(ref se) = rule.schedule_expression {
832            resp["ScheduleExpression"] = json!(se);
833        }
834        if let Some(ref role) = rule.role_arn {
835            resp["RoleArn"] = json!(role);
836        }
837        if let Some(ref mb) = rule.managed_by {
838            resp["ManagedBy"] = json!(mb);
839        }
840        if let Some(ref cb) = rule.created_by {
841            resp["CreatedBy"] = json!(cb);
842        }
843        // If non-default bus, set CreatedBy to account_id
844        if rule.event_bus_name != "default" && rule.created_by.is_none() {
845            resp["CreatedBy"] = json!(state.account_id);
846        }
847
848        Ok(AwsResponse::ok_json(resp))
849    }
850
851    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
852        let body = req.json_body();
853        validate_required("Name", &body["Name"])?;
854        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
855        validate_string_length("name", name, 1, 64)?;
856        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
857        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
858
859        let mut state = self.state.write();
860        let bus_name = state.resolve_bus_name(event_bus_name);
861        let key = (bus_name, name.to_string());
862
863        let rule = state.rules.get_mut(&key).ok_or_else(|| {
864            AwsServiceError::aws_error(
865                StatusCode::BAD_REQUEST,
866                "ResourceNotFoundException",
867                format!("Rule {name} does not exist."),
868            )
869        })?;
870
871        rule.state = "ENABLED".to_string();
872        Ok(AwsResponse::ok_json(json!({})))
873    }
874
875    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
876        let body = req.json_body();
877        validate_required("Name", &body["Name"])?;
878        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
879        validate_string_length("name", name, 1, 64)?;
880        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
881        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882
883        let mut state = self.state.write();
884        let bus_name = state.resolve_bus_name(event_bus_name);
885        let key = (bus_name, name.to_string());
886
887        let rule = state.rules.get_mut(&key).ok_or_else(|| {
888            AwsServiceError::aws_error(
889                StatusCode::BAD_REQUEST,
890                "ResourceNotFoundException",
891                format!("Rule {name} does not exist."),
892            )
893        })?;
894
895        rule.state = "DISABLED".to_string();
896        Ok(AwsResponse::ok_json(json!({})))
897    }
898
899    // ─── Target Operations ──────────────────────────────────────────────
900
901    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
902        let body = req.json_body();
903        validate_required("Rule", &body["Rule"])?;
904        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
905        validate_string_length("rule", rule_name, 1, 64)?;
906        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
907        validate_required("Targets", &body["Targets"])?;
908        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
909        let targets = body["Targets"]
910            .as_array()
911            .ok_or_else(|| missing("Targets"))?;
912
913        // Validate targets - check for FIFO SQS without SqsParameters
914        for target in targets {
915            let target_id = target["Id"].as_str().unwrap_or("");
916            let target_arn = target["Arn"].as_str().unwrap_or("");
917
918            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
919                return Err(AwsServiceError::aws_error(
920                    StatusCode::BAD_REQUEST,
921                    "ValidationException",
922                    format!(
923                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
924                    ),
925                ));
926            }
927
928            // Validate ARN format
929            if !target_arn.starts_with("arn:") {
930                return Err(AwsServiceError::aws_error(
931                    StatusCode::BAD_REQUEST,
932                    "ValidationException",
933                    format!(
934                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
935                    ),
936                ));
937            }
938        }
939
940        let mut state = self.state.write();
941        let bus_name = state.resolve_bus_name(event_bus_name);
942        let key = (bus_name.clone(), rule_name.to_string());
943
944        let rule = state.rules.get_mut(&key).ok_or_else(|| {
945            AwsServiceError::aws_error(
946                StatusCode::BAD_REQUEST,
947                "ResourceNotFoundException",
948                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
949            )
950        })?;
951
952        for target in targets {
953            let et = parse_target(target);
954            // Remove existing target with same ID
955            rule.targets.retain(|t| t.id != et.id);
956            rule.targets.push(et);
957        }
958
959        Ok(AwsResponse::ok_json(json!({
960            "FailedEntryCount": 0,
961            "FailedEntries": [],
962        })))
963    }
964
965    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
966        let body = req.json_body();
967        validate_required("Rule", &body["Rule"])?;
968        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
969        validate_string_length("rule", rule_name, 1, 64)?;
970        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
971        validate_required("Ids", &body["Ids"])?;
972        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
973        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
974
975        let target_ids: Vec<String> = ids
976            .iter()
977            .filter_map(|v| v.as_str().map(|s| s.to_string()))
978            .collect();
979
980        let mut state = self.state.write();
981        let bus_name = state.resolve_bus_name(event_bus_name);
982        let key = (bus_name.clone(), rule_name.to_string());
983
984        let rule = state.rules.get_mut(&key).ok_or_else(|| {
985            AwsServiceError::aws_error(
986                StatusCode::BAD_REQUEST,
987                "ResourceNotFoundException",
988                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
989            )
990        })?;
991
992        rule.targets.retain(|t| !target_ids.contains(&t.id));
993
994        Ok(AwsResponse::ok_json(json!({
995            "FailedEntryCount": 0,
996            "FailedEntries": [],
997        })))
998    }
999
1000    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001        let body = req.json_body();
1002        validate_required("Rule", &body["Rule"])?;
1003        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
1004        validate_string_length("rule", rule_name, 1, 64)?;
1005        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009        let limit = body["Limit"].as_u64().map(|n| n as usize);
1010        let next_token = body["NextToken"].as_str();
1011
1012        let state = self.state.read();
1013        let bus_name = state.resolve_bus_name(event_bus_name);
1014        let key = (bus_name, rule_name.to_string());
1015
1016        let rule = state.rules.get(&key).ok_or_else(|| {
1017            AwsServiceError::aws_error(
1018                StatusCode::BAD_REQUEST,
1019                "ResourceNotFoundException",
1020                format!("Rule {rule_name} does not exist."),
1021            )
1022        })?;
1023
1024        let all_targets = &rule.targets;
1025        let start = next_token
1026            .and_then(|t| t.parse::<usize>().ok())
1027            .unwrap_or(0)
1028            .min(all_targets.len());
1029        let slice = &all_targets[start..];
1030
1031        let (page, new_next_token) = if let Some(lim) = limit {
1032            if slice.len() > lim {
1033                (&slice[..lim], Some((start + lim).to_string()))
1034            } else {
1035                (slice, None)
1036            }
1037        } else {
1038            (slice, None)
1039        };
1040
1041        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1042
1043        let mut resp = json!({ "Targets": targets });
1044        if let Some(token) = new_next_token {
1045            resp["NextToken"] = json!(token);
1046        }
1047
1048        Ok(AwsResponse::ok_json(resp))
1049    }
1050
1051    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052        let body = req.json_body();
1053        validate_required("TargetArn", &body["TargetArn"])?;
1054        let target_arn = body["TargetArn"]
1055            .as_str()
1056            .ok_or_else(|| missing("TargetArn"))?;
1057        validate_string_length("targetArn", target_arn, 1, 1600)?;
1058        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1059        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1060        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1061        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1062        let limit = body["Limit"].as_u64().map(|n| n as usize);
1063        let next_token = body["NextToken"].as_str();
1064
1065        let state = self.state.read();
1066        let bus_name = state.resolve_bus_name(event_bus_name);
1067
1068        // Deduplicate rule names
1069        let mut rule_names: Vec<String> = Vec::new();
1070        for rule in state.rules.values() {
1071            if rule.event_bus_name == bus_name
1072                && rule.targets.iter().any(|t| t.arn == target_arn)
1073                && !rule_names.contains(&rule.name)
1074            {
1075                rule_names.push(rule.name.clone());
1076            }
1077        }
1078        rule_names.sort();
1079
1080        let start = next_token
1081            .and_then(|t| t.parse::<usize>().ok())
1082            .unwrap_or(0)
1083            .min(rule_names.len());
1084        let slice = &rule_names[start..];
1085
1086        let (page, new_next_token) = if let Some(lim) = limit {
1087            if slice.len() > lim {
1088                (&slice[..lim], Some((start + lim).to_string()))
1089            } else {
1090                (slice, None)
1091            }
1092        } else {
1093            (slice, None)
1094        };
1095
1096        let mut resp = json!({ "RuleNames": page });
1097        if let Some(token) = new_next_token {
1098            resp["NextToken"] = json!(token);
1099        }
1100
1101        Ok(AwsResponse::ok_json(resp))
1102    }
1103
1104    // ─── Partner Event Sources ────────────���───────────────────────────
1105
1106    fn create_partner_event_source(
1107        &self,
1108        req: &AwsRequest,
1109    ) -> Result<AwsResponse, AwsServiceError> {
1110        let body = req.json_body();
1111        validate_required("Name", &body["Name"])?;
1112        let name = body["Name"]
1113            .as_str()
1114            .ok_or_else(|| missing("Name"))?
1115            .to_string();
1116        validate_string_length("name", &name, 1, 256)?;
1117        validate_required("Account", &body["Account"])?;
1118        let account = body["Account"]
1119            .as_str()
1120            .ok_or_else(|| missing("Account"))?
1121            .to_string();
1122        validate_string_length("account", &account, 12, 12)?;
1123
1124        let mut state = self.state.write();
1125        if state.partner_event_sources.contains_key(&name) {
1126            return Err(AwsServiceError::aws_error(
1127                StatusCode::CONFLICT,
1128                "ResourceAlreadyExistsException",
1129                format!("Partner event source {name} already exists."),
1130            ));
1131        }
1132        let arn = format!(
1133            "arn:aws:events:{}::event-source/aws.partner/{}",
1134            state.region, name
1135        );
1136        let now = Utc::now();
1137        let ps = PartnerEventSource {
1138            name: name.clone(),
1139            arn: arn.clone(),
1140            account,
1141            creation_time: now,
1142            expiration_time: None,
1143            state: "ACTIVE".to_string(),
1144        };
1145        state.partner_event_sources.insert(name.clone(), ps);
1146
1147        Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
1148    }
1149
1150    fn delete_partner_event_source(
1151        &self,
1152        req: &AwsRequest,
1153    ) -> Result<AwsResponse, AwsServiceError> {
1154        let body = req.json_body();
1155        validate_required("Name", &body["Name"])?;
1156        let name = body["Name"]
1157            .as_str()
1158            .ok_or_else(|| missing("Name"))?
1159            .to_string();
1160        validate_required("Account", &body["Account"])?;
1161        let account = body["Account"]
1162            .as_str()
1163            .ok_or_else(|| missing("Account"))?
1164            .to_string();
1165
1166        let mut state = self.state.write();
1167        match state.partner_event_sources.get(&name) {
1168            Some(ps) if ps.account == account => {
1169                state.partner_event_sources.remove(&name);
1170            }
1171            Some(_) => {
1172                return Err(AwsServiceError::aws_error(
1173                    StatusCode::NOT_FOUND,
1174                    "ResourceNotFoundException",
1175                    format!("Partner event source {name} does not exist for account {account}."),
1176                ));
1177            }
1178            None => {
1179                return Err(AwsServiceError::aws_error(
1180                    StatusCode::NOT_FOUND,
1181                    "ResourceNotFoundException",
1182                    format!("Partner event source {name} does not exist."),
1183                ));
1184            }
1185        }
1186
1187        Ok(AwsResponse::ok_json(json!({})))
1188    }
1189
1190    fn describe_partner_event_source(
1191        &self,
1192        req: &AwsRequest,
1193    ) -> Result<AwsResponse, AwsServiceError> {
1194        let body = req.json_body();
1195        validate_required("Name", &body["Name"])?;
1196        let name = body["Name"]
1197            .as_str()
1198            .ok_or_else(|| missing("Name"))?
1199            .to_string();
1200        validate_string_length("name", &name, 1, 256)?;
1201
1202        let state = self.state.read();
1203        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1204            AwsServiceError::aws_error(
1205                StatusCode::NOT_FOUND,
1206                "ResourceNotFoundException",
1207                format!("Partner event source {name} does not exist."),
1208            )
1209        })?;
1210
1211        Ok(AwsResponse::ok_json(json!({
1212            "Arn": ps.arn,
1213            "Name": ps.name,
1214        })))
1215    }
1216
1217    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1218        let body = req.json_body();
1219        validate_required("namePrefix", &body["NamePrefix"])?;
1220        let name_prefix = body["NamePrefix"]
1221            .as_str()
1222            .ok_or_else(|| missing("NamePrefix"))?;
1223        validate_string_length("namePrefix", name_prefix, 1, 256)?;
1224        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1225        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1226        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1227
1228        let state = self.state.read();
1229        let all: Vec<Value> = state
1230            .partner_event_sources
1231            .values()
1232            .filter(|ps| ps.name.starts_with(name_prefix))
1233            .map(|ps| {
1234                json!({
1235                    "Arn": ps.arn,
1236                    "Name": ps.name,
1237                })
1238            })
1239            .collect();
1240
1241        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1242        let mut resp = json!({ "PartnerEventSources": sources });
1243        if let Some(token) = next_token {
1244            resp["NextToken"] = json!(token);
1245        }
1246
1247        Ok(AwsResponse::ok_json(resp))
1248    }
1249
1250    fn list_partner_event_source_accounts(
1251        &self,
1252        req: &AwsRequest,
1253    ) -> Result<AwsResponse, AwsServiceError> {
1254        let body = req.json_body();
1255        validate_required("EventSourceName", &body["EventSourceName"])?;
1256        let event_source_name = body["EventSourceName"]
1257            .as_str()
1258            .ok_or_else(|| missing("EventSourceName"))?;
1259        validate_string_length("eventSourceName", event_source_name, 1, 256)?;
1260        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1261        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1262
1263        let state = self.state.read();
1264        let accounts: Vec<Value> = state
1265            .partner_event_sources
1266            .values()
1267            .filter(|ps| ps.name == event_source_name)
1268            .map(|ps| json!({ "Account": ps.account }))
1269            .collect();
1270
1271        Ok(AwsResponse::ok_json(json!({
1272            "PartnerEventSourceAccounts": accounts
1273        })))
1274    }
1275
1276    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1277        let body = req.json_body();
1278        validate_required("Name", &body["Name"])?;
1279        let name = body["Name"]
1280            .as_str()
1281            .ok_or_else(|| missing("Name"))?
1282            .to_string();
1283
1284        let mut state = self.state.write();
1285        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1286            AwsServiceError::aws_error(
1287                StatusCode::NOT_FOUND,
1288                "ResourceNotFoundException",
1289                format!("Event source {name} does not exist."),
1290            )
1291        })?;
1292        ps.state = "ACTIVE".to_string();
1293
1294        Ok(AwsResponse::ok_json(json!({})))
1295    }
1296
1297    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1298        let body = req.json_body();
1299        validate_required("Name", &body["Name"])?;
1300        let name = body["Name"]
1301            .as_str()
1302            .ok_or_else(|| missing("Name"))?
1303            .to_string();
1304
1305        let mut state = self.state.write();
1306        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1307            AwsServiceError::aws_error(
1308                StatusCode::NOT_FOUND,
1309                "ResourceNotFoundException",
1310                format!("Event source {name} does not exist."),
1311            )
1312        })?;
1313        ps.state = "INACTIVE".to_string();
1314
1315        Ok(AwsResponse::ok_json(json!({})))
1316    }
1317
1318    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1319        let body = req.json_body();
1320        validate_required("Name", &body["Name"])?;
1321        let name = body["Name"]
1322            .as_str()
1323            .ok_or_else(|| missing("Name"))?
1324            .to_string();
1325
1326        let state = self.state.read();
1327        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1328            AwsServiceError::aws_error(
1329                StatusCode::NOT_FOUND,
1330                "ResourceNotFoundException",
1331                format!("Event source {name} does not exist."),
1332            )
1333        })?;
1334
1335        Ok(AwsResponse::ok_json(json!({
1336            "Arn": ps.arn,
1337            "Name": ps.name,
1338            "CreatedBy": ps.account,
1339            "CreationTime": ps.creation_time.timestamp() as f64,
1340            "State": ps.state,
1341        })))
1342    }
1343
1344    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1345        let body = req.json_body();
1346        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
1347        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1348        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1349        let name_prefix = body["NamePrefix"].as_str();
1350        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1351
1352        let state = self.state.read();
1353        let all: Vec<Value> = state
1354            .partner_event_sources
1355            .values()
1356            .filter(|ps| match name_prefix {
1357                Some(prefix) => ps.name.starts_with(prefix),
1358                None => true,
1359            })
1360            .map(|ps| {
1361                json!({
1362                    "Arn": ps.arn,
1363                    "Name": ps.name,
1364                    "CreatedBy": ps.account,
1365                    "CreationTime": ps.creation_time.timestamp() as f64,
1366                    "State": ps.state,
1367                })
1368            })
1369            .collect();
1370
1371        let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1372        let mut resp = json!({ "EventSources": sources });
1373        if let Some(token) = next_token {
1374            resp["NextToken"] = json!(token);
1375        }
1376
1377        Ok(AwsResponse::ok_json(resp))
1378    }
1379
1380    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381        let body = req.json_body();
1382        validate_required("Entries", &body["Entries"])?;
1383        let entries = body["Entries"]
1384            .as_array()
1385            .ok_or_else(|| missing("Entries"))?;
1386
1387        let mut result_entries = Vec::new();
1388        for _entry in entries {
1389            let event_id = uuid::Uuid::new_v4().to_string();
1390            result_entries.push(json!({ "EventId": event_id }));
1391        }
1392
1393        Ok(AwsResponse::ok_json(json!({
1394            "FailedEntryCount": 0,
1395            "Entries": result_entries,
1396        })))
1397    }
1398
1399    // ─── TestEventPattern ────────────────────────────────────────────────
1400
1401    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1402        let body = req.json_body();
1403        validate_required("EventPattern", &body["EventPattern"])?;
1404        validate_required("Event", &body["Event"])?;
1405        let event_pattern = body["EventPattern"]
1406            .as_str()
1407            .ok_or_else(|| missing("EventPattern"))?;
1408        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1409
1410        // Parse the event JSON
1411        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1412            AwsServiceError::aws_error(
1413                StatusCode::BAD_REQUEST,
1414                "InvalidEventPatternException",
1415                "Event is not valid JSON.",
1416            )
1417        })?;
1418
1419        // Parse the pattern JSON
1420        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1421            AwsServiceError::aws_error(
1422                StatusCode::BAD_REQUEST,
1423                "InvalidEventPatternException",
1424                "Event pattern is not valid JSON.",
1425            )
1426        })?;
1427
1428        let source = event["source"].as_str().unwrap_or("");
1429        let detail_type = event["detail-type"].as_str().unwrap_or("");
1430        let detail = event
1431            .get("detail")
1432            .map(|v| serde_json::to_string(v).unwrap_or_default())
1433            .unwrap_or_else(|| "{}".to_string());
1434        let account = event["account"].as_str().unwrap_or("");
1435        let region = event["region"].as_str().unwrap_or("");
1436        let resources: Vec<String> = event["resources"]
1437            .as_array()
1438            .map(|arr| {
1439                arr.iter()
1440                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1441                    .collect()
1442            })
1443            .unwrap_or_default();
1444
1445        let result = matches_pattern(
1446            Some(event_pattern),
1447            source,
1448            detail_type,
1449            &detail,
1450            account,
1451            region,
1452            &resources,
1453        );
1454
1455        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1456    }
1457
1458    // ─── UpdateEventBus ─────────────────────────────────────────────────
1459
1460    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461        let body = req.json_body();
1462        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1463        validate_optional_string_length(
1464            "kmsKeyIdentifier",
1465            body["KmsKeyIdentifier"].as_str(),
1466            0,
1467            2048,
1468        )?;
1469        let name = body["Name"].as_str().unwrap_or("default");
1470
1471        let mut state = self.state.write();
1472        let bus = state.buses.get_mut(name).ok_or_else(|| {
1473            AwsServiceError::aws_error(
1474                StatusCode::BAD_REQUEST,
1475                "ResourceNotFoundException",
1476                format!("Event bus {name} does not exist."),
1477            )
1478        })?;
1479
1480        if let Some(desc) = body["Description"].as_str() {
1481            bus.description = Some(desc.to_string());
1482        }
1483        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1484            bus.kms_key_identifier = Some(kms.to_string());
1485        }
1486        if let Some(dlc) = body.get("DeadLetterConfig") {
1487            bus.dead_letter_config = Some(dlc.clone());
1488        }
1489        bus.last_modified_time = Utc::now();
1490
1491        let arn = bus.arn.clone();
1492        let bus_name = bus.name.clone();
1493
1494        Ok(AwsResponse::ok_json(json!({
1495            "Arn": arn,
1496            "Name": bus_name,
1497        })))
1498    }
1499
1500    // ─── Endpoint Operations ────────────────────────────────────────────
1501
1502    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1503        let body = req.json_body();
1504        validate_required("Name", &body["Name"])?;
1505        let name = body["Name"]
1506            .as_str()
1507            .ok_or_else(|| missing("Name"))?
1508            .to_string();
1509        validate_string_length("name", &name, 1, 64)?;
1510        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1511        validate_required("EventBuses", &body["EventBuses"])?;
1512
1513        let description = body["Description"].as_str().map(|s| s.to_string());
1514        let routing_config = body["RoutingConfig"].clone();
1515        let replication_config = body.get("ReplicationConfig").cloned();
1516        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1517        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1518
1519        let mut state = self.state.write();
1520        if state.endpoints.contains_key(&name) {
1521            return Err(AwsServiceError::aws_error(
1522                StatusCode::CONFLICT,
1523                "ResourceAlreadyExistsException",
1524                format!("Endpoint {name} already exists."),
1525            ));
1526        }
1527
1528        let endpoint_id = format!("{}.abc123", name);
1529        let arn = format!(
1530            "arn:aws:events:{}:{}:endpoint/{}",
1531            req.region, state.account_id, name
1532        );
1533        let endpoint_url = format!(
1534            "https://{}.endpoint.events.{}.amazonaws.com",
1535            endpoint_id, req.region
1536        );
1537        let now = Utc::now();
1538
1539        let endpoint = Endpoint {
1540            name: name.clone(),
1541            arn: arn.clone(),
1542            endpoint_id: endpoint_id.clone(),
1543            endpoint_url: Some(endpoint_url),
1544            description,
1545            routing_config: routing_config.clone(),
1546            replication_config: replication_config.clone(),
1547            event_buses: event_buses.clone(),
1548            role_arn: role_arn.clone(),
1549            state: "ACTIVE".to_string(),
1550            creation_time: now,
1551            last_modified_time: now,
1552        };
1553        state.endpoints.insert(name.clone(), endpoint);
1554
1555        let mut resp = json!({
1556            "Name": name,
1557            "Arn": arn,
1558            "State": "ACTIVE",
1559            "RoutingConfig": routing_config,
1560            "EventBuses": event_buses,
1561        });
1562        if let Some(ref rc) = replication_config {
1563            resp["ReplicationConfig"] = rc.clone();
1564        }
1565        if let Some(ref ra) = role_arn {
1566            resp["RoleArn"] = json!(ra);
1567        }
1568
1569        Ok(AwsResponse::ok_json(resp))
1570    }
1571
1572    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1573        let body = req.json_body();
1574        validate_required("Name", &body["Name"])?;
1575        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1576
1577        let mut state = self.state.write();
1578        state.endpoints.remove(name).ok_or_else(|| {
1579            AwsServiceError::aws_error(
1580                StatusCode::BAD_REQUEST,
1581                "ResourceNotFoundException",
1582                format!("Endpoint '{name}' does not exist."),
1583            )
1584        })?;
1585
1586        Ok(AwsResponse::ok_json(json!({})))
1587    }
1588
1589    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1590        let body = req.json_body();
1591        validate_required("Name", &body["Name"])?;
1592        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1593
1594        let state = self.state.read();
1595        let ep = state.endpoints.get(name).ok_or_else(|| {
1596            AwsServiceError::aws_error(
1597                StatusCode::BAD_REQUEST,
1598                "ResourceNotFoundException",
1599                format!("Endpoint '{name}' does not exist."),
1600            )
1601        })?;
1602
1603        let mut resp = json!({
1604            "Name": ep.name,
1605            "Arn": ep.arn,
1606            "EndpointId": ep.endpoint_id,
1607            "State": ep.state,
1608            "RoutingConfig": ep.routing_config,
1609            "EventBuses": ep.event_buses,
1610            "CreationTime": ep.creation_time.timestamp() as f64,
1611            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1612        });
1613        if let Some(ref url) = ep.endpoint_url {
1614            resp["EndpointUrl"] = json!(url);
1615        }
1616        if let Some(ref desc) = ep.description {
1617            resp["Description"] = json!(desc);
1618        }
1619        if let Some(ref rc) = ep.replication_config {
1620            resp["ReplicationConfig"] = rc.clone();
1621        }
1622        if let Some(ref ra) = ep.role_arn {
1623            resp["RoleArn"] = json!(ra);
1624        }
1625
1626        Ok(AwsResponse::ok_json(resp))
1627    }
1628
1629    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1630        let body = req.json_body();
1631        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
1632        validate_optional_string_length("homeRegion", body["HomeRegion"].as_str(), 9, 20)?;
1633        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1634        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 100)?;
1635        let name_prefix = body["NamePrefix"].as_str();
1636        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1637
1638        let state = self.state.read();
1639        let all: Vec<Value> = state
1640            .endpoints
1641            .values()
1642            .filter(|ep| match name_prefix {
1643                Some(prefix) => ep.name.starts_with(prefix),
1644                None => true,
1645            })
1646            .map(|ep| {
1647                let mut obj = json!({
1648                    "Name": ep.name,
1649                    "Arn": ep.arn,
1650                    "EndpointId": ep.endpoint_id,
1651                    "State": ep.state,
1652                    "RoutingConfig": ep.routing_config,
1653                    "EventBuses": ep.event_buses,
1654                    "CreationTime": ep.creation_time.timestamp() as f64,
1655                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1656                });
1657                if let Some(ref url) = ep.endpoint_url {
1658                    obj["EndpointUrl"] = json!(url);
1659                }
1660                obj
1661            })
1662            .collect();
1663
1664        let (endpoints, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
1665        let mut resp = json!({ "Endpoints": endpoints });
1666        if let Some(token) = next_token {
1667            resp["NextToken"] = json!(token);
1668        }
1669
1670        Ok(AwsResponse::ok_json(resp))
1671    }
1672
1673    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1674        let body = req.json_body();
1675        validate_required("Name", &body["Name"])?;
1676        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1677
1678        let mut state = self.state.write();
1679        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1680            AwsServiceError::aws_error(
1681                StatusCode::BAD_REQUEST,
1682                "ResourceNotFoundException",
1683                format!("Endpoint '{name}' does not exist."),
1684            )
1685        })?;
1686
1687        if let Some(desc) = body["Description"].as_str() {
1688            ep.description = Some(desc.to_string());
1689        }
1690        if !body["RoutingConfig"].is_null() {
1691            ep.routing_config = body["RoutingConfig"].clone();
1692        }
1693        if let Some(rc) = body.get("ReplicationConfig") {
1694            ep.replication_config = Some(rc.clone());
1695        }
1696        if let Some(buses) = body["EventBuses"].as_array() {
1697            ep.event_buses = buses.clone();
1698        }
1699        if let Some(ra) = body["RoleArn"].as_str() {
1700            ep.role_arn = Some(ra.to_string());
1701        }
1702        ep.last_modified_time = Utc::now();
1703
1704        let resp = json!({
1705            "Name": ep.name,
1706            "Arn": ep.arn,
1707            "EndpointId": ep.endpoint_id,
1708            "State": ep.state,
1709            "RoutingConfig": ep.routing_config,
1710            "EventBuses": ep.event_buses,
1711        });
1712
1713        Ok(AwsResponse::ok_json(resp))
1714    }
1715
1716    // ─── DeauthorizeConnection ──────────────────────────────────────────
1717
1718    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1719        let body = req.json_body();
1720        validate_required("Name", &body["Name"])?;
1721        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1722        validate_string_length("name", name, 1, 64)?;
1723
1724        let mut state = self.state.write();
1725        let conn = state.connections.get_mut(name).ok_or_else(|| {
1726            AwsServiceError::aws_error(
1727                StatusCode::BAD_REQUEST,
1728                "ResourceNotFoundException",
1729                format!("Connection '{name}' does not exist."),
1730            )
1731        })?;
1732
1733        conn.connection_state = "DEAUTHORIZING".to_string();
1734        conn.last_modified_time = Utc::now();
1735
1736        let resp = json!({
1737            "ConnectionArn": conn.arn,
1738            "ConnectionState": conn.connection_state,
1739            "CreationTime": conn.creation_time.timestamp() as f64,
1740            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1741            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1742        });
1743
1744        Ok(AwsResponse::ok_json(resp))
1745    }
1746
1747    // ─── PutEvents ──────────────────────────────────────────────────────
1748
1749    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1750        let body = req.json_body();
1751        validate_required("Entries", &body["Entries"])?;
1752        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1753        let entries = body["Entries"]
1754            .as_array()
1755            .ok_or_else(|| missing("Entries"))?;
1756
1757        // Validate entries count
1758        if entries.is_empty() {
1759            return Err(AwsServiceError::aws_error(
1760                StatusCode::BAD_REQUEST,
1761                "ValidationException",
1762                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1763            ));
1764        }
1765        if entries.len() > 10 {
1766            return Err(AwsServiceError::aws_error(
1767                StatusCode::BAD_REQUEST,
1768                "ValidationException",
1769                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1770            ));
1771        }
1772
1773        let mut state = self.state.write();
1774        let mut result_entries = Vec::new();
1775        let mut events_to_deliver = Vec::new();
1776        let mut failed_count = 0;
1777
1778        for entry in entries {
1779            let source = entry["Source"].as_str().unwrap_or("").to_string();
1780            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1781            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1782
1783            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1784                failed_count += 1;
1785                result_entries.push(error);
1786                continue;
1787            }
1788
1789            let event_id = uuid::Uuid::new_v4().to_string();
1790            let raw_bus = entry["EventBusName"]
1791                .as_str()
1792                .unwrap_or("default")
1793                .to_string();
1794            let event_bus_name = state.resolve_bus_name(&raw_bus);
1795            let time = parse_put_events_time(&entry["Time"]);
1796            let resources: Vec<String> = entry["Resources"]
1797                .as_array()
1798                .map(|arr| {
1799                    arr.iter()
1800                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1801                        .collect()
1802                })
1803                .unwrap_or_default();
1804
1805            let event = PutEvent {
1806                event_id: event_id.clone(),
1807                source: source.clone(),
1808                detail_type: detail_type.clone(),
1809                detail: detail.clone(),
1810                event_bus_name: event_bus_name.clone(),
1811                time,
1812                resources: resources.clone(),
1813            };
1814
1815            archive_matching_event(
1816                &mut state,
1817                &event,
1818                &event_bus_name,
1819                &source,
1820                &detail_type,
1821                &detail,
1822                &req.account_id,
1823                &req.region,
1824                &resources,
1825            );
1826
1827            state.events.push(event);
1828
1829            // Find matching rules and their targets
1830            let matching_targets: Vec<EventTarget> = state
1831                .rules
1832                .values()
1833                .filter(|r| {
1834                    r.event_bus_name == event_bus_name
1835                        && r.state == "ENABLED"
1836                        && matches_pattern(
1837                            r.event_pattern.as_deref(),
1838                            &source,
1839                            &detail_type,
1840                            &detail,
1841                            &req.account_id,
1842                            &req.region,
1843                            &resources,
1844                        )
1845                })
1846                .flat_map(|r| r.targets.clone())
1847                .collect();
1848
1849            if !matching_targets.is_empty() {
1850                events_to_deliver.push((
1851                    event_id.clone(),
1852                    source,
1853                    detail_type,
1854                    detail,
1855                    time,
1856                    resources,
1857                    matching_targets,
1858                ));
1859            }
1860
1861            result_entries.push(json!({ "EventId": event_id }));
1862        }
1863
1864        // Drop the lock before delivering
1865        drop(state);
1866
1867        // Deliver to targets
1868        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1869            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1870            let event_json = json!({
1871                "version": "0",
1872                "id": event_id,
1873                "source": source,
1874                "account": req.account_id,
1875                "detail-type": detail_type,
1876                "detail": detail_value,
1877                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1878                "region": req.region,
1879                "resources": resources,
1880            });
1881            let event_str = event_json.to_string();
1882
1883            for target in targets {
1884                let arn = &target.arn;
1885                // Compute the message body, applying InputTransformer if present
1886                let body_str = if let Some(ref transformer) = target.input_transformer {
1887                    apply_input_transformer(transformer, &event_json)
1888                } else if let Some(ref input) = target.input {
1889                    input.clone()
1890                } else if let Some(ref input_path) = target.input_path {
1891                    resolve_json_path(&event_json, input_path)
1892                        .map(|v| v.to_string())
1893                        .unwrap_or_else(|| event_str.clone())
1894                } else {
1895                    event_str.clone()
1896                };
1897
1898                if arn.contains(":sqs:") {
1899                    // Extract FIFO parameters (MessageGroupId)
1900                    let group_id = target
1901                        .sqs_parameters
1902                        .as_ref()
1903                        .and_then(|p| p["MessageGroupId"].as_str())
1904                        .map(|s| s.to_string());
1905                    if group_id.is_some() {
1906                        // FIFO queue: send with group ID but no dedup ID.
1907                        // Queues with content-based dedup will auto-generate one;
1908                        // queues without it will reject the message.
1909                        self.delivery.send_to_sqs_with_attrs(
1910                            arn,
1911                            &body_str,
1912                            &HashMap::new(),
1913                            group_id.as_deref(),
1914                            None,
1915                        );
1916                    } else {
1917                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1918                    }
1919                } else if arn.contains(":sns:") {
1920                    self.delivery
1921                        .publish_to_sns(arn, &body_str, Some(&detail_type));
1922                } else if arn.contains(":lambda:") {
1923                    tracing::info!(
1924                        function_arn = %arn,
1925                        payload = %body_str,
1926                        "EventBridge delivering to Lambda function"
1927                    );
1928                    let now = Utc::now();
1929                    let mut state = self.state.write();
1930                    state
1931                        .lambda_invocations
1932                        .push(crate::state::LambdaInvocation {
1933                            function_arn: arn.clone(),
1934                            payload: body_str.clone(),
1935                            timestamp: now,
1936                        });
1937                    drop(state);
1938                    // Record in Lambda state for cross-service visibility
1939                    if let Some(ref ls) = self.lambda_state {
1940                        ls.write().invocations.push(LambdaInvocation {
1941                            function_arn: arn.clone(),
1942                            payload: body_str.clone(),
1943                            timestamp: now,
1944                            source: "aws:events".to_string(),
1945                        });
1946                    }
1947                    // Actually invoke the Lambda function if a container runtime is available
1948                    invoke_lambda_async(
1949                        &self.container_runtime,
1950                        &self.lambda_state,
1951                        arn,
1952                        &body_str,
1953                    );
1954                } else if arn.contains(":logs:") {
1955                    tracing::info!(
1956                        log_group_arn = %arn,
1957                        payload = %body_str,
1958                        "EventBridge delivering to CloudWatch Logs"
1959                    );
1960                    let now = Utc::now();
1961                    let mut state = self.state.write();
1962                    state.log_deliveries.push(crate::state::LogDelivery {
1963                        log_group_arn: arn.clone(),
1964                        payload: body_str.clone(),
1965                        timestamp: now,
1966                    });
1967                    drop(state);
1968                    // Write event to CloudWatch Logs state
1969                    if let Some(ref log_state) = self.logs_state {
1970                        deliver_to_logs(log_state, arn, &body_str, now);
1971                    }
1972                } else if arn.contains(":kinesis:") {
1973                    tracing::info!(
1974                        stream_arn = %arn,
1975                        "EventBridge delivering to Kinesis stream"
1976                    );
1977                    // Use event ID as partition key for even distribution
1978                    self.delivery.send_to_kinesis(arn, &body_str, &event_id);
1979                } else if arn.contains(":states:") {
1980                    tracing::info!(
1981                        state_machine_arn = %arn,
1982                        "EventBridge delivering to Step Functions"
1983                    );
1984                    self.delivery.start_stepfunctions_execution(arn, &body_str);
1985                    let mut state = self.state.write();
1986                    state
1987                        .step_function_executions
1988                        .push(crate::state::StepFunctionExecution {
1989                            state_machine_arn: arn.clone(),
1990                            payload: body_str.clone(),
1991                            timestamp: Utc::now(),
1992                        });
1993                } else if arn.contains(":api-destination/") {
1994                    // ApiDestination target: look up destination + connection, then POST
1995                    let state = self.state.read();
1996                    let dest = state.api_destinations.values().find(|d| d.arn == *arn);
1997                    if let Some(dest) = dest {
1998                        let url = dest.invocation_endpoint.clone();
1999                        let method = dest.http_method.clone();
2000                        let conn = state
2001                            .connections
2002                            .values()
2003                            .find(|c| c.arn == dest.connection_arn)
2004                            .cloned();
2005                        drop(state);
2006
2007                        let payload = body_str.clone();
2008                        tokio::spawn(async move {
2009                            let client = reqwest::Client::new();
2010                            let mut req_builder = match method.as_str() {
2011                                "GET" => client.get(&url),
2012                                "PUT" => client.put(&url),
2013                                "DELETE" => client.delete(&url),
2014                                "PATCH" => client.patch(&url),
2015                                "HEAD" => client.head(&url),
2016                                _ => client.post(&url),
2017                            };
2018                            req_builder = req_builder.header("Content-Type", "application/json");
2019
2020                            // Apply auth from connection
2021                            if let Some(conn) = conn {
2022                                req_builder = apply_connection_auth(req_builder, &conn);
2023                            }
2024
2025                            let result = req_builder.body(payload).send().await;
2026                            if let Err(e) = result {
2027                                tracing::warn!(
2028                                    endpoint = %url,
2029                                    error = %e,
2030                                    "EventBridge ApiDestination delivery failed"
2031                                );
2032                            }
2033                        });
2034                    }
2035                } else if arn.starts_with("https://") || arn.starts_with("http://") {
2036                    // HTTP target — fire-and-forget POST
2037                    let url = arn.clone();
2038                    let payload = body_str.clone();
2039                    tokio::spawn(async move {
2040                        let client = reqwest::Client::new();
2041                        let result = client
2042                            .post(&url)
2043                            .header("Content-Type", "application/json")
2044                            .body(payload)
2045                            .send()
2046                            .await;
2047                        if let Err(e) = result {
2048                            tracing::warn!(
2049                                endpoint = %url,
2050                                error = %e,
2051                                "EventBridge HTTP target delivery failed"
2052                            );
2053                        }
2054                    });
2055                }
2056            }
2057        }
2058
2059        let resp = json!({
2060            "FailedEntryCount": failed_count,
2061            "Entries": result_entries,
2062        });
2063
2064        Ok(AwsResponse::ok_json(resp))
2065    }
2066
2067    // ─── Tagging ────────────────────────────────────────────────────────
2068
2069    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2070        let body = req.json_body();
2071        validate_required("ResourceARN", &body["ResourceARN"])?;
2072        let arn = body["ResourceARN"]
2073            .as_str()
2074            .ok_or_else(|| missing("ResourceARN"))?;
2075        validate_string_length("resourceARN", arn, 1, 1600)?;
2076        validate_required("Tags", &body["Tags"])?;
2077
2078        let mut state = self.state.write();
2079        let tag_map = find_tags_mut(&mut state, arn)?;
2080
2081        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
2082            AwsServiceError::aws_error(
2083                StatusCode::BAD_REQUEST,
2084                "ValidationException",
2085                format!("{f} must be a list"),
2086            )
2087        })?;
2088
2089        Ok(AwsResponse::ok_json(json!({})))
2090    }
2091
2092    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2093        let body = req.json_body();
2094        validate_required("ResourceARN", &body["ResourceARN"])?;
2095        let arn = body["ResourceARN"]
2096            .as_str()
2097            .ok_or_else(|| missing("ResourceARN"))?;
2098        validate_string_length("resourceARN", arn, 1, 1600)?;
2099        validate_required("TagKeys", &body["TagKeys"])?;
2100
2101        let mut state = self.state.write();
2102        let tag_map = find_tags_mut(&mut state, arn)?;
2103
2104        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
2105            AwsServiceError::aws_error(
2106                StatusCode::BAD_REQUEST,
2107                "ValidationException",
2108                format!("{f} must be a list"),
2109            )
2110        })?;
2111
2112        Ok(AwsResponse::ok_json(json!({})))
2113    }
2114
2115    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116        let body = req.json_body();
2117        validate_required("ResourceARN", &body["ResourceARN"])?;
2118        let arn = body["ResourceARN"]
2119            .as_str()
2120            .ok_or_else(|| missing("ResourceARN"))?;
2121        validate_string_length("resourceARN", arn, 1, 1600)?;
2122
2123        let state = self.state.read();
2124        let tag_map = find_tags(&state, arn)?;
2125
2126        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
2127
2128        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
2129    }
2130
2131    // ─── Archive Operations ─────────────────────────────────────────────
2132
2133    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2134        let body = req.json_body();
2135        validate_required("ArchiveName", &body["ArchiveName"])?;
2136        let name = body["ArchiveName"]
2137            .as_str()
2138            .ok_or_else(|| missing("ArchiveName"))?
2139            .to_string();
2140        validate_string_length("archiveName", &name, 1, 48)?;
2141        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2142        let event_source_arn = body["EventSourceArn"]
2143            .as_str()
2144            .ok_or_else(|| missing("EventSourceArn"))?
2145            .to_string();
2146        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2147        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2148        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2149        if let Some(rd) = body["RetentionDays"].as_i64() {
2150            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2151        }
2152        let description = body["Description"].as_str().map(|s| s.to_string());
2153        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2154        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2155
2156        // Validate event pattern if provided
2157        if let Some(ref pattern) = event_pattern {
2158            validate_event_pattern(pattern)?;
2159        }
2160
2161        let mut state = self.state.write();
2162
2163        // Validate event bus exists
2164        let bus_name = state.resolve_bus_name(&event_source_arn);
2165        if !state.buses.contains_key(&bus_name) {
2166            return Err(AwsServiceError::aws_error(
2167                StatusCode::BAD_REQUEST,
2168                "ResourceNotFoundException",
2169                format!("Event bus {bus_name} does not exist."),
2170            ));
2171        }
2172
2173        // Check duplicate
2174        if state.archives.contains_key(&name) {
2175            return Err(AwsServiceError::aws_error(
2176                StatusCode::BAD_REQUEST,
2177                "ResourceAlreadyExistsException",
2178                format!("Archive {name} already exists."),
2179            ));
2180        }
2181
2182        let now = Utc::now();
2183        let arn = format!(
2184            "arn:aws:events:{}:{}:archive/{}",
2185            req.region, state.account_id, name
2186        );
2187
2188        let archive = Archive {
2189            name: name.clone(),
2190            arn: arn.clone(),
2191            event_source_arn: event_source_arn.clone(),
2192            description,
2193            event_pattern: event_pattern.clone(),
2194            retention_days,
2195            state: "ENABLED".to_string(),
2196            creation_time: now,
2197            event_count: 0,
2198            size_bytes: 0,
2199            events: Vec::new(),
2200        };
2201        state.archives.insert(name.clone(), archive);
2202
2203        // Create the archive rule
2204        let rule_name = format!("Events-Archive-{name}");
2205        let rule_arn = format!(
2206            "arn:aws:events:{}:{}:rule/{}",
2207            req.region, state.account_id, rule_name
2208        );
2209        // Merge archive event pattern with replay-name filter
2210        let rule_event_pattern = {
2211            let mut merged = if let Some(ref ep) = event_pattern {
2212                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2213            } else {
2214                json!({})
2215            };
2216            if let Some(obj) = merged.as_object_mut() {
2217                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2218            }
2219            serde_json::to_string(&merged).unwrap_or_default()
2220        };
2221
2222        // Build the archive target with InputTransformer
2223        let archive_target = EventTarget {
2224            id: name.clone(),
2225            arn: format!("arn:aws:events:{}:::", req.region),
2226            input: None,
2227            input_path: None,
2228            input_transformer: Some(json!({
2229                "InputPathsMap": {},
2230                "InputTemplate": format!(
2231                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2232                    arn
2233                )
2234            })),
2235            sqs_parameters: None,
2236        };
2237
2238        let archive_rule = EventRule {
2239            name: rule_name.clone(),
2240            arn: rule_arn,
2241            event_bus_name: bus_name.clone(),
2242            event_pattern: Some(rule_event_pattern),
2243            schedule_expression: None,
2244            state: "ENABLED".to_string(),
2245            description: None,
2246            role_arn: None,
2247            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2248            created_by: Some(state.account_id.clone()),
2249            targets: vec![archive_target],
2250            tags: HashMap::new(),
2251            last_fired: None,
2252        };
2253        let key = (bus_name, rule_name);
2254        state.rules.insert(key, archive_rule);
2255
2256        Ok(AwsResponse::ok_json(json!({
2257            "ArchiveArn": arn,
2258            "CreationTime": now.timestamp() as f64,
2259            "State": "ENABLED",
2260        })))
2261    }
2262
2263    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2264        let body = req.json_body();
2265        validate_required("ArchiveName", &body["ArchiveName"])?;
2266        let name = body["ArchiveName"]
2267            .as_str()
2268            .ok_or_else(|| missing("ArchiveName"))?;
2269        validate_string_length("archiveName", name, 1, 48)?;
2270
2271        let state = self.state.read();
2272        let archive = state.archives.get(name).ok_or_else(|| {
2273            AwsServiceError::aws_error(
2274                StatusCode::BAD_REQUEST,
2275                "ResourceNotFoundException",
2276                format!("Archive {name} does not exist."),
2277            )
2278        })?;
2279
2280        let mut resp = json!({
2281            "ArchiveArn": archive.arn,
2282            "ArchiveName": archive.name,
2283            "CreationTime": archive.creation_time.timestamp() as f64,
2284            "EventCount": archive.event_count,
2285            "EventSourceArn": archive.event_source_arn,
2286            "RetentionDays": archive.retention_days,
2287            "SizeBytes": archive.size_bytes,
2288            "State": archive.state,
2289        });
2290        if let Some(ref desc) = archive.description {
2291            resp["Description"] = json!(desc);
2292        }
2293        if let Some(ref ep) = archive.event_pattern {
2294            resp["EventPattern"] = json!(ep);
2295        }
2296
2297        Ok(AwsResponse::ok_json(resp))
2298    }
2299
2300    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2301        let body = req.json_body();
2302        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2303        validate_optional_string_length(
2304            "eventSourceArn",
2305            body["EventSourceArn"].as_str(),
2306            1,
2307            1600,
2308        )?;
2309        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2310        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2311        let name_prefix = body["NamePrefix"].as_str();
2312        let source_arn = body["EventSourceArn"].as_str();
2313        let archive_state = body["State"].as_str();
2314
2315        // Validate at most one filter
2316        let filter_count = [
2317            name_prefix.is_some(),
2318            source_arn.is_some(),
2319            archive_state.is_some(),
2320        ]
2321        .iter()
2322        .filter(|&&x| x)
2323        .count();
2324        if filter_count > 1 {
2325            return Err(AwsServiceError::aws_error(
2326                StatusCode::BAD_REQUEST,
2327                "ValidationException",
2328                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2329            ));
2330        }
2331
2332        // Validate state
2333        if let Some(s) = archive_state {
2334            let valid = [
2335                "ENABLED",
2336                "DISABLED",
2337                "CREATING",
2338                "UPDATING",
2339                "CREATE_FAILED",
2340                "UPDATE_FAILED",
2341            ];
2342            if !valid.contains(&s) {
2343                return Err(AwsServiceError::aws_error(
2344                    StatusCode::BAD_REQUEST,
2345                    "ValidationException",
2346                    format!(
2347                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2348                        s
2349                    ),
2350                ));
2351            }
2352        }
2353
2354        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2355
2356        let state = self.state.read();
2357        let all: Vec<Value> = state
2358            .archives
2359            .values()
2360            .filter(|a| {
2361                if let Some(prefix) = name_prefix {
2362                    a.name.starts_with(prefix)
2363                } else if let Some(arn) = source_arn {
2364                    a.event_source_arn == arn
2365                } else if let Some(s) = archive_state {
2366                    a.state == s
2367                } else {
2368                    true
2369                }
2370            })
2371            .map(|a| {
2372                json!({
2373                    "ArchiveName": a.name,
2374                    "CreationTime": a.creation_time.timestamp() as f64,
2375                    "EventCount": a.event_count,
2376                    "EventSourceArn": a.event_source_arn,
2377                    "RetentionDays": a.retention_days,
2378                    "SizeBytes": a.size_bytes,
2379                    "State": a.state,
2380                })
2381            })
2382            .collect();
2383
2384        let (archives, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2385        let mut resp = json!({ "Archives": archives });
2386        if let Some(token) = next_token {
2387            resp["NextToken"] = json!(token);
2388        }
2389
2390        Ok(AwsResponse::ok_json(resp))
2391    }
2392
2393    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2394        let body = req.json_body();
2395        validate_required("ArchiveName", &body["ArchiveName"])?;
2396        let name = body["ArchiveName"]
2397            .as_str()
2398            .ok_or_else(|| missing("ArchiveName"))?;
2399        validate_string_length("archiveName", name, 1, 48)?;
2400        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2401        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2402        if let Some(rd) = body["RetentionDays"].as_i64() {
2403            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2404        }
2405
2406        // Validate event pattern if provided
2407        if let Some(pattern) = body["EventPattern"].as_str() {
2408            validate_event_pattern(pattern)?;
2409        }
2410
2411        let mut state = self.state.write();
2412        let archive = state.archives.get_mut(name).ok_or_else(|| {
2413            AwsServiceError::aws_error(
2414                StatusCode::BAD_REQUEST,
2415                "ResourceNotFoundException",
2416                format!("Archive {name} does not exist."),
2417            )
2418        })?;
2419
2420        if let Some(desc) = body["Description"].as_str() {
2421            archive.description = Some(desc.to_string());
2422        }
2423        if let Some(pattern) = body["EventPattern"].as_str() {
2424            archive.event_pattern = Some(pattern.to_string());
2425        }
2426        if let Some(days) = body["RetentionDays"].as_i64() {
2427            archive.retention_days = days;
2428        }
2429
2430        Ok(AwsResponse::ok_json(json!({
2431            "ArchiveArn": archive.arn,
2432            "CreationTime": archive.creation_time.timestamp() as f64,
2433            "State": archive.state,
2434        })))
2435    }
2436
2437    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2438        let body = req.json_body();
2439        validate_required("ArchiveName", &body["ArchiveName"])?;
2440        let name = body["ArchiveName"]
2441            .as_str()
2442            .ok_or_else(|| missing("ArchiveName"))?;
2443        validate_string_length("archiveName", name, 1, 48)?;
2444
2445        let mut state = self.state.write();
2446        if !state.archives.contains_key(name) {
2447            return Err(AwsServiceError::aws_error(
2448                StatusCode::BAD_REQUEST,
2449                "ResourceNotFoundException",
2450                format!("Archive {name} does not exist."),
2451            ));
2452        }
2453
2454        state.archives.remove(name);
2455
2456        // Remove the archive rule
2457        let rule_name = format!("Events-Archive-{name}");
2458        state.rules.retain(|k, _| k.1 != rule_name);
2459
2460        Ok(AwsResponse::ok_json(json!({})))
2461    }
2462
2463    // ─── Connection Operations ──────────────────────────────────────────
2464
2465    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466        let body = req.json_body();
2467        validate_required("Name", &body["Name"])?;
2468        let name = body["Name"]
2469            .as_str()
2470            .ok_or_else(|| missing("Name"))?
2471            .to_string();
2472        validate_string_length("name", &name, 1, 64)?;
2473        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2474        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2475        let description = body["Description"].as_str().map(|s| s.to_string());
2476        let auth_type = body["AuthorizationType"]
2477            .as_str()
2478            .ok_or_else(|| missing("AuthorizationType"))?
2479            .to_string();
2480        validate_enum(
2481            "authorizationType",
2482            &auth_type,
2483            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2484        )?;
2485        validate_optional_string_length(
2486            "kmsKeyIdentifier",
2487            body["KmsKeyIdentifier"].as_str(),
2488            0,
2489            2048,
2490        )?;
2491        validate_required("AuthParameters", &body["AuthParameters"])?;
2492        let auth_params = body["AuthParameters"].clone();
2493
2494        let mut state = self.state.write();
2495        let now = Utc::now();
2496        let conn_uuid = uuid::Uuid::new_v4();
2497        let arn = format!(
2498            "arn:aws:events:{}:{}:connection/{}/{}",
2499            req.region, state.account_id, name, conn_uuid
2500        );
2501        let secret_arn = format!(
2502            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2503            req.region, state.account_id, name, conn_uuid
2504        );
2505
2506        let conn = Connection {
2507            name: name.clone(),
2508            arn: arn.clone(),
2509            description,
2510            authorization_type: auth_type.clone(),
2511            auth_parameters: auth_params,
2512            connection_state: "AUTHORIZED".to_string(),
2513            secret_arn: secret_arn.clone(),
2514            creation_time: now,
2515            last_modified_time: now,
2516            last_authorized_time: now,
2517        };
2518        state.connections.insert(name, conn);
2519
2520        Ok(AwsResponse::ok_json(json!({
2521            "ConnectionArn": arn,
2522            "ConnectionState": "AUTHORIZED",
2523            "CreationTime": now.timestamp() as f64,
2524            "LastModifiedTime": now.timestamp() as f64,
2525        })))
2526    }
2527
2528    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2529        let body = req.json_body();
2530        validate_required("Name", &body["Name"])?;
2531        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2532        validate_string_length("name", name, 1, 64)?;
2533
2534        let state = self.state.read();
2535        let conn = state.connections.get(name).ok_or_else(|| {
2536            AwsServiceError::aws_error(
2537                StatusCode::BAD_REQUEST,
2538                "ResourceNotFoundException",
2539                format!("Connection '{name}' does not exist."),
2540            )
2541        })?;
2542
2543        // Build auth parameters response - strip secrets
2544        let auth_params_response =
2545            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2546
2547        let mut resp = json!({
2548            "ConnectionArn": conn.arn,
2549            "Name": conn.name,
2550            "AuthorizationType": conn.authorization_type,
2551            "AuthParameters": auth_params_response,
2552            "ConnectionState": conn.connection_state,
2553            "SecretArn": conn.secret_arn,
2554            "CreationTime": conn.creation_time.timestamp() as f64,
2555            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2556            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2557        });
2558        if let Some(ref desc) = conn.description {
2559            resp["Description"] = json!(desc);
2560        }
2561
2562        Ok(AwsResponse::ok_json(resp))
2563    }
2564
2565    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2566        let body = req.json_body();
2567        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2568        validate_optional_enum(
2569            "connectionState",
2570            body["ConnectionState"].as_str(),
2571            &[
2572                "CREATING",
2573                "UPDATING",
2574                "DELETING",
2575                "AUTHORIZED",
2576                "DEAUTHORIZED",
2577                "AUTHORIZING",
2578                "DEAUTHORIZING",
2579                "ACTIVE",
2580                "FAILED_CONNECTIVITY",
2581            ],
2582        )?;
2583        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2584        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2585
2586        let name_prefix = body["NamePrefix"].as_str();
2587        let connection_state = body["ConnectionState"].as_str();
2588        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2589
2590        let state = self.state.read();
2591        let all: Vec<Value> = state
2592            .connections
2593            .values()
2594            .filter(|c| {
2595                if let Some(prefix) = name_prefix {
2596                    if !c.name.starts_with(prefix) {
2597                        return false;
2598                    }
2599                }
2600                if let Some(cs) = connection_state {
2601                    if c.connection_state != cs {
2602                        return false;
2603                    }
2604                }
2605                true
2606            })
2607            .map(|c| {
2608                json!({
2609                    "ConnectionArn": c.arn,
2610                    "Name": c.name,
2611                    "AuthorizationType": c.authorization_type,
2612                    "ConnectionState": c.connection_state,
2613                    "CreationTime": c.creation_time.timestamp() as f64,
2614                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2615                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2616                })
2617            })
2618            .collect();
2619
2620        let (conns, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2621        let mut resp = json!({ "Connections": conns });
2622        if let Some(token) = next_token {
2623            resp["NextToken"] = json!(token);
2624        }
2625
2626        Ok(AwsResponse::ok_json(resp))
2627    }
2628
2629    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2630        let body = req.json_body();
2631        validate_required("Name", &body["Name"])?;
2632        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2633        validate_string_length("name", name, 1, 64)?;
2634        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2635        validate_optional_enum(
2636            "authorizationType",
2637            body["AuthorizationType"].as_str(),
2638            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2639        )?;
2640
2641        let mut state = self.state.write();
2642        let conn = state.connections.get_mut(name).ok_or_else(|| {
2643            AwsServiceError::aws_error(
2644                StatusCode::BAD_REQUEST,
2645                "ResourceNotFoundException",
2646                format!("Connection '{name}' does not exist."),
2647            )
2648        })?;
2649
2650        if let Some(desc) = body["Description"].as_str() {
2651            conn.description = Some(desc.to_string());
2652        }
2653        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2654            conn.authorization_type = auth_type.to_string();
2655        }
2656        if body.get("AuthParameters").is_some() {
2657            conn.auth_parameters = body["AuthParameters"].clone();
2658        }
2659        conn.last_modified_time = Utc::now();
2660
2661        Ok(AwsResponse::ok_json(json!({
2662            "ConnectionArn": conn.arn,
2663            "ConnectionState": conn.connection_state,
2664            "CreationTime": conn.creation_time.timestamp() as f64,
2665            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2666            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2667        })))
2668    }
2669
2670    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2671        let body = req.json_body();
2672        validate_required("Name", &body["Name"])?;
2673        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2674        validate_string_length("name", name, 1, 64)?;
2675
2676        let mut state = self.state.write();
2677        let conn = state.connections.remove(name).ok_or_else(|| {
2678            AwsServiceError::aws_error(
2679                StatusCode::BAD_REQUEST,
2680                "ResourceNotFoundException",
2681                format!("Connection '{name}' does not exist."),
2682            )
2683        })?;
2684
2685        Ok(AwsResponse::ok_json(json!({
2686            "ConnectionArn": conn.arn,
2687            "ConnectionState": conn.connection_state,
2688            "CreationTime": conn.creation_time.timestamp() as f64,
2689            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2690            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2691        })))
2692    }
2693
2694    // ─── API Destination Operations ─────────────────────────────────────
2695
2696    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2697        let body = req.json_body();
2698        validate_required("Name", &body["Name"])?;
2699        let name = body["Name"]
2700            .as_str()
2701            .ok_or_else(|| missing("Name"))?
2702            .to_string();
2703        validate_string_length("name", &name, 1, 64)?;
2704        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2705        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2706        let description = body["Description"].as_str().map(|s| s.to_string());
2707        let connection_arn = body["ConnectionArn"]
2708            .as_str()
2709            .ok_or_else(|| missing("ConnectionArn"))?
2710            .to_string();
2711        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2712        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2713        let endpoint = body["InvocationEndpoint"]
2714            .as_str()
2715            .ok_or_else(|| missing("InvocationEndpoint"))?
2716            .to_string();
2717        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2718        validate_required("HttpMethod", &body["HttpMethod"])?;
2719        let http_method = body["HttpMethod"]
2720            .as_str()
2721            .ok_or_else(|| missing("HttpMethod"))?
2722            .to_string();
2723        validate_enum(
2724            "httpMethod",
2725            &http_method,
2726            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2727        )?;
2728        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2729        if let Some(r) = rate_limit {
2730            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2731        }
2732
2733        let mut state = self.state.write();
2734        let now = Utc::now();
2735        let dest_uuid = uuid::Uuid::new_v4();
2736        let arn = format!(
2737            "arn:aws:events:{}:{}:api-destination/{}/{}",
2738            req.region, state.account_id, name, dest_uuid
2739        );
2740
2741        let dest = ApiDestination {
2742            name: name.clone(),
2743            arn: arn.clone(),
2744            description,
2745            connection_arn,
2746            invocation_endpoint: endpoint,
2747            http_method,
2748            invocation_rate_limit_per_second: rate_limit,
2749            state: "ACTIVE".to_string(),
2750            creation_time: now,
2751            last_modified_time: now,
2752        };
2753        state.api_destinations.insert(name, dest);
2754
2755        Ok(AwsResponse::ok_json(json!({
2756            "ApiDestinationArn": arn,
2757            "ApiDestinationState": "ACTIVE",
2758            "CreationTime": now.timestamp() as f64,
2759            "LastModifiedTime": now.timestamp() as f64,
2760        })))
2761    }
2762
2763    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764        let body = req.json_body();
2765        validate_required("Name", &body["Name"])?;
2766        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2767        validate_string_length("name", name, 1, 64)?;
2768
2769        let state = self.state.read();
2770        let dest = state.api_destinations.get(name).ok_or_else(|| {
2771            AwsServiceError::aws_error(
2772                StatusCode::BAD_REQUEST,
2773                "ResourceNotFoundException",
2774                format!("An api-destination '{name}' does not exist."),
2775            )
2776        })?;
2777
2778        let mut resp = json!({
2779            "ApiDestinationArn": dest.arn,
2780            "Name": dest.name,
2781            "ConnectionArn": dest.connection_arn,
2782            "InvocationEndpoint": dest.invocation_endpoint,
2783            "HttpMethod": dest.http_method,
2784            "ApiDestinationState": dest.state,
2785            "CreationTime": dest.creation_time.timestamp() as f64,
2786            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2787        });
2788        if let Some(ref desc) = dest.description {
2789            resp["Description"] = json!(desc);
2790        }
2791        if let Some(rate) = dest.invocation_rate_limit_per_second {
2792            resp["InvocationRateLimitPerSecond"] = json!(rate);
2793        }
2794
2795        Ok(AwsResponse::ok_json(resp))
2796    }
2797
2798    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2799        let body = req.json_body();
2800        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2801        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2802        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2803        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2804
2805        let name_prefix = body["NamePrefix"].as_str();
2806        let connection_arn = body["ConnectionArn"].as_str();
2807        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2808
2809        let state = self.state.read();
2810        let all: Vec<Value> = state
2811            .api_destinations
2812            .values()
2813            .filter(|d| {
2814                if let Some(prefix) = name_prefix {
2815                    if !d.name.starts_with(prefix) {
2816                        return false;
2817                    }
2818                }
2819                if let Some(arn) = connection_arn {
2820                    if d.connection_arn != arn {
2821                        return false;
2822                    }
2823                }
2824                true
2825            })
2826            .map(|d| {
2827                let mut obj = json!({
2828                    "ApiDestinationArn": d.arn,
2829                    "Name": d.name,
2830                    "ConnectionArn": d.connection_arn,
2831                    "InvocationEndpoint": d.invocation_endpoint,
2832                    "HttpMethod": d.http_method,
2833                    "ApiDestinationState": d.state,
2834                    "CreationTime": d.creation_time.timestamp() as f64,
2835                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2836                });
2837                if let Some(rate) = d.invocation_rate_limit_per_second {
2838                    obj["InvocationRateLimitPerSecond"] = json!(rate);
2839                }
2840                obj
2841            })
2842            .collect();
2843
2844        let (dests, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
2845        let mut resp = json!({ "ApiDestinations": dests });
2846        if let Some(token) = next_token {
2847            resp["NextToken"] = json!(token);
2848        }
2849
2850        Ok(AwsResponse::ok_json(resp))
2851    }
2852
2853    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2854        let body = req.json_body();
2855        validate_required("Name", &body["Name"])?;
2856        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2857        validate_string_length("name", name, 1, 64)?;
2858        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2859        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2860        validate_optional_string_length(
2861            "invocationEndpoint",
2862            body["InvocationEndpoint"].as_str(),
2863            1,
2864            2048,
2865        )?;
2866        validate_optional_enum(
2867            "httpMethod",
2868            body["HttpMethod"].as_str(),
2869            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2870        )?;
2871        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2872            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2873        }
2874
2875        let mut state = self.state.write();
2876        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2877            AwsServiceError::aws_error(
2878                StatusCode::BAD_REQUEST,
2879                "ResourceNotFoundException",
2880                format!("An api-destination '{name}' does not exist."),
2881            )
2882        })?;
2883
2884        if let Some(desc) = body["Description"].as_str() {
2885            dest.description = Some(desc.to_string());
2886        }
2887        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2888            dest.invocation_endpoint = endpoint.to_string();
2889        }
2890        if let Some(method) = body["HttpMethod"].as_str() {
2891            dest.http_method = method.to_string();
2892        }
2893        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2894            dest.invocation_rate_limit_per_second = Some(rate);
2895        }
2896        if let Some(conn) = body["ConnectionArn"].as_str() {
2897            dest.connection_arn = conn.to_string();
2898        }
2899        dest.last_modified_time = Utc::now();
2900
2901        Ok(AwsResponse::ok_json(json!({
2902            "ApiDestinationArn": dest.arn,
2903            "ApiDestinationState": dest.state,
2904            "CreationTime": dest.creation_time.timestamp() as f64,
2905            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2906        })))
2907    }
2908
2909    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910        let body = req.json_body();
2911        validate_required("Name", &body["Name"])?;
2912        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2913        validate_string_length("name", name, 1, 64)?;
2914
2915        let mut state = self.state.write();
2916        if !state.api_destinations.contains_key(name) {
2917            return Err(AwsServiceError::aws_error(
2918                StatusCode::BAD_REQUEST,
2919                "ResourceNotFoundException",
2920                format!("An api-destination '{name}' does not exist."),
2921            ));
2922        }
2923        state.api_destinations.remove(name);
2924
2925        Ok(AwsResponse::ok_json(json!({})))
2926    }
2927
2928    // ─── Replay Operations ──────────────────────────────────────────────
2929
2930    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2931        let input = StartReplayInput::from_body(&req.json_body())?;
2932
2933        let mut state = self.state.write();
2934
2935        // Validate event bus + archive, in the order the real service validates them.
2936        let bus_name = state.resolve_bus_name(&input.destination_arn);
2937        if !state.buses.contains_key(&bus_name) {
2938            return Err(AwsServiceError::aws_error(
2939                StatusCode::BAD_REQUEST,
2940                "ResourceNotFoundException",
2941                format!("Event bus {bus_name} does not exist."),
2942            ));
2943        }
2944
2945        let archive_name = input
2946            .event_source_arn
2947            .rsplit_once("archive/")
2948            .map(|(_, n)| n.to_string())
2949            .unwrap_or_default();
2950        let archive = state.archives.get(&archive_name).ok_or_else(|| {
2951            AwsServiceError::aws_error(
2952                StatusCode::BAD_REQUEST,
2953                "ValidationException",
2954                format!(
2955                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2956                ),
2957            )
2958        })?;
2959        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2960        if archive_bus != bus_name {
2961            return Err(AwsServiceError::aws_error(
2962                StatusCode::BAD_REQUEST,
2963                "ValidationException",
2964                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2965            ));
2966        }
2967
2968        if input.event_end_time <= input.event_start_time {
2969            return Err(AwsServiceError::aws_error(
2970                StatusCode::BAD_REQUEST,
2971                "ValidationException",
2972                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2973            ));
2974        }
2975
2976        if state.replays.contains_key(&input.name) {
2977            return Err(AwsServiceError::aws_error(
2978                StatusCode::BAD_REQUEST,
2979                "ResourceAlreadyExistsException",
2980                format!("Replay {} already exists.", input.name),
2981            ));
2982        }
2983
2984        let now = Utc::now();
2985        let arn = format!(
2986            "arn:aws:events:{}:{}:replay/{}",
2987            req.region, state.account_id, input.name
2988        );
2989
2990        let events_to_deliver = collect_replay_events_with_targets(
2991            &state,
2992            &archive_name,
2993            &bus_name,
2994            input.event_start_time,
2995            input.event_end_time,
2996            &req.account_id,
2997            &req.region,
2998        );
2999
3000        let replay = Replay {
3001            name: input.name.clone(),
3002            arn: arn.clone(),
3003            description: input.description,
3004            event_source_arn: input.event_source_arn,
3005            destination: input.destination,
3006            event_start_time: input.event_start_time,
3007            event_end_time: input.event_end_time,
3008            state: "COMPLETED".to_string(),
3009            replay_start_time: now,
3010            replay_end_time: Some(now),
3011        };
3012        state.replays.insert(input.name, replay);
3013
3014        drop(state);
3015
3016        for (event, targets) in events_to_deliver {
3017            let detail_value: Value = serde_json::from_str(&event.detail).unwrap_or(json!({}));
3018            let event_json = json!({
3019                "version": "0",
3020                "id": event.event_id,
3021                "source": event.source,
3022                "account": req.account_id,
3023                "detail-type": event.detail_type,
3024                "detail": detail_value,
3025                "time": event.time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
3026                "region": req.region,
3027                "resources": event.resources,
3028                "replay-name": arn,
3029            });
3030            let event_str = event_json.to_string();
3031
3032            for target in targets {
3033                self.deliver_replay_event_to_target(&target, &event, &event_json, &event_str);
3034            }
3035        }
3036
3037        Ok(AwsResponse::ok_json(json!({
3038            "ReplayArn": arn,
3039            "ReplayStartTime": now.timestamp() as f64,
3040            "State": "STARTING",
3041        })))
3042    }
3043
3044    fn deliver_replay_event_to_target(
3045        &self,
3046        target: &EventTarget,
3047        event: &PutEvent,
3048        event_json: &Value,
3049        event_str: &str,
3050    ) {
3051        let target_arn = &target.arn;
3052        let body_str = if let Some(ref transformer) = target.input_transformer {
3053            apply_input_transformer(transformer, event_json)
3054        } else if let Some(ref input) = target.input {
3055            input.clone()
3056        } else if let Some(ref input_path) = target.input_path {
3057            resolve_json_path(event_json, input_path)
3058                .map(|v| v.to_string())
3059                .unwrap_or_else(|| event_str.to_string())
3060        } else {
3061            event_str.to_string()
3062        };
3063
3064        if target_arn.contains(":sqs:") {
3065            let group_id = target
3066                .sqs_parameters
3067                .as_ref()
3068                .and_then(|p| p["MessageGroupId"].as_str())
3069                .map(|s| s.to_string());
3070            if group_id.is_some() {
3071                self.delivery.send_to_sqs_with_attrs(
3072                    target_arn,
3073                    &body_str,
3074                    &HashMap::new(),
3075                    group_id.as_deref(),
3076                    None,
3077                );
3078            } else {
3079                self.delivery
3080                    .send_to_sqs(target_arn, &body_str, &HashMap::new());
3081            }
3082        } else if target_arn.contains(":sns:") {
3083            self.delivery
3084                .publish_to_sns(target_arn, &body_str, Some(&event.detail_type));
3085        } else if target_arn.contains(":lambda:") {
3086            let mut state = self.state.write();
3087            state
3088                .lambda_invocations
3089                .push(crate::state::LambdaInvocation {
3090                    function_arn: target_arn.clone(),
3091                    payload: body_str.clone(),
3092                    timestamp: Utc::now(),
3093                });
3094            drop(state);
3095            if let Some(ref ls) = self.lambda_state {
3096                ls.write().invocations.push(LambdaInvocation {
3097                    function_arn: target_arn.clone(),
3098                    payload: body_str.clone(),
3099                    timestamp: Utc::now(),
3100                    source: "aws:events".to_string(),
3101                });
3102            }
3103            invoke_lambda_async(
3104                &self.container_runtime,
3105                &self.lambda_state,
3106                target_arn,
3107                &body_str,
3108            );
3109        } else if target_arn.contains(":logs:") {
3110            let mut state = self.state.write();
3111            state.log_deliveries.push(crate::state::LogDelivery {
3112                log_group_arn: target_arn.clone(),
3113                payload: body_str.clone(),
3114                timestamp: Utc::now(),
3115            });
3116            drop(state);
3117            if let Some(ref log_state) = self.logs_state {
3118                deliver_to_logs(log_state, target_arn, &body_str, Utc::now());
3119            }
3120        } else if target_arn.contains(":states:") {
3121            self.delivery
3122                .start_stepfunctions_execution(target_arn, &body_str);
3123            let mut state = self.state.write();
3124            state
3125                .step_function_executions
3126                .push(crate::state::StepFunctionExecution {
3127                    state_machine_arn: target_arn.clone(),
3128                    payload: body_str.clone(),
3129                    timestamp: Utc::now(),
3130                });
3131        } else if target_arn.starts_with("https://") || target_arn.starts_with("http://") {
3132            let url = target_arn.clone();
3133            let payload = body_str.clone();
3134            tokio::spawn(async move {
3135                let client = reqwest::Client::new();
3136                let _ = client
3137                    .post(&url)
3138                    .header("Content-Type", "application/json")
3139                    .body(payload)
3140                    .send()
3141                    .await;
3142            });
3143        }
3144    }
3145
3146    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3147        let body = req.json_body();
3148        validate_required("ReplayName", &body["ReplayName"])?;
3149        let name = body["ReplayName"]
3150            .as_str()
3151            .ok_or_else(|| missing("ReplayName"))?;
3152        validate_string_length("replayName", name, 1, 64)?;
3153
3154        let state = self.state.read();
3155        let replay = state.replays.get(name).ok_or_else(|| {
3156            AwsServiceError::aws_error(
3157                StatusCode::BAD_REQUEST,
3158                "ResourceNotFoundException",
3159                format!("Replay {name} does not exist."),
3160            )
3161        })?;
3162
3163        let mut resp = json!({
3164            "Destination": replay.destination,
3165            "EventSourceArn": replay.event_source_arn,
3166            "EventStartTime": replay.event_start_time.timestamp() as f64,
3167            "EventEndTime": replay.event_end_time.timestamp() as f64,
3168            "ReplayArn": replay.arn,
3169            "ReplayName": replay.name,
3170            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3171            "State": replay.state,
3172        });
3173        if let Some(ref desc) = replay.description {
3174            resp["Description"] = json!(desc);
3175        }
3176        if let Some(ref end) = replay.replay_end_time {
3177            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3178        }
3179
3180        Ok(AwsResponse::ok_json(resp))
3181    }
3182
3183    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3184        let body = req.json_body();
3185        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3186        validate_optional_string_length(
3187            "eventSourceArn",
3188            body["EventSourceArn"].as_str(),
3189            1,
3190            1600,
3191        )?;
3192        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3193        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3194        let name_prefix = body["NamePrefix"].as_str();
3195        let source_arn = body["EventSourceArn"].as_str();
3196        let replay_state = body["State"].as_str();
3197
3198        // Validate at most one filter
3199        let filter_count = [
3200            name_prefix.is_some(),
3201            source_arn.is_some(),
3202            replay_state.is_some(),
3203        ]
3204        .iter()
3205        .filter(|&&x| x)
3206        .count();
3207        if filter_count > 1 {
3208            return Err(AwsServiceError::aws_error(
3209                StatusCode::BAD_REQUEST,
3210                "ValidationException",
3211                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3212            ));
3213        }
3214
3215        // Validate state
3216        if let Some(s) = replay_state {
3217            let valid = [
3218                "CANCELLED",
3219                "CANCELLING",
3220                "COMPLETED",
3221                "FAILED",
3222                "RUNNING",
3223                "STARTING",
3224            ];
3225            if !valid.contains(&s) {
3226                return Err(AwsServiceError::aws_error(
3227                    StatusCode::BAD_REQUEST,
3228                    "ValidationException",
3229                    format!(
3230                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3231                        s
3232                    ),
3233                ));
3234            }
3235        }
3236
3237        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3238
3239        let state = self.state.read();
3240        let all: Vec<Value> = state
3241            .replays
3242            .values()
3243            .filter(|r| {
3244                if let Some(prefix) = name_prefix {
3245                    r.name.starts_with(prefix)
3246                } else if let Some(arn) = source_arn {
3247                    r.event_source_arn == arn
3248                } else if let Some(s) = replay_state {
3249                    r.state == s
3250                } else {
3251                    true
3252                }
3253            })
3254            .map(|r| {
3255                let mut obj = json!({
3256                    "EventSourceArn": r.event_source_arn,
3257                    "EventStartTime": r.event_start_time.timestamp() as f64,
3258                    "EventEndTime": r.event_end_time.timestamp() as f64,
3259                    "ReplayName": r.name,
3260                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3261                    "State": r.state,
3262                });
3263                if let Some(ref end) = r.replay_end_time {
3264                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3265                }
3266                obj
3267            })
3268            .collect();
3269
3270        let (replays, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
3271        let mut resp = json!({ "Replays": replays });
3272        if let Some(token) = next_token {
3273            resp["NextToken"] = json!(token);
3274        }
3275
3276        Ok(AwsResponse::ok_json(resp))
3277    }
3278
3279    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3280        let body = req.json_body();
3281        validate_required("ReplayName", &body["ReplayName"])?;
3282        let name = body["ReplayName"]
3283            .as_str()
3284            .ok_or_else(|| missing("ReplayName"))?;
3285        validate_string_length("replayName", name, 1, 64)?;
3286
3287        let mut state = self.state.write();
3288        let replay = state.replays.get_mut(name).ok_or_else(|| {
3289            AwsServiceError::aws_error(
3290                StatusCode::BAD_REQUEST,
3291                "ResourceNotFoundException",
3292                format!("Replay {name} does not exist."),
3293            )
3294        })?;
3295
3296        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3297        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3298            return Err(AwsServiceError::aws_error(
3299                StatusCode::BAD_REQUEST,
3300                "IllegalStatusException",
3301                format!("Replay {name} is not in a valid state for this operation."),
3302            ));
3303        }
3304
3305        let arn = replay.arn.clone();
3306        replay.state = "CANCELLED".to_string();
3307
3308        Ok(AwsResponse::ok_json(json!({
3309            "ReplayArn": arn,
3310            "State": "CANCELLING",
3311        })))
3312    }
3313}
3314
3315// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3316
3317fn find_tags_mut<'a>(
3318    state: &'a mut crate::state::EventBridgeState,
3319    arn: &str,
3320) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3321    // Check buses
3322    for bus in state.buses.values_mut() {
3323        if bus.arn == arn {
3324            return Ok(&mut bus.tags);
3325        }
3326    }
3327    // Check rules
3328    for rule in state.rules.values_mut() {
3329        if rule.arn == arn {
3330            return Ok(&mut rule.tags);
3331        }
3332    }
3333
3334    // Parse ARN to give better error messages
3335    let error_msg = if arn.contains(":rule/") {
3336        // Extract rule name and bus from ARN
3337        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3338        if let Some(rule_path) = parts.first() {
3339            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3340                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3341            } else {
3342                format!("Rule {} does not exist on EventBus default.", rule_path)
3343            }
3344        } else {
3345            format!("Resource {arn} not found.")
3346        }
3347    } else {
3348        format!("Resource {arn} not found.")
3349    };
3350
3351    Err(AwsServiceError::aws_error(
3352        StatusCode::BAD_REQUEST,
3353        "ResourceNotFoundException",
3354        error_msg,
3355    ))
3356}
3357
3358fn find_tags<'a>(
3359    state: &'a crate::state::EventBridgeState,
3360    arn: &str,
3361) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3362    for bus in state.buses.values() {
3363        if bus.arn == arn {
3364            return Ok(&bus.tags);
3365        }
3366    }
3367    for rule in state.rules.values() {
3368        if rule.arn == arn {
3369            return Ok(&rule.tags);
3370        }
3371    }
3372
3373    let error_msg = if arn.contains(":rule/") {
3374        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3375        if let Some(rule_path) = parts.first() {
3376            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3377                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3378            } else {
3379                format!("Rule {} does not exist on EventBus default.", rule_path)
3380            }
3381        } else {
3382            format!("Resource {arn} not found.")
3383        }
3384    } else {
3385        format!("Resource {arn} not found.")
3386    };
3387
3388    Err(AwsServiceError::aws_error(
3389        StatusCode::BAD_REQUEST,
3390        "ResourceNotFoundException",
3391        error_msg,
3392    ))
3393}
3394
3395// ─── Event Pattern Validation ────────────────────────────────────────
3396
3397fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3398    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3399        AwsServiceError::aws_error(
3400            StatusCode::BAD_REQUEST,
3401            "InvalidEventPatternException",
3402            "Event pattern is not valid. Reason: Invalid JSON",
3403        )
3404    })?;
3405
3406    validate_pattern_values(&parsed, "")?;
3407    Ok(())
3408}
3409
3410fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3411    match value {
3412        Value::Object(obj) => {
3413            for (key, val) in obj {
3414                let new_path = if path.is_empty() {
3415                    key.clone()
3416                } else {
3417                    format!("{path}.{key}")
3418                };
3419                match val {
3420                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3421                    Value::Array(_) => {} // Arrays are fine at leaf level
3422                    _ => {
3423                        return Err(AwsServiceError::aws_error(
3424                            StatusCode::BAD_REQUEST,
3425                            "InvalidEventPatternException",
3426                            format!(
3427                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3428                                key
3429                            ),
3430                        ));
3431                    }
3432                }
3433            }
3434            Ok(())
3435        }
3436        _ => Ok(()),
3437    }
3438}
3439
3440// ─── Connection Auth Params Response Builder ────────────────────────
3441
3442fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3443    match auth_type {
3444        "API_KEY" => {
3445            let mut resp = json!({});
3446            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3447                resp["ApiKeyAuthParameters"] = json!({
3448                    "ApiKeyName": api_key["ApiKeyName"],
3449                });
3450            }
3451            resp
3452        }
3453        "BASIC" => {
3454            let mut resp = json!({});
3455            if let Some(basic) = params.get("BasicAuthParameters") {
3456                resp["BasicAuthParameters"] = json!({
3457                    "Username": basic["Username"],
3458                });
3459            }
3460            resp
3461        }
3462        "OAUTH_CLIENT_CREDENTIALS" => {
3463            let mut resp = json!({});
3464            if let Some(oauth) = params.get("OAuthParameters") {
3465                resp["OAuthParameters"] = json!({
3466                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3467                    "HttpMethod": oauth["HttpMethod"],
3468                    "ClientParameters": {
3469                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3470                    },
3471                });
3472            }
3473            resp
3474        }
3475        _ => params.clone(),
3476    }
3477}
3478
3479// ─── Event Pattern Matching ─────────────────────────────────────────
3480
3481/// Match an event against an EventBridge event pattern.
3482pub fn matches_pattern(
3483    pattern_json: Option<&str>,
3484    source: &str,
3485    detail_type: &str,
3486    detail: &str,
3487    account: &str,
3488    region: &str,
3489    resources: &[String],
3490) -> bool {
3491    let pattern_json = match pattern_json {
3492        Some(p) => p,
3493        None => return true,
3494    };
3495
3496    let pattern: Value = match serde_json::from_str(pattern_json) {
3497        Ok(v) => v,
3498        Err(_) => return false,
3499    };
3500
3501    let pattern_obj = match pattern.as_object() {
3502        Some(o) => o,
3503        None => return false,
3504    };
3505
3506    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3507    let event = json!({
3508        "source": source,
3509        "detail-type": detail_type,
3510        "detail": detail_value,
3511        "account": account,
3512        "region": region,
3513        "resources": resources,
3514    });
3515
3516    for (key, pattern_value) in pattern_obj {
3517        let event_value = &event[key];
3518        if !matches_value(pattern_value, event_value) {
3519            return false;
3520        }
3521    }
3522
3523    true
3524}
3525
3526fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3527    match pattern {
3528        Value::Object(obj) => {
3529            for (key, sub_pattern) in obj {
3530                let sub_value = &event_value[key];
3531                if !matches_value(sub_pattern, sub_value) {
3532                    return false;
3533                }
3534            }
3535            true
3536        }
3537        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3538        _ => false,
3539    }
3540}
3541
3542fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3543    match pattern_elem {
3544        Value::Object(obj) => {
3545            if let Some(prefix_val) = obj.get("prefix") {
3546                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3547                    return actual.starts_with(prefix);
3548                }
3549                return false;
3550            }
3551            if let Some(exists_val) = obj.get("exists") {
3552                let should_exist = exists_val.as_bool().unwrap_or(true);
3553                let does_exist = !event_value.is_null();
3554                return should_exist == does_exist;
3555            }
3556            if let Some(anything_but_val) = obj.get("anything-but") {
3557                return match anything_but_val {
3558                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3559                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3560                    Value::Number(_) => event_value != anything_but_val,
3561                    _ => true,
3562                };
3563            }
3564            if let Some(numeric_val) = obj.get("numeric") {
3565                return matches_numeric(numeric_val, event_value);
3566            }
3567            false
3568        }
3569        _ => values_equal(pattern_elem, event_value),
3570    }
3571}
3572
3573/// For each archive on `event_bus_name` whose event pattern matches the
3574/// event, append a clone of it to the archive's stored events and bump
3575/// the archive's counters.
3576#[allow(clippy::too_many_arguments)]
3577fn archive_matching_event(
3578    state: &mut crate::state::EventBridgeState,
3579    event: &PutEvent,
3580    event_bus_name: &str,
3581    source: &str,
3582    detail_type: &str,
3583    detail: &str,
3584    account_id: &str,
3585    region: &str,
3586    resources: &[String],
3587) {
3588    let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
3589    for akey in archive_keys {
3590        let (archive_bus, archive_pattern, archive_enabled) = {
3591            let a = &state.archives[&akey];
3592            (
3593                state.resolve_bus_name(&a.event_source_arn),
3594                a.event_pattern.clone(),
3595                a.state == "ENABLED",
3596            )
3597        };
3598        if archive_bus != event_bus_name || !archive_enabled {
3599            continue;
3600        }
3601        let pattern_matches = matches_pattern(
3602            archive_pattern.as_deref(),
3603            source,
3604            detail_type,
3605            detail,
3606            account_id,
3607            region,
3608            resources,
3609        );
3610        if !pattern_matches {
3611            continue;
3612        }
3613        if let Some(archive) = state.archives.get_mut(&akey) {
3614            archive.event_count += 1;
3615            archive.size_bytes += detail.len() as i64;
3616            archive.events.push(event.clone());
3617        }
3618    }
3619}
3620
3621/// Parsed + validated inputs for `StartReplay`.
3622struct StartReplayInput {
3623    name: String,
3624    description: Option<String>,
3625    event_source_arn: String,
3626    destination: Value,
3627    destination_arn: String,
3628    event_start_time: DateTime<Utc>,
3629    event_end_time: DateTime<Utc>,
3630}
3631
3632impl StartReplayInput {
3633    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
3634        validate_required("ReplayName", &body["ReplayName"])?;
3635        let name = body["ReplayName"]
3636            .as_str()
3637            .ok_or_else(|| missing("ReplayName"))?
3638            .to_string();
3639        validate_string_length("replayName", &name, 1, 64)?;
3640        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
3641        validate_required("EventSourceArn", &body["EventSourceArn"])?;
3642        let description = body["Description"].as_str().map(|s| s.to_string());
3643        let event_source_arn = body["EventSourceArn"]
3644            .as_str()
3645            .ok_or_else(|| missing("EventSourceArn"))?
3646            .to_string();
3647        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
3648        validate_required("EventStartTime", &body["EventStartTime"])?;
3649        validate_required("EventEndTime", &body["EventEndTime"])?;
3650        validate_required("Destination", &body["Destination"])?;
3651        let destination = body["Destination"].clone();
3652
3653        let event_start_time = body["EventStartTime"]
3654            .as_f64()
3655            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3656            .unwrap_or_else(Utc::now);
3657        let event_end_time = body["EventEndTime"]
3658            .as_f64()
3659            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
3660            .unwrap_or_else(Utc::now);
3661
3662        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
3663        if !destination_arn.contains(":event-bus/") {
3664            return Err(AwsServiceError::aws_error(
3665                StatusCode::BAD_REQUEST,
3666                "ValidationException",
3667                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
3668            ));
3669        }
3670
3671        Ok(Self {
3672            name,
3673            description,
3674            event_source_arn,
3675            destination,
3676            destination_arn,
3677            event_start_time,
3678            event_end_time,
3679        })
3680    }
3681}
3682
3683/// Walk the named archive, filter events into the replay window, then
3684/// fan out each event against rules on `bus_name` to collect its
3685/// matching targets. Returns only events that matched at least one
3686/// target.
3687#[allow(clippy::too_many_arguments)]
3688fn collect_replay_events_with_targets(
3689    state: &crate::state::EventBridgeState,
3690    archive_name: &str,
3691    bus_name: &str,
3692    event_start_time: DateTime<Utc>,
3693    event_end_time: DateTime<Utc>,
3694    account_id: &str,
3695    region: &str,
3696) -> Vec<(PutEvent, Vec<EventTarget>)> {
3697    let Some(archive) = state.archives.get(archive_name) else {
3698        return Vec::new();
3699    };
3700
3701    let replay_events: Vec<PutEvent> = archive
3702        .events
3703        .iter()
3704        .filter(|e| e.time >= event_start_time && e.time < event_end_time)
3705        .cloned()
3706        .collect();
3707
3708    let mut events_to_deliver: Vec<(PutEvent, Vec<EventTarget>)> = Vec::new();
3709    for event in replay_events {
3710        let matching_targets: Vec<EventTarget> = state
3711            .rules
3712            .values()
3713            .filter(|r| {
3714                r.event_bus_name == bus_name
3715                    && r.state == "ENABLED"
3716                    && matches_pattern(
3717                        r.event_pattern.as_deref(),
3718                        &event.source,
3719                        &event.detail_type,
3720                        &event.detail,
3721                        account_id,
3722                        region,
3723                        &event.resources,
3724                    )
3725            })
3726            .flat_map(|r| r.targets.clone())
3727            .collect();
3728
3729        if !matching_targets.is_empty() {
3730            events_to_deliver.push((event, matching_targets));
3731        }
3732    }
3733    events_to_deliver
3734}
3735
3736fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3737    let arr = match numeric_arr.as_array() {
3738        Some(a) => a,
3739        None => return false,
3740    };
3741    let actual = match event_value.as_f64() {
3742        Some(n) => n,
3743        None => return false,
3744    };
3745    let mut i = 0;
3746    while i + 1 < arr.len() {
3747        let op = match arr[i].as_str() {
3748            Some(s) => s,
3749            None => return false,
3750        };
3751        let threshold = match arr[i + 1].as_f64() {
3752            Some(n) => n,
3753            None => return false,
3754        };
3755        let ok = match op {
3756            ">" => actual > threshold,
3757            ">=" => actual >= threshold,
3758            "<" => actual < threshold,
3759            "<=" => actual <= threshold,
3760            "=" => (actual - threshold).abs() < f64::EPSILON,
3761            _ => return false,
3762        };
3763        if !ok {
3764            return false;
3765        }
3766        i += 2;
3767    }
3768    true
3769}
3770
3771fn values_equal(a: &Value, b: &Value) -> bool {
3772    a == b
3773}
3774
3775/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3776fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3777    let path = path.strip_prefix('$').unwrap_or(path);
3778    let mut current = event;
3779    for segment in path.split('.') {
3780        if segment.is_empty() {
3781            continue;
3782        }
3783        current = current.get(segment)?;
3784    }
3785    Some(current.clone())
3786}
3787
3788/// Apply an EventBridge InputTransformer to an event.
3789fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3790    let input_paths_map = transformer
3791        .get("InputPathsMap")
3792        .and_then(|v| v.as_object())
3793        .cloned()
3794        .unwrap_or_default();
3795    let template = transformer
3796        .get("InputTemplate")
3797        .and_then(|v| v.as_str())
3798        .unwrap_or("")
3799        .to_string();
3800
3801    // Resolve all input paths
3802    let mut resolved: HashMap<String, Value> = HashMap::new();
3803    for (var_name, path_val) in &input_paths_map {
3804        if let Some(path_str) = path_val.as_str() {
3805            if let Some(val) = resolve_json_path(event, path_str) {
3806                resolved.insert(var_name.clone(), val);
3807            }
3808        }
3809    }
3810
3811    // Replace <varName> placeholders in template
3812    let mut result = template;
3813    for (var_name, val) in &resolved {
3814        let placeholder = format!("<{var_name}>");
3815        let replacement = match val {
3816            Value::String(s) => s.clone(),
3817            other => other.to_string(),
3818        };
3819        result = result.replace(&placeholder, &replacement);
3820    }
3821
3822    result
3823}
3824
3825fn missing(name: &str) -> AwsServiceError {
3826    AwsServiceError::aws_error(
3827        StatusCode::BAD_REQUEST,
3828        "ValidationException",
3829        format!("The request must contain the parameter {name}"),
3830    )
3831}
3832
3833/// Extract a Lambda function name from its ARN.
3834///
3835/// Handles both unqualified (`arn:aws:lambda:region:account:function:NAME`)
3836/// and qualified (`arn:aws:lambda:region:account:function:NAME:alias`) ARNs.
3837fn function_name_from_arn(arn: &str) -> &str {
3838    let parts: Vec<&str> = arn.split(':').collect();
3839    if parts.len() >= 7 && parts[5] == "function" {
3840        parts[6]
3841    } else {
3842        arn
3843    }
3844}
3845
3846/// Spawn a background task to invoke a Lambda function via ContainerRuntime.
3847/// This is fire-and-forget: EventBridge delivery is asynchronous.
3848pub fn invoke_lambda_async(
3849    container_runtime: &Option<Arc<ContainerRuntime>>,
3850    lambda_state: &Option<SharedLambdaState>,
3851    function_arn: &str,
3852    payload: &str,
3853) {
3854    let runtime = match container_runtime {
3855        Some(rt) => rt.clone(),
3856        None => return,
3857    };
3858    let lambda_state = match lambda_state {
3859        Some(ls) => ls.clone(),
3860        None => return,
3861    };
3862    let func_name = function_name_from_arn(function_arn).to_string();
3863    let payload = payload.as_bytes().to_vec();
3864
3865    tokio::spawn(async move {
3866        let func = {
3867            let state = lambda_state.read();
3868            state.functions.get(&func_name).cloned()
3869        };
3870        let func = match func {
3871            Some(f) => f,
3872            None => {
3873                tracing::warn!(
3874                    function = %func_name,
3875                    "EventBridge Lambda target not found, skipping invocation"
3876                );
3877                return;
3878            }
3879        };
3880        match runtime.invoke(&func, &payload).await {
3881            Ok(_) => {
3882                tracing::info!(function = %func_name, "EventBridge Lambda invocation succeeded");
3883            }
3884            Err(e) => {
3885                tracing::warn!(
3886                    function = %func_name,
3887                    error = %e,
3888                    "EventBridge Lambda invocation failed"
3889                );
3890            }
3891        }
3892    });
3893}
3894
3895/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
3896/// to the appropriate log group and stream.
3897pub fn deliver_to_logs(
3898    logs_state: &SharedLogsState,
3899    log_group_arn: &str,
3900    payload: &str,
3901    timestamp: chrono::DateTime<chrono::Utc>,
3902) {
3903    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
3904    // or just the name if it's not an ARN
3905    let group_name = if log_group_arn.contains(":log-group:") {
3906        log_group_arn
3907            .split(":log-group:")
3908            .nth(1)
3909            .unwrap_or(log_group_arn)
3910            .trim_end_matches(":*")
3911    } else {
3912        log_group_arn
3913    };
3914
3915    let stream_name = "events".to_string();
3916    let ts_millis = timestamp.timestamp_millis();
3917
3918    let mut state = logs_state.write();
3919    let region = state.region.clone();
3920    let account_id = state.account_id.clone();
3921
3922    // Auto-create log group and stream if they don't exist
3923    let group = state
3924        .log_groups
3925        .entry(group_name.to_string())
3926        .or_insert_with(|| fakecloud_logs::state::LogGroup {
3927            name: group_name.to_string(),
3928            arn: Arn::new(
3929                "logs",
3930                &region,
3931                &account_id,
3932                &format!("log-group:{group_name}"),
3933            )
3934            .to_string(),
3935            creation_time: ts_millis,
3936            retention_in_days: None,
3937            kms_key_id: None,
3938            tags: HashMap::new(),
3939            log_streams: HashMap::new(),
3940            stored_bytes: 0,
3941            subscription_filters: Vec::new(),
3942            data_protection_policy: None,
3943            index_policies: Vec::new(),
3944            transformer: None,
3945            deletion_protection: false,
3946            log_group_class: Some("STANDARD".to_string()),
3947        });
3948
3949    let stream = group
3950        .log_streams
3951        .entry(stream_name.clone())
3952        .or_insert_with(|| fakecloud_logs::state::LogStream {
3953            name: stream_name,
3954            arn: format!("{}:log-stream:events", group.arn),
3955            creation_time: ts_millis,
3956            first_event_timestamp: None,
3957            last_event_timestamp: None,
3958            last_ingestion_time: None,
3959            upload_sequence_token: "1".to_string(),
3960            events: Vec::new(),
3961        });
3962
3963    stream.events.push(fakecloud_logs::state::LogEvent {
3964        timestamp: ts_millis,
3965        message: payload.to_string(),
3966        ingestion_time: ts_millis,
3967    });
3968    stream.last_event_timestamp = Some(ts_millis);
3969    stream.last_ingestion_time = Some(ts_millis);
3970    if stream.first_event_timestamp.is_none() {
3971        stream.first_event_timestamp = Some(ts_millis);
3972    }
3973}
3974
3975/// Apply connection auth parameters to an outgoing HTTP request.
3976fn apply_connection_auth(
3977    mut builder: reqwest::RequestBuilder,
3978    conn: &Connection,
3979) -> reqwest::RequestBuilder {
3980    match conn.authorization_type.as_str() {
3981        "API_KEY" => {
3982            if let Some(params) = conn.auth_parameters.get("ApiKeyAuthParameters") {
3983                if let (Some(name), Some(value)) = (
3984                    params["ApiKeyName"].as_str(),
3985                    params["ApiKeyValue"].as_str(),
3986                ) {
3987                    builder = builder.header(name, value);
3988                }
3989            }
3990        }
3991        "BASIC" => {
3992            if let Some(params) = conn.auth_parameters.get("BasicAuthParameters") {
3993                if let (Some(user), Some(pass)) =
3994                    (params["Username"].as_str(), params["Password"].as_str())
3995                {
3996                    builder = builder.basic_auth(user, Some(pass));
3997                }
3998            }
3999        }
4000        "OAUTH_CLIENT_CREDENTIALS" => {
4001            // For OAuth, in a real implementation we'd exchange credentials for a token.
4002            // Here we pass client credentials as basic auth as a reasonable approximation.
4003            if let Some(params) = conn.auth_parameters.get("OAuthParameters") {
4004                if let (Some(client_id), Some(client_secret)) = (
4005                    params["ClientParameters"]["ClientID"].as_str(),
4006                    params["ClientParameters"]["ClientSecret"].as_str(),
4007                ) {
4008                    builder = builder.basic_auth(client_id, Some(client_secret));
4009                }
4010            }
4011        }
4012        _ => {}
4013    }
4014    builder
4015}
4016
4017#[cfg(test)]
4018mod tests {
4019    use super::*;
4020
4021    /// Test helper that calls matches_pattern with default account/region/resources
4022    fn test_matches(
4023        pattern_json: Option<&str>,
4024        source: &str,
4025        detail_type: &str,
4026        detail: &str,
4027    ) -> bool {
4028        matches_pattern(
4029            pattern_json,
4030            source,
4031            detail_type,
4032            detail,
4033            "123456789012",
4034            "us-east-1",
4035            &[],
4036        )
4037    }
4038
4039    #[test]
4040    fn pattern_matches_source() {
4041        assert!(test_matches(
4042            Some(r#"{"source": ["my.app"]}"#),
4043            "my.app",
4044            "OrderPlaced",
4045            "{}"
4046        ));
4047        assert!(!test_matches(
4048            Some(r#"{"source": ["other.app"]}"#),
4049            "my.app",
4050            "OrderPlaced",
4051            "{}"
4052        ));
4053    }
4054
4055    #[test]
4056    fn pattern_matches_detail_type() {
4057        assert!(test_matches(
4058            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
4059            "my.app",
4060            "OrderPlaced",
4061            "{}"
4062        ));
4063        assert!(!test_matches(
4064            Some(r#"{"detail-type": ["OrderShipped"]}"#),
4065            "my.app",
4066            "OrderPlaced",
4067            "{}"
4068        ));
4069    }
4070
4071    #[test]
4072    fn pattern_matches_detail_field() {
4073        assert!(test_matches(
4074            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4075            "my.app",
4076            "StatusChange",
4077            r#"{"status": "ACTIVE"}"#
4078        ));
4079        assert!(!test_matches(
4080            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
4081            "my.app",
4082            "StatusChange",
4083            r#"{"status": "INACTIVE"}"#
4084        ));
4085    }
4086
4087    #[test]
4088    fn no_pattern_matches_everything() {
4089        assert!(test_matches(None, "any", "any", "{}"));
4090    }
4091
4092    #[test]
4093    fn combined_pattern() {
4094        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
4095        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
4096        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
4097        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
4098    }
4099
4100    #[test]
4101    fn nested_detail_pattern() {
4102        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
4103        assert!(test_matches(
4104            Some(pattern),
4105            "my.app",
4106            "OrderEvent",
4107            r#"{"order": {"status": "PLACED", "id": "123"}}"#
4108        ));
4109        assert!(!test_matches(
4110            Some(pattern),
4111            "my.app",
4112            "OrderEvent",
4113            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
4114        ));
4115        assert!(!test_matches(
4116            Some(pattern),
4117            "my.app",
4118            "OrderEvent",
4119            r#"{"order": {"id": "123"}}"#
4120        ));
4121    }
4122
4123    #[test]
4124    fn deeply_nested_detail_pattern() {
4125        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
4126        assert!(test_matches(
4127            Some(pattern),
4128            "src",
4129            "type",
4130            r#"{"a": {"b": {"c": "deep"}}}"#
4131        ));
4132        assert!(!test_matches(
4133            Some(pattern),
4134            "src",
4135            "type",
4136            r#"{"a": {"b": {"c": "shallow"}}}"#
4137        ));
4138    }
4139
4140    #[test]
4141    fn prefix_matcher() {
4142        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
4143        assert!(test_matches(
4144            Some(pattern),
4145            "com.myapp.orders",
4146            "OrderPlaced",
4147            "{}"
4148        ));
4149        assert!(test_matches(
4150            Some(pattern),
4151            "com.myapp",
4152            "OrderPlaced",
4153            "{}"
4154        ));
4155        assert!(!test_matches(
4156            Some(pattern),
4157            "com.other",
4158            "OrderPlaced",
4159            "{}"
4160        ));
4161    }
4162
4163    #[test]
4164    fn prefix_matcher_in_detail() {
4165        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
4166        assert!(test_matches(
4167            Some(pattern),
4168            "src",
4169            "type",
4170            r#"{"region": "us-east-1"}"#
4171        ));
4172        assert!(!test_matches(
4173            Some(pattern),
4174            "src",
4175            "type",
4176            r#"{"region": "eu-west-1"}"#
4177        ));
4178    }
4179
4180    #[test]
4181    fn exists_matcher() {
4182        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
4183        assert!(test_matches(
4184            Some(pattern),
4185            "src",
4186            "type",
4187            r#"{"error": "something broke"}"#
4188        ));
4189        assert!(!test_matches(
4190            Some(pattern),
4191            "src",
4192            "type",
4193            r#"{"status": "ok"}"#
4194        ));
4195
4196        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
4197        assert!(test_matches(
4198            Some(pattern),
4199            "src",
4200            "type",
4201            r#"{"status": "ok"}"#
4202        ));
4203        assert!(!test_matches(
4204            Some(pattern),
4205            "src",
4206            "type",
4207            r#"{"error": "something broke"}"#
4208        ));
4209    }
4210
4211    #[test]
4212    fn anything_but_matcher() {
4213        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
4214        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4215        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4216
4217        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
4218        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
4219        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
4220        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
4221    }
4222
4223    #[test]
4224    fn anything_but_in_detail() {
4225        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
4226        assert!(test_matches(
4227            Some(pattern),
4228            "src",
4229            "type",
4230            r#"{"env": "staging"}"#
4231        ));
4232        assert!(!test_matches(
4233            Some(pattern),
4234            "src",
4235            "type",
4236            r#"{"env": "prod"}"#
4237        ));
4238    }
4239
4240    #[test]
4241    fn numeric_greater_than() {
4242        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
4243        assert!(test_matches(
4244            Some(pattern),
4245            "src",
4246            "type",
4247            r#"{"count": 150}"#
4248        ));
4249        assert!(!test_matches(
4250            Some(pattern),
4251            "src",
4252            "type",
4253            r#"{"count": 100}"#
4254        ));
4255        assert!(!test_matches(
4256            Some(pattern),
4257            "src",
4258            "type",
4259            r#"{"count": 50}"#
4260        ));
4261    }
4262
4263    #[test]
4264    fn numeric_less_than() {
4265        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
4266        assert!(test_matches(
4267            Some(pattern),
4268            "src",
4269            "type",
4270            r#"{"count": 5}"#
4271        ));
4272        assert!(!test_matches(
4273            Some(pattern),
4274            "src",
4275            "type",
4276            r#"{"count": 10}"#
4277        ));
4278        assert!(!test_matches(
4279            Some(pattern),
4280            "src",
4281            "type",
4282            r#"{"count": 15}"#
4283        ));
4284    }
4285
4286    #[test]
4287    fn numeric_range() {
4288        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
4289        assert!(test_matches(
4290            Some(pattern),
4291            "src",
4292            "type",
4293            r#"{"count": 50}"#
4294        ));
4295        assert!(test_matches(
4296            Some(pattern),
4297            "src",
4298            "type",
4299            r#"{"count": 100}"#
4300        ));
4301        assert!(!test_matches(
4302            Some(pattern),
4303            "src",
4304            "type",
4305            r#"{"count": 200}"#
4306        ));
4307        assert!(!test_matches(
4308            Some(pattern),
4309            "src",
4310            "type",
4311            r#"{"count": 49}"#
4312        ));
4313    }
4314
4315    #[test]
4316    fn mixed_matchers_and_literals() {
4317        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
4318        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
4319        assert!(test_matches(
4320            Some(pattern),
4321            "com.myapp.orders",
4322            "Event",
4323            "{}"
4324        ));
4325        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
4326    }
4327
4328    // ---- list_connections / list_api_destinations filtering & pagination ----
4329
4330    use crate::state::EventBridgeState;
4331    use fakecloud_core::delivery::DeliveryBus;
4332    use parking_lot::RwLock;
4333
4334    fn make_service() -> EventBridgeService {
4335        let state = Arc::new(RwLock::new(EventBridgeState::new(
4336            "123456789012",
4337            "us-east-1",
4338        )));
4339        let delivery = Arc::new(DeliveryBus::new());
4340        EventBridgeService::new(state, delivery)
4341    }
4342
4343    fn make_request(action: &str, body: Value) -> AwsRequest {
4344        AwsRequest {
4345            service: "events".to_string(),
4346            action: action.to_string(),
4347            region: "us-east-1".to_string(),
4348            account_id: "123456789012".to_string(),
4349            request_id: "test-id".to_string(),
4350            headers: http::HeaderMap::new(),
4351            query_params: HashMap::new(),
4352            body: serde_json::to_vec(&body).unwrap().into(),
4353            path_segments: vec![],
4354            raw_path: "/".to_string(),
4355            raw_query: String::new(),
4356            method: http::Method::POST,
4357            is_query_protocol: false,
4358            access_key_id: None,
4359            principal: None,
4360        }
4361    }
4362
4363    fn create_connection(svc: &EventBridgeService, name: &str) {
4364        let req = make_request(
4365            "CreateConnection",
4366            json!({
4367                "Name": name,
4368                "AuthorizationType": "API_KEY",
4369                "AuthParameters": {
4370                    "ApiKeyAuthParameters": {
4371                        "ApiKeyName": "x-api-key",
4372                        "ApiKeyValue": "secret"
4373                    }
4374                }
4375            }),
4376        );
4377        svc.create_connection(&req).unwrap();
4378    }
4379
4380    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
4381        let conn_arn_field = {
4382            let state = svc.state.read();
4383            state.connections.get(conn_name).unwrap().arn.clone()
4384        };
4385        let req = make_request(
4386            "CreateApiDestination",
4387            json!({
4388                "Name": name,
4389                "ConnectionArn": conn_arn_field,
4390                "InvocationEndpoint": "https://example.com",
4391                "HttpMethod": "POST"
4392            }),
4393        );
4394        svc.create_api_destination(&req).unwrap();
4395    }
4396
4397    // -- ListConnections tests --
4398
4399    #[test]
4400    fn list_connections_returns_all_by_default() {
4401        let svc = make_service();
4402        create_connection(&svc, "conn-alpha");
4403        create_connection(&svc, "conn-beta");
4404        create_connection(&svc, "conn-gamma");
4405
4406        let req = make_request("ListConnections", json!({}));
4407        let resp = svc.list_connections(&req).unwrap();
4408        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4409        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4410        assert!(body["NextToken"].is_null());
4411    }
4412
4413    #[test]
4414    fn list_connections_name_prefix_filter() {
4415        let svc = make_service();
4416        create_connection(&svc, "prod-conn-1");
4417        create_connection(&svc, "prod-conn-2");
4418        create_connection(&svc, "dev-conn-1");
4419
4420        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4421        let resp = svc.list_connections(&req).unwrap();
4422        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4423        let names: Vec<&str> = body["Connections"]
4424            .as_array()
4425            .unwrap()
4426            .iter()
4427            .map(|c| c["Name"].as_str().unwrap())
4428            .collect();
4429        assert_eq!(names.len(), 2);
4430        assert!(names.iter().all(|n| n.starts_with("prod-")));
4431    }
4432
4433    #[test]
4434    fn list_connections_state_filter() {
4435        let svc = make_service();
4436        create_connection(&svc, "conn-a");
4437        create_connection(&svc, "conn-b");
4438
4439        // All connections start as AUTHORIZED; change one
4440        {
4441            let mut state = svc.state.write();
4442            state
4443                .connections
4444                .get_mut("conn-b")
4445                .unwrap()
4446                .connection_state = "DEAUTHORIZED".to_string();
4447        }
4448
4449        let req = make_request(
4450            "ListConnections",
4451            json!({ "ConnectionState": "AUTHORIZED" }),
4452        );
4453        let resp = svc.list_connections(&req).unwrap();
4454        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4455        let conns = body["Connections"].as_array().unwrap();
4456        assert_eq!(conns.len(), 1);
4457        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4458    }
4459
4460    #[test]
4461    fn list_connections_pagination() {
4462        let svc = make_service();
4463        for i in 0..5 {
4464            create_connection(&svc, &format!("conn-{i:02}"));
4465        }
4466
4467        // First page: limit 2
4468        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4469        let resp = svc.list_connections(&req).unwrap();
4470        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4471        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4472        let token = body["NextToken"].as_str().unwrap();
4473        assert_eq!(token, "2");
4474
4475        // Second page
4476        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4477        let resp = svc.list_connections(&req).unwrap();
4478        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4479        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4480        let token = body["NextToken"].as_str().unwrap();
4481        assert_eq!(token, "4");
4482
4483        // Third page (only 1 remaining)
4484        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4485        let resp = svc.list_connections(&req).unwrap();
4486        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4487        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4488        assert!(body["NextToken"].is_null());
4489    }
4490
4491    #[test]
4492    fn list_connections_pagination_with_filter() {
4493        let svc = make_service();
4494        for i in 0..4 {
4495            create_connection(&svc, &format!("prod-{i:02}"));
4496        }
4497        create_connection(&svc, "dev-00");
4498
4499        let req = make_request(
4500            "ListConnections",
4501            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4502        );
4503        let resp = svc.list_connections(&req).unwrap();
4504        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4505        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4506        assert!(body["NextToken"].as_str().is_some());
4507    }
4508
4509    // -- ListApiDestinations tests --
4510
4511    #[test]
4512    fn list_api_destinations_returns_all_by_default() {
4513        let svc = make_service();
4514        create_connection(&svc, "my-conn");
4515        create_api_destination(&svc, "dest-alpha", "my-conn");
4516        create_api_destination(&svc, "dest-beta", "my-conn");
4517
4518        let req = make_request("ListApiDestinations", json!({}));
4519        let resp = svc.list_api_destinations(&req).unwrap();
4520        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4521        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4522        assert!(body["NextToken"].is_null());
4523    }
4524
4525    #[test]
4526    fn list_api_destinations_name_prefix_filter() {
4527        let svc = make_service();
4528        create_connection(&svc, "my-conn");
4529        create_api_destination(&svc, "prod-dest-1", "my-conn");
4530        create_api_destination(&svc, "prod-dest-2", "my-conn");
4531        create_api_destination(&svc, "dev-dest-1", "my-conn");
4532
4533        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4534        let resp = svc.list_api_destinations(&req).unwrap();
4535        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4536        let names: Vec<&str> = body["ApiDestinations"]
4537            .as_array()
4538            .unwrap()
4539            .iter()
4540            .map(|d| d["Name"].as_str().unwrap())
4541            .collect();
4542        assert_eq!(names.len(), 2);
4543        assert!(names.iter().all(|n| n.starts_with("prod-")));
4544    }
4545
4546    #[test]
4547    fn list_api_destinations_connection_arn_filter() {
4548        let svc = make_service();
4549        create_connection(&svc, "conn-a");
4550        create_connection(&svc, "conn-b");
4551        create_api_destination(&svc, "dest-1", "conn-a");
4552        create_api_destination(&svc, "dest-2", "conn-b");
4553        create_api_destination(&svc, "dest-3", "conn-a");
4554
4555        let conn_a_arn = {
4556            let state = svc.state.read();
4557            state.connections.get("conn-a").unwrap().arn.clone()
4558        };
4559
4560        let req = make_request(
4561            "ListApiDestinations",
4562            json!({ "ConnectionArn": conn_a_arn }),
4563        );
4564        let resp = svc.list_api_destinations(&req).unwrap();
4565        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4566        let names: Vec<&str> = body["ApiDestinations"]
4567            .as_array()
4568            .unwrap()
4569            .iter()
4570            .map(|d| d["Name"].as_str().unwrap())
4571            .collect();
4572        assert_eq!(names.len(), 2);
4573        assert!(names.contains(&"dest-1"));
4574        assert!(names.contains(&"dest-3"));
4575    }
4576
4577    #[test]
4578    fn list_api_destinations_pagination() {
4579        let svc = make_service();
4580        create_connection(&svc, "my-conn");
4581        for i in 0..5 {
4582            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4583        }
4584
4585        // First page
4586        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4587        let resp = svc.list_api_destinations(&req).unwrap();
4588        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4589        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4590        let token = body["NextToken"].as_str().unwrap();
4591        assert_eq!(token, "2");
4592
4593        // Second page
4594        let req = make_request(
4595            "ListApiDestinations",
4596            json!({ "Limit": 2, "NextToken": token }),
4597        );
4598        let resp = svc.list_api_destinations(&req).unwrap();
4599        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4600        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4601        let token = body["NextToken"].as_str().unwrap();
4602        assert_eq!(token, "4");
4603
4604        // Last page
4605        let req = make_request(
4606            "ListApiDestinations",
4607            json!({ "Limit": 2, "NextToken": token }),
4608        );
4609        let resp = svc.list_api_destinations(&req).unwrap();
4610        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4611        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4612        assert!(body["NextToken"].is_null());
4613    }
4614
4615    // -- ListEventBuses pagination tests --
4616
4617    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4618        let req = make_request("CreateEventBus", json!({ "Name": name }));
4619        svc.create_event_bus(&req).unwrap();
4620    }
4621
4622    #[test]
4623    fn list_event_buses_pagination() {
4624        let svc = make_service();
4625        // "default" bus already exists, create 4 more
4626        for i in 0..4 {
4627            create_event_bus(&svc, &format!("bus-{i:02}"));
4628        }
4629
4630        // First page: limit 2
4631        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4632        let resp = svc.list_event_buses(&req).unwrap();
4633        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4634        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4635        let token = body["NextToken"].as_str().unwrap();
4636        assert_eq!(token, "2");
4637
4638        // Second page
4639        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4640        let resp = svc.list_event_buses(&req).unwrap();
4641        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4642        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4643        let token = body["NextToken"].as_str().unwrap();
4644        assert_eq!(token, "4");
4645
4646        // Third page (only 1 remaining)
4647        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4648        let resp = svc.list_event_buses(&req).unwrap();
4649        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4650        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4651        assert!(body["NextToken"].is_null());
4652    }
4653
4654    #[test]
4655    fn list_event_buses_no_pagination_returns_all() {
4656        let svc = make_service();
4657        create_event_bus(&svc, "bus-alpha");
4658        create_event_bus(&svc, "bus-beta");
4659
4660        let req = make_request("ListEventBuses", json!({}));
4661        let resp = svc.list_event_buses(&req).unwrap();
4662        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4663        // default + 2 custom = 3
4664        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4665        assert!(body["NextToken"].is_null());
4666    }
4667
4668    // -- PutEvents EndpointId tests --
4669
4670    #[test]
4671    fn put_events_never_includes_endpoint_id_in_response() {
4672        let svc = make_service();
4673        // Even when EndpointId is provided in the request, it must not appear in the response
4674        let req = make_request(
4675            "PutEvents",
4676            json!({
4677                "EndpointId": "my-endpoint.abc123",
4678                "Entries": [{
4679                    "Source": "my.source",
4680                    "DetailType": "MyType",
4681                    "Detail": "{}",
4682                    "EventBusName": "default"
4683                }]
4684            }),
4685        );
4686        let resp = svc.put_events(&req).unwrap();
4687        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4688        assert!(
4689            !body.as_object().unwrap().contains_key("EndpointId"),
4690            "EndpointId should never be in the PutEvents response"
4691        );
4692        assert_eq!(body["FailedEntryCount"], 0);
4693    }
4694
4695    // -- ListArchives pagination tests --
4696
4697    fn create_archive(svc: &EventBridgeService, name: &str) {
4698        let req = make_request(
4699            "CreateArchive",
4700            json!({
4701                "ArchiveName": name,
4702                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4703            }),
4704        );
4705        svc.create_archive(&req).unwrap();
4706    }
4707
4708    #[test]
4709    fn list_archives_pagination() {
4710        let svc = make_service();
4711        for i in 0..5 {
4712            create_archive(&svc, &format!("archive-{i:02}"));
4713        }
4714
4715        // First page: limit 2
4716        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4717        let resp = svc.list_archives(&req).unwrap();
4718        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4719        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4720        let token = body["NextToken"].as_str().unwrap();
4721        assert_eq!(token, "2");
4722
4723        // Second page
4724        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4725        let resp = svc.list_archives(&req).unwrap();
4726        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4727        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4728        let token = body["NextToken"].as_str().unwrap();
4729        assert_eq!(token, "4");
4730
4731        // Third page (only 1 remaining)
4732        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4733        let resp = svc.list_archives(&req).unwrap();
4734        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4735        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4736        assert!(body["NextToken"].is_null());
4737    }
4738
4739    // -- ListReplays pagination tests --
4740
4741    fn create_replay(svc: &EventBridgeService, name: &str) {
4742        // Need an archive first for the replay's event source
4743        let archive_arn = {
4744            let state = svc.state.read();
4745            if state.archives.contains_key("replay-archive") {
4746                state.archives["replay-archive"].arn.clone()
4747            } else {
4748                drop(state);
4749                create_archive(svc, "replay-archive");
4750                svc.state.read().archives["replay-archive"].arn.clone()
4751            }
4752        };
4753        let req = make_request(
4754            "StartReplay",
4755            json!({
4756                "ReplayName": name,
4757                "EventSourceArn": archive_arn,
4758                "EventStartTime": 1000000.0,
4759                "EventEndTime": 2000000.0,
4760                "Destination": {
4761                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4762                }
4763            }),
4764        );
4765        svc.start_replay(&req).unwrap();
4766    }
4767
4768    #[test]
4769    fn list_replays_pagination() {
4770        let svc = make_service();
4771        for i in 0..5 {
4772            create_replay(&svc, &format!("replay-{i:02}"));
4773        }
4774
4775        // First page: limit 2
4776        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4777        let resp = svc.list_replays(&req).unwrap();
4778        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4779        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4780        let token = body["NextToken"].as_str().unwrap();
4781        assert_eq!(token, "2");
4782
4783        // Second page
4784        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4785        let resp = svc.list_replays(&req).unwrap();
4786        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4787        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4788        let token = body["NextToken"].as_str().unwrap();
4789        assert_eq!(token, "4");
4790
4791        // Third page (only 1 remaining)
4792        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4793        let resp = svc.list_replays(&req).unwrap();
4794        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4795        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4796        assert!(body["NextToken"].is_null());
4797    }
4798
4799    #[test]
4800    fn list_event_buses_invalid_next_token_returns_error() {
4801        let svc = make_service();
4802
4803        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4804        let result = svc.list_event_buses(&req);
4805        assert!(
4806            result.is_err(),
4807            "non-numeric NextToken should return an error"
4808        );
4809    }
4810
4811    // ---- TestEventPattern tests ----
4812
4813    #[test]
4814    fn test_event_pattern_match() {
4815        let svc = make_service();
4816        let req = make_request(
4817            "TestEventPattern",
4818            json!({
4819                "EventPattern": r#"{"source": ["my.app"]}"#,
4820                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4821            }),
4822        );
4823        let resp = svc.test_event_pattern(&req).unwrap();
4824        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4825        assert_eq!(body["Result"], true);
4826    }
4827
4828    #[test]
4829    fn test_event_pattern_no_match() {
4830        let svc = make_service();
4831        let req = make_request(
4832            "TestEventPattern",
4833            json!({
4834                "EventPattern": r#"{"source": ["other.app"]}"#,
4835                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4836            }),
4837        );
4838        let resp = svc.test_event_pattern(&req).unwrap();
4839        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4840        assert_eq!(body["Result"], false);
4841    }
4842
4843    #[test]
4844    fn test_event_pattern_detail_match() {
4845        let svc = make_service();
4846        let req = make_request(
4847            "TestEventPattern",
4848            json!({
4849                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4850                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4851            }),
4852        );
4853        let resp = svc.test_event_pattern(&req).unwrap();
4854        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4855        assert_eq!(body["Result"], true);
4856    }
4857
4858    // ---- UpdateEventBus tests ----
4859
4860    #[test]
4861    fn update_event_bus_description() {
4862        let svc = make_service();
4863        create_event_bus(&svc, "my-bus");
4864
4865        let req = make_request(
4866            "UpdateEventBus",
4867            json!({ "Name": "my-bus", "Description": "Updated desc" }),
4868        );
4869        let resp = svc.update_event_bus(&req).unwrap();
4870        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4871        assert_eq!(body["Name"], "my-bus");
4872
4873        // Verify via describe
4874        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4875        let resp = svc.describe_event_bus(&req).unwrap();
4876        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4877        assert_eq!(body["Description"], "Updated desc");
4878    }
4879
4880    #[test]
4881    fn update_event_bus_not_found() {
4882        let svc = make_service();
4883        let req = make_request(
4884            "UpdateEventBus",
4885            json!({ "Name": "ghost-bus", "Description": "nope" }),
4886        );
4887        assert!(svc.update_event_bus(&req).is_err());
4888    }
4889
4890    // ---- Endpoint CRUD tests ----
4891
4892    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4893        let req = make_request(
4894            "CreateEndpoint",
4895            json!({
4896                "Name": name,
4897                "RoutingConfig": {
4898                    "FailoverConfig": {
4899                        "Primary": { "HealthCheck": "" },
4900                        "Secondary": { "Route": "us-west-2" }
4901                    }
4902                },
4903                "EventBuses": [
4904                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4905                ]
4906            }),
4907        );
4908        svc.create_endpoint(&req).unwrap();
4909    }
4910
4911    #[test]
4912    fn endpoint_create_describe_delete() {
4913        let svc = make_service();
4914        create_endpoint_helper(&svc, "my-endpoint");
4915
4916        // Describe
4917        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4918        let resp = svc.describe_endpoint(&req).unwrap();
4919        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4920        assert_eq!(body["Name"], "my-endpoint");
4921        assert_eq!(body["State"], "ACTIVE");
4922        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4923
4924        // Delete
4925        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4926        svc.delete_endpoint(&req).unwrap();
4927
4928        // Verify gone
4929        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4930        assert!(svc.describe_endpoint(&req).is_err());
4931    }
4932
4933    #[test]
4934    fn endpoint_list_and_update() {
4935        let svc = make_service();
4936        create_endpoint_helper(&svc, "ep-alpha");
4937        create_endpoint_helper(&svc, "ep-beta");
4938
4939        // List all
4940        let req = make_request("ListEndpoints", json!({}));
4941        let resp = svc.list_endpoints(&req).unwrap();
4942        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4943        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4944
4945        // Update
4946        let req = make_request(
4947            "UpdateEndpoint",
4948            json!({ "Name": "ep-alpha", "Description": "updated" }),
4949        );
4950        let resp = svc.update_endpoint(&req).unwrap();
4951        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4952        assert_eq!(body["Name"], "ep-alpha");
4953
4954        // Verify description
4955        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4956        let resp = svc.describe_endpoint(&req).unwrap();
4957        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4958        assert_eq!(body["Description"], "updated");
4959    }
4960
4961    #[test]
4962    fn endpoint_duplicate_fails() {
4963        let svc = make_service();
4964        create_endpoint_helper(&svc, "dup-ep");
4965        let req = make_request(
4966            "CreateEndpoint",
4967            json!({
4968                "Name": "dup-ep",
4969                "RoutingConfig": {},
4970                "EventBuses": []
4971            }),
4972        );
4973        assert!(svc.create_endpoint(&req).is_err());
4974    }
4975
4976    // ---- DeauthorizeConnection tests ----
4977
4978    #[test]
4979    fn deauthorize_connection_sets_state() {
4980        let svc = make_service();
4981        create_connection(&svc, "deauth-conn");
4982
4983        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4984        let resp = svc.deauthorize_connection(&req).unwrap();
4985        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4986        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4987        assert!(body["ConnectionArn"]
4988            .as_str()
4989            .unwrap()
4990            .contains("deauth-conn"));
4991
4992        // Verify via describe
4993        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4994        let resp = svc.describe_connection(&req).unwrap();
4995        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4996        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4997    }
4998
4999    #[test]
5000    fn deauthorize_connection_not_found() {
5001        let svc = make_service();
5002        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
5003        assert!(svc.deauthorize_connection(&req).is_err());
5004    }
5005
5006    // ---- Partner event source tests ----
5007
5008    #[test]
5009    fn partner_event_source_crud() {
5010        let svc = make_service();
5011
5012        // Create
5013        let req = make_request(
5014            "CreatePartnerEventSource",
5015            json!({ "Name": "partner/test", "Account": "123456789012" }),
5016        );
5017        svc.create_partner_event_source(&req).unwrap();
5018
5019        // Describe
5020        let req = make_request(
5021            "DescribePartnerEventSource",
5022            json!({ "Name": "partner/test" }),
5023        );
5024        let resp = svc.describe_partner_event_source(&req).unwrap();
5025        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5026        assert_eq!(body["Name"], "partner/test");
5027
5028        // List
5029        let req = make_request("ListPartnerEventSources", json!({"NamePrefix": "partner/"}));
5030        let resp = svc.list_partner_event_sources(&req).unwrap();
5031        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5032        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
5033
5034        // ListPartnerEventSourceAccounts
5035        let req = make_request(
5036            "ListPartnerEventSourceAccounts",
5037            json!({ "EventSourceName": "partner/test" }),
5038        );
5039        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
5040        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5041        assert_eq!(
5042            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
5043            1
5044        );
5045
5046        // DescribeEventSource
5047        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
5048        let resp = svc.describe_event_source(&req).unwrap();
5049        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5050        assert_eq!(body["Name"], "partner/test");
5051        assert_eq!(body["State"], "ACTIVE");
5052
5053        // ListEventSources
5054        let req = make_request("ListEventSources", json!({}));
5055        let resp = svc.list_event_sources(&req).unwrap();
5056        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5057        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
5058
5059        // Delete
5060        let req = make_request(
5061            "DeletePartnerEventSource",
5062            json!({ "Name": "partner/test", "Account": "123456789012" }),
5063        );
5064        svc.delete_partner_event_source(&req).unwrap();
5065
5066        // Verify gone
5067        let req = make_request(
5068            "DescribePartnerEventSource",
5069            json!({ "Name": "partner/test" }),
5070        );
5071        assert!(svc.describe_partner_event_source(&req).is_err());
5072    }
5073
5074    #[test]
5075    fn activate_deactivate_event_source() {
5076        let svc = make_service();
5077
5078        // Create a partner event source first
5079        let req = make_request(
5080            "CreatePartnerEventSource",
5081            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5082        );
5083        svc.create_partner_event_source(&req).unwrap();
5084
5085        // Deactivate it
5086        let req = make_request(
5087            "DeactivateEventSource",
5088            json!({ "Name": "aws.partner/test" }),
5089        );
5090        svc.deactivate_event_source(&req).unwrap();
5091        {
5092            let state = svc.state.read();
5093            assert_eq!(
5094                state.partner_event_sources["aws.partner/test"].state,
5095                "INACTIVE"
5096            );
5097        }
5098
5099        // Activate it
5100        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
5101        svc.activate_event_source(&req).unwrap();
5102        {
5103            let state = svc.state.read();
5104            assert_eq!(
5105                state.partner_event_sources["aws.partner/test"].state,
5106                "ACTIVE"
5107            );
5108        }
5109
5110        // Not-found returns error
5111        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
5112        assert!(svc.activate_event_source(&req).is_err());
5113
5114        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
5115        assert!(svc.deactivate_event_source(&req).is_err());
5116    }
5117
5118    #[test]
5119    fn delete_partner_event_source_verifies_account() {
5120        let svc = make_service();
5121
5122        // Create a partner event source
5123        let req = make_request(
5124            "CreatePartnerEventSource",
5125            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5126        );
5127        svc.create_partner_event_source(&req).unwrap();
5128
5129        // Deleting with wrong account fails
5130        let req = make_request(
5131            "DeletePartnerEventSource",
5132            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
5133        );
5134        assert!(svc.delete_partner_event_source(&req).is_err());
5135        // Source still exists
5136        assert!(svc
5137            .state
5138            .read()
5139            .partner_event_sources
5140            .contains_key("aws.partner/test"));
5141
5142        // Deleting with correct account succeeds
5143        let req = make_request(
5144            "DeletePartnerEventSource",
5145            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5146        );
5147        svc.delete_partner_event_source(&req).unwrap();
5148        assert!(!svc
5149            .state
5150            .read()
5151            .partner_event_sources
5152            .contains_key("aws.partner/test"));
5153
5154        // Deleting non-existent source returns error
5155        let req = make_request(
5156            "DeletePartnerEventSource",
5157            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
5158        );
5159        assert!(svc.delete_partner_event_source(&req).is_err());
5160    }
5161
5162    #[test]
5163    fn put_partner_events() {
5164        let svc = make_service();
5165        let req = make_request(
5166            "PutPartnerEvents",
5167            json!({
5168                "Entries": [
5169                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
5170                ]
5171            }),
5172        );
5173        let resp = svc.put_partner_events(&req).unwrap();
5174        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5175        assert_eq!(body["FailedEntryCount"], 0);
5176        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5177        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5178    }
5179
5180    // ---- Archive + Replay delivery tests ----
5181
5182    /// Helper: create a service with a mock SQS delivery that records messages.
5183    #[allow(clippy::type_complexity)]
5184    fn make_service_with_sqs_recorder() -> (
5185        EventBridgeService,
5186        Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5187    ) {
5188        use fakecloud_core::delivery::SqsDelivery;
5189
5190        struct RecordingSqsDelivery {
5191            messages: Arc<parking_lot::Mutex<Vec<(String, String)>>>,
5192        }
5193
5194        impl SqsDelivery for RecordingSqsDelivery {
5195            fn deliver_to_queue(
5196                &self,
5197                queue_arn: &str,
5198                message_body: &str,
5199                _attributes: &HashMap<String, String>,
5200            ) {
5201                self.messages
5202                    .lock()
5203                    .push((queue_arn.to_string(), message_body.to_string()));
5204            }
5205        }
5206
5207        let messages: Arc<parking_lot::Mutex<Vec<(String, String)>>> =
5208            Arc::new(parking_lot::Mutex::new(Vec::new()));
5209        let state = Arc::new(RwLock::new(EventBridgeState::new(
5210            "123456789012",
5211            "us-east-1",
5212        )));
5213        let delivery = Arc::new(DeliveryBus::new().with_sqs(Arc::new(RecordingSqsDelivery {
5214            messages: messages.clone(),
5215        })));
5216        let svc = EventBridgeService::new(state, delivery);
5217        (svc, messages)
5218    }
5219
5220    #[test]
5221    fn start_replay_delivers_archived_events_to_sqs_target() {
5222        let (svc, messages) = make_service_with_sqs_recorder();
5223        let queue_arn = "arn:aws:sqs:us-east-1:123456789012:replay-queue";
5224
5225        // Create a rule with an SQS target
5226        let req = make_request(
5227            "PutRule",
5228            json!({
5229                "Name": "replay-test-rule",
5230                "EventPattern": r#"{"source": ["my.app"]}"#,
5231                "State": "ENABLED"
5232            }),
5233        );
5234        svc.put_rule(&req).unwrap();
5235
5236        let req = make_request(
5237            "PutTargets",
5238            json!({
5239                "Rule": "replay-test-rule",
5240                "Targets": [{
5241                    "Id": "sqs-target",
5242                    "Arn": queue_arn
5243                }]
5244            }),
5245        );
5246        svc.put_targets(&req).unwrap();
5247
5248        // Create an archive on the default bus
5249        let req = make_request(
5250            "CreateArchive",
5251            json!({
5252                "ArchiveName": "test-archive",
5253                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5254            }),
5255        );
5256        svc.create_archive(&req).unwrap();
5257
5258        // PutEvents: these should get archived and delivered
5259        let req = make_request(
5260            "PutEvents",
5261            json!({
5262                "Entries": [
5263                    {
5264                        "Source": "my.app",
5265                        "DetailType": "OrderCreated",
5266                        "Detail": "{\"orderId\": \"1\"}",
5267                        "EventBusName": "default"
5268                    },
5269                    {
5270                        "Source": "my.app",
5271                        "DetailType": "OrderShipped",
5272                        "Detail": "{\"orderId\": \"2\"}",
5273                        "EventBusName": "default"
5274                    }
5275                ]
5276            }),
5277        );
5278        let resp = svc.put_events(&req).unwrap();
5279        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5280        assert_eq!(body["FailedEntryCount"], 0);
5281
5282        // Verify archive has 2 events
5283        {
5284            let state = svc.state.read();
5285            let archive = state.archives.get("test-archive").unwrap();
5286            assert_eq!(archive.events.len(), 2);
5287            assert_eq!(archive.event_count, 2);
5288        }
5289
5290        // Clear recorded messages from PutEvents delivery
5291        messages.lock().clear();
5292
5293        // StartReplay: should re-deliver the archived events
5294        let archive_arn = {
5295            let state = svc.state.read();
5296            state.archives.get("test-archive").unwrap().arn.clone()
5297        };
5298
5299        // Use a wide time range to capture all events
5300        let start_ts = 0.0_f64;
5301        let end_ts = (chrono::Utc::now().timestamp() + 3600) as f64;
5302
5303        let req = make_request(
5304            "StartReplay",
5305            json!({
5306                "ReplayName": "my-replay",
5307                "EventSourceArn": archive_arn,
5308                "Destination": {
5309                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
5310                },
5311                "EventStartTime": start_ts,
5312                "EventEndTime": end_ts
5313            }),
5314        );
5315        let resp = svc.start_replay(&req).unwrap();
5316        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5317        assert_eq!(body["State"], "STARTING");
5318
5319        // Verify the replay delivered events to SQS
5320        let delivered = messages.lock();
5321        assert_eq!(
5322            delivered.len(),
5323            2,
5324            "expected 2 replayed events delivered to SQS"
5325        );
5326        for (arn, msg) in delivered.iter() {
5327            assert_eq!(arn, queue_arn);
5328            let event: Value = serde_json::from_str(msg).unwrap();
5329            assert_eq!(event["source"], "my.app");
5330            // Replayed events should include replay-name
5331            assert!(event["replay-name"].as_str().is_some());
5332        }
5333
5334        // Verify replay is marked as COMPLETED
5335        let state = svc.state.read();
5336        let replay = state.replays.get("my-replay").unwrap();
5337        assert_eq!(replay.state, "COMPLETED");
5338    }
5339
5340    #[test]
5341    fn apply_connection_auth_api_key() {
5342        let conn = Connection {
5343            name: "test-conn".to_string(),
5344            arn: "arn:aws:events:us-east-1:123456789012:connection/test-conn/uuid".to_string(),
5345            description: None,
5346            authorization_type: "API_KEY".to_string(),
5347            auth_parameters: json!({
5348                "ApiKeyAuthParameters": {
5349                    "ApiKeyName": "x-api-key",
5350                    "ApiKeyValue": "my-secret"
5351                }
5352            }),
5353            connection_state: "AUTHORIZED".to_string(),
5354            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5355            creation_time: Utc::now(),
5356            last_modified_time: Utc::now(),
5357            last_authorized_time: Utc::now(),
5358        };
5359
5360        let client = reqwest::Client::new();
5361        let builder = client
5362            .post("http://localhost:12345/test")
5363            .header("Content-Type", "application/json");
5364        let builder = apply_connection_auth(builder, &conn);
5365
5366        // Build and verify the header was applied
5367        let request = builder.body("{}").build().unwrap();
5368        assert_eq!(
5369            request
5370                .headers()
5371                .get("x-api-key")
5372                .unwrap()
5373                .to_str()
5374                .unwrap(),
5375            "my-secret"
5376        );
5377    }
5378
5379    #[test]
5380    fn apply_connection_auth_basic() {
5381        let conn = Connection {
5382            name: "basic-conn".to_string(),
5383            arn: "arn:aws:events:us-east-1:123456789012:connection/basic-conn/uuid".to_string(),
5384            description: None,
5385            authorization_type: "BASIC".to_string(),
5386            auth_parameters: json!({
5387                "BasicAuthParameters": {
5388                    "Username": "user",
5389                    "Password": "pass"
5390                }
5391            }),
5392            connection_state: "AUTHORIZED".to_string(),
5393            secret_arn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:test".to_string(),
5394            creation_time: Utc::now(),
5395            last_modified_time: Utc::now(),
5396            last_authorized_time: Utc::now(),
5397        };
5398
5399        let client = reqwest::Client::new();
5400        let builder = client.post("http://localhost:12345/test");
5401        let builder = apply_connection_auth(builder, &conn);
5402
5403        let request = builder.body("{}").build().unwrap();
5404        let auth_header = request
5405            .headers()
5406            .get("authorization")
5407            .unwrap()
5408            .to_str()
5409            .unwrap();
5410        assert!(
5411            auth_header.starts_with("Basic "),
5412            "Expected Basic auth header, got: {auth_header}"
5413        );
5414    }
5415
5416    #[tokio::test]
5417    async fn put_events_with_api_destination_target_resolves_destination() {
5418        // This test verifies that the PutEvents code path correctly identifies
5419        // api-destination ARN targets and resolves the destination metadata.
5420        // The actual HTTP call goes to a non-existent host (fire-and-forget).
5421        let state = Arc::new(RwLock::new(EventBridgeState::new(
5422            "123456789012",
5423            "us-east-1",
5424        )));
5425        let delivery = Arc::new(DeliveryBus::new());
5426        let svc = EventBridgeService::new(state, delivery);
5427
5428        // Create connection and api destination
5429        create_connection(&svc, "my-conn");
5430        let conn_arn = {
5431            let state = svc.state.read();
5432            state.connections.get("my-conn").unwrap().arn.clone()
5433        };
5434        let req = make_request(
5435            "CreateApiDestination",
5436            json!({
5437                "Name": "my-dest",
5438                "ConnectionArn": conn_arn,
5439                "InvocationEndpoint": "http://127.0.0.1:1/noop",
5440                "HttpMethod": "POST"
5441            }),
5442        );
5443        svc.create_api_destination(&req).unwrap();
5444
5445        let dest_arn = {
5446            let state = svc.state.read();
5447            state.api_destinations.get("my-dest").unwrap().arn.clone()
5448        };
5449
5450        // Create a rule that targets the api-destination
5451        let req = make_request(
5452            "PutRule",
5453            json!({
5454                "Name": "api-dest-rule",
5455                "EventPattern": r#"{"source":["test.app"]}"#,
5456                "State": "ENABLED"
5457            }),
5458        );
5459        svc.put_rule(&req).unwrap();
5460
5461        let req = make_request(
5462            "PutTargets",
5463            json!({
5464                "Rule": "api-dest-rule",
5465                "Targets": [{ "Id": "dest-target", "Arn": dest_arn }]
5466            }),
5467        );
5468        svc.put_targets(&req).unwrap();
5469
5470        // PutEvents - should match the rule and attempt delivery to ApiDestination
5471        let req = make_request(
5472            "PutEvents",
5473            json!({
5474                "Entries": [{
5475                    "Source": "test.app",
5476                    "DetailType": "TestEvent",
5477                    "Detail": r#"{"key":"value"}"#
5478                }]
5479            }),
5480        );
5481        let resp = svc.put_events(&req).unwrap();
5482        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
5483        assert_eq!(body["FailedEntryCount"], 0);
5484        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
5485        assert!(body["Entries"][0]["EventId"].as_str().is_some());
5486    }
5487
5488    #[test]
5489    fn test_function_name_from_arn() {
5490        // Unqualified ARN
5491        assert_eq!(
5492            super::function_name_from_arn("arn:aws:lambda:us-east-1:123456789012:function:my-func"),
5493            "my-func"
5494        );
5495        // Qualified ARN with alias
5496        assert_eq!(
5497            super::function_name_from_arn(
5498                "arn:aws:lambda:us-east-1:123456789012:function:my-func:prod"
5499            ),
5500            "my-func"
5501        );
5502        // Qualified ARN with version
5503        assert_eq!(
5504            super::function_name_from_arn(
5505                "arn:aws:lambda:us-east-1:123456789012:function:my-func:42"
5506            ),
5507            "my-func"
5508        );
5509        // Plain function name (not an ARN)
5510        assert_eq!(super::function_name_from_arn("my-func"), "my-func");
5511    }
5512}