Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_core::validation::*;
12
13use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
14use fakecloud_logs::state::SharedLogsState;
15
16use crate::state::{
17    ApiDestination, Archive, Connection, Endpoint, EventBus, EventRule, EventTarget,
18    PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
19};
20
21pub struct EventBridgeService {
22    state: SharedEventBridgeState,
23    delivery: Arc<DeliveryBus>,
24    lambda_state: Option<SharedLambdaState>,
25    logs_state: Option<SharedLogsState>,
26}
27
28impl EventBridgeService {
29    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
30        Self {
31            state,
32            delivery,
33            lambda_state: None,
34            logs_state: None,
35        }
36    }
37
38    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
39        self.lambda_state = Some(lambda_state);
40        self
41    }
42
43    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
44        self.logs_state = Some(logs_state);
45        self
46    }
47}
48
49#[async_trait]
50impl AwsService for EventBridgeService {
51    fn service_name(&self) -> &str {
52        "events"
53    }
54
55    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
56        match req.action.as_str() {
57            "CreateEventBus" => self.create_event_bus(&req),
58            "DeleteEventBus" => self.delete_event_bus(&req),
59            "ListEventBuses" => self.list_event_buses(&req),
60            "DescribeEventBus" => self.describe_event_bus(&req),
61            "PutRule" => self.put_rule(&req),
62            "DeleteRule" => self.delete_rule(&req),
63            "ListRules" => self.list_rules(&req),
64            "DescribeRule" => self.describe_rule(&req),
65            "EnableRule" => self.enable_rule(&req),
66            "DisableRule" => self.disable_rule(&req),
67            "PutTargets" => self.put_targets(&req),
68            "RemoveTargets" => self.remove_targets(&req),
69            "ListTargetsByRule" => self.list_targets_by_rule(&req),
70            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
71            "PutEvents" => self.put_events(&req),
72            "PutPermission" => self.put_permission(&req),
73            "RemovePermission" => self.remove_permission(&req),
74            "TagResource" => self.tag_resource(&req),
75            "UntagResource" => self.untag_resource(&req),
76            "ListTagsForResource" => self.list_tags_for_resource(&req),
77            "CreateArchive" => self.create_archive(&req),
78            "DescribeArchive" => self.describe_archive(&req),
79            "ListArchives" => self.list_archives(&req),
80            "UpdateArchive" => self.update_archive(&req),
81            "DeleteArchive" => self.delete_archive(&req),
82            "CreateConnection" => self.create_connection(&req),
83            "DescribeConnection" => self.describe_connection(&req),
84            "ListConnections" => self.list_connections(&req),
85            "UpdateConnection" => self.update_connection(&req),
86            "DeleteConnection" => self.delete_connection(&req),
87            "CreateApiDestination" => self.create_api_destination(&req),
88            "DescribeApiDestination" => self.describe_api_destination(&req),
89            "ListApiDestinations" => self.list_api_destinations(&req),
90            "UpdateApiDestination" => self.update_api_destination(&req),
91            "DeleteApiDestination" => self.delete_api_destination(&req),
92            "StartReplay" => self.start_replay(&req),
93            "DescribeReplay" => self.describe_replay(&req),
94            "ListReplays" => self.list_replays(&req),
95            "CancelReplay" => self.cancel_replay(&req),
96            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
97            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
98            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
99            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
100            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
101            "ActivateEventSource" => self.activate_event_source(&req),
102            "DeactivateEventSource" => self.deactivate_event_source(&req),
103            "DescribeEventSource" => self.describe_event_source(&req),
104            "ListEventSources" => self.list_event_sources(&req),
105            "PutPartnerEvents" => self.put_partner_events(&req),
106            "TestEventPattern" => self.test_event_pattern(&req),
107            "UpdateEventBus" => self.update_event_bus(&req),
108            "CreateEndpoint" => self.create_endpoint(&req),
109            "DeleteEndpoint" => self.delete_endpoint(&req),
110            "DescribeEndpoint" => self.describe_endpoint(&req),
111            "ListEndpoints" => self.list_endpoints(&req),
112            "UpdateEndpoint" => self.update_endpoint(&req),
113            "DeauthorizeConnection" => self.deauthorize_connection(&req),
114            _ => Err(AwsServiceError::action_not_implemented(
115                "events",
116                &req.action,
117            )),
118        }
119    }
120
121    fn supported_actions(&self) -> &[&str] {
122        &[
123            "CreateEventBus",
124            "DeleteEventBus",
125            "ListEventBuses",
126            "DescribeEventBus",
127            "PutRule",
128            "DeleteRule",
129            "ListRules",
130            "DescribeRule",
131            "EnableRule",
132            "DisableRule",
133            "PutTargets",
134            "RemoveTargets",
135            "ListTargetsByRule",
136            "ListRuleNamesByTarget",
137            "PutEvents",
138            "PutPermission",
139            "RemovePermission",
140            "TagResource",
141            "UntagResource",
142            "ListTagsForResource",
143            "CreateArchive",
144            "DescribeArchive",
145            "ListArchives",
146            "UpdateArchive",
147            "DeleteArchive",
148            "CreateConnection",
149            "DescribeConnection",
150            "ListConnections",
151            "UpdateConnection",
152            "DeleteConnection",
153            "CreateApiDestination",
154            "DescribeApiDestination",
155            "ListApiDestinations",
156            "UpdateApiDestination",
157            "DeleteApiDestination",
158            "StartReplay",
159            "DescribeReplay",
160            "ListReplays",
161            "CancelReplay",
162            "CreatePartnerEventSource",
163            "DeletePartnerEventSource",
164            "DescribePartnerEventSource",
165            "ListPartnerEventSources",
166            "ListPartnerEventSourceAccounts",
167            "ActivateEventSource",
168            "DeactivateEventSource",
169            "DescribeEventSource",
170            "ListEventSources",
171            "PutPartnerEvents",
172            "TestEventPattern",
173            "UpdateEventBus",
174            "CreateEndpoint",
175            "DeleteEndpoint",
176            "DescribeEndpoint",
177            "ListEndpoints",
178            "UpdateEndpoint",
179            "DeauthorizeConnection",
180        ]
181    }
182}
183
184fn parse_body(req: &AwsRequest) -> Value {
185    serde_json::from_slice(&req.body).unwrap_or(Value::Object(Default::default()))
186}
187
188fn json_resp(body: Value) -> AwsResponse {
189    AwsResponse::json(StatusCode::OK, serde_json::to_string(&body).unwrap())
190}
191
192fn parse_tags(body: &Value) -> HashMap<String, String> {
193    let mut tags = HashMap::new();
194    if let Some(arr) = body["Tags"].as_array() {
195        for tag in arr {
196            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
197                tags.insert(key.to_string(), val.to_string());
198            }
199        }
200    }
201    tags
202}
203
204fn parse_target(target: &Value) -> EventTarget {
205    EventTarget {
206        id: target["Id"].as_str().unwrap_or("").to_string(),
207        arn: target["Arn"].as_str().unwrap_or("").to_string(),
208        input: target["Input"].as_str().map(|s| s.to_string()),
209        input_path: target["InputPath"].as_str().map(|s| s.to_string()),
210        input_transformer: target.get("InputTransformer").cloned(),
211        sqs_parameters: target.get("SqsParameters").cloned(),
212    }
213}
214
215fn target_to_json(t: &EventTarget) -> Value {
216    let mut obj = json!({ "Id": t.id, "Arn": t.arn });
217    if let Some(ref input) = t.input {
218        obj["Input"] = json!(input);
219    }
220    if let Some(ref input_path) = t.input_path {
221        obj["InputPath"] = json!(input_path);
222    }
223    if let Some(ref it) = t.input_transformer {
224        obj["InputTransformer"] = it.clone();
225    }
226    if let Some(ref sp) = t.sqs_parameters {
227        obj["SqsParameters"] = sp.clone();
228    }
229    obj
230}
231
232// ─── Event Bus Operations ───────────────────────────────────────────
233impl EventBridgeService {
234    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
235        let body = parse_body(req);
236        validate_required("Name", &body["Name"])?;
237        let name = body["Name"]
238            .as_str()
239            .ok_or_else(|| missing("Name"))?
240            .to_string();
241        validate_string_length("name", &name, 1, 256)?;
242        validate_optional_string_length(
243            "eventSourceName",
244            body["EventSourceName"].as_str(),
245            1,
246            256,
247        )?;
248        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
249        validate_optional_string_length(
250            "kmsKeyIdentifier",
251            body["KmsKeyIdentifier"].as_str(),
252            0,
253            2048,
254        )?;
255
256        // Validate name doesn't contain '/' (unless partner bus)
257        if name.contains('/') && !name.starts_with("aws.partner/") {
258            return Err(AwsServiceError::aws_error(
259                StatusCode::BAD_REQUEST,
260                "ValidationException",
261                "Event bus name must not contain '/'.",
262            ));
263        }
264
265        // Partner event bus validation
266        if name.starts_with("aws.partner/") {
267            let event_source = body["EventSourceName"].as_str().unwrap_or("");
268            let state_r = self.state.read();
269            let has_source = state_r.partner_event_sources.contains_key(event_source);
270            drop(state_r);
271            if !has_source {
272                return Err(AwsServiceError::aws_error(
273                    StatusCode::BAD_REQUEST,
274                    "ResourceNotFoundException",
275                    format!("Event source {event_source} does not exist."),
276                ));
277            }
278        }
279
280        let mut state = self.state.write();
281
282        if state.buses.contains_key(&name) {
283            return Err(AwsServiceError::aws_error(
284                StatusCode::BAD_REQUEST,
285                "ResourceAlreadyExistsException",
286                format!("Event bus {name} already exists."),
287            ));
288        }
289
290        let arn = format!(
291            "arn:aws:events:{}:{}:event-bus/{}",
292            req.region, state.account_id, name
293        );
294        let now = Utc::now();
295        let description = body["Description"].as_str().map(|s| s.to_string());
296        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
297        let dead_letter_config = body.get("DeadLetterConfig").cloned();
298
299        let tags = parse_tags(&body);
300
301        let bus = EventBus {
302            name: name.clone(),
303            arn: arn.clone(),
304            tags,
305            policy: None,
306            description,
307            kms_key_identifier,
308            dead_letter_config,
309            creation_time: now,
310            last_modified_time: now,
311        };
312        state.buses.insert(name, bus);
313
314        Ok(json_resp(json!({ "EventBusArn": arn })))
315    }
316
317    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
318        let body = parse_body(req);
319        validate_required("Name", &body["Name"])?;
320        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
321        validate_string_length("name", name, 1, 256)?;
322
323        if name == "default" {
324            return Err(AwsServiceError::aws_error(
325                StatusCode::BAD_REQUEST,
326                "ValidationException",
327                format!("Cannot delete event bus {name}."),
328            ));
329        }
330
331        let mut state = self.state.write();
332        state.buses.remove(name);
333        state.rules.retain(|k, _| k.0 != name);
334
335        Ok(json_resp(json!({})))
336    }
337
338    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
339        let body = parse_body(req);
340        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
341        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
342        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
343        let name_prefix = body["NamePrefix"].as_str();
344        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
345        let offset: usize = match body["NextToken"].as_str() {
346            Some(t) => t.parse().map_err(|_| {
347                AwsServiceError::aws_error(
348                    StatusCode::BAD_REQUEST,
349                    "InvalidNextTokenException",
350                    format!("Invalid NextToken value: '{t}'"),
351                )
352            })?,
353            None => 0,
354        };
355
356        let state = self.state.read();
357        let filtered: Vec<Value> = state
358            .buses
359            .values()
360            .filter(|b| match name_prefix {
361                Some(prefix) => b.name.starts_with(prefix),
362                None => true,
363            })
364            .skip(offset)
365            .take(limit + 1)
366            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
367            .collect();
368
369        let has_more = filtered.len() > limit;
370        let buses: Vec<Value> = filtered.into_iter().take(limit).collect();
371        let mut resp = json!({ "EventBuses": buses });
372        if has_more {
373            resp["NextToken"] = json!((offset + limit).to_string());
374        }
375
376        Ok(json_resp(resp))
377    }
378
379    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
380        let body = parse_body(req);
381        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
382        let name = body["Name"].as_str().unwrap_or("default");
383
384        let state = self.state.read();
385        let bus = state.buses.get(name).ok_or_else(|| {
386            AwsServiceError::aws_error(
387                StatusCode::BAD_REQUEST,
388                "ResourceNotFoundException",
389                format!("Event bus {name} does not exist."),
390            )
391        })?;
392
393        let mut resp = json!({
394            "Name": bus.name,
395            "Arn": bus.arn,
396            "CreationTime": bus.creation_time.timestamp() as f64,
397            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
398        });
399
400        if let Some(ref policy) = bus.policy {
401            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
402        }
403        if let Some(ref desc) = bus.description {
404            resp["Description"] = json!(desc);
405        }
406        if let Some(ref kms) = bus.kms_key_identifier {
407            resp["KmsKeyIdentifier"] = json!(kms);
408        }
409        if let Some(ref dlc) = bus.dead_letter_config {
410            resp["DeadLetterConfig"] = dlc.clone();
411        }
412
413        Ok(json_resp(resp))
414    }
415
416    // ─── Permission Operations ──────────────────────────────────────────
417
418    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
419        let body = parse_body(req);
420        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
421        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
422        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
423        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
424        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
425
426        let mut state = self.state.write();
427
428        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
429            AwsServiceError::aws_error(
430                StatusCode::BAD_REQUEST,
431                "ResourceNotFoundException",
432                format!("Event bus {event_bus_name} does not exist."),
433            )
434        })?;
435
436        // Check if Policy is provided (new-style)
437        if let Some(policy_str) = body["Policy"].as_str() {
438            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
439                bus.policy = Some(policy);
440                return Ok(json_resp(json!({})));
441            }
442        }
443
444        // Old-style: Action, Principal, StatementId
445        let action = body["Action"].as_str().unwrap_or("");
446        let principal = body["Principal"].as_str().unwrap_or("");
447        let statement_id = body["StatementId"].as_str().unwrap_or("");
448
449        // Validate action
450        if action != "events:PutEvents" {
451            return Err(AwsServiceError::aws_error(
452                StatusCode::BAD_REQUEST,
453                "ValidationException",
454                "Provided value in parameter 'action' is not supported.",
455            ));
456        }
457
458        let statement = json!({
459            "Sid": statement_id,
460            "Effect": "Allow",
461            "Principal": { "AWS": format!("arn:aws:iam::{principal}:root") },
462            "Action": action,
463            "Resource": bus.arn,
464        });
465
466        let policy = bus.policy.get_or_insert_with(|| {
467            json!({
468                "Version": "2012-10-17",
469                "Statement": [],
470            })
471        });
472
473        if let Some(stmts) = policy["Statement"].as_array_mut() {
474            stmts.push(statement);
475        }
476
477        Ok(json_resp(json!({})))
478    }
479
480    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
481        let body = parse_body(req);
482        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
483        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
484        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
485        let statement_id = body["StatementId"].as_str().unwrap_or("");
486        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
487
488        let mut state = self.state.write();
489
490        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
491            AwsServiceError::aws_error(
492                StatusCode::BAD_REQUEST,
493                "ResourceNotFoundException",
494                format!("Event bus {event_bus_name} does not exist."),
495            )
496        })?;
497
498        if remove_all {
499            bus.policy = None;
500            return Ok(json_resp(json!({})));
501        }
502
503        let policy = bus.policy.as_mut().ok_or_else(|| {
504            AwsServiceError::aws_error(
505                StatusCode::BAD_REQUEST,
506                "ResourceNotFoundException",
507                "EventBus does not have a policy.",
508            )
509        })?;
510
511        if let Some(stmts) = policy["Statement"].as_array_mut() {
512            let before = stmts.len();
513            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
514            if stmts.len() == before {
515                return Err(AwsServiceError::aws_error(
516                    StatusCode::BAD_REQUEST,
517                    "ResourceNotFoundException",
518                    "Statement with the provided id does not exist.",
519                ));
520            }
521            if stmts.is_empty() {
522                bus.policy = None;
523            }
524        }
525
526        Ok(json_resp(json!({})))
527    }
528
529    // ─── Rule Operations ────────────────────────────────────────────────
530
531    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
532        let body = parse_body(req);
533        validate_required("Name", &body["Name"])?;
534        let name = body["Name"]
535            .as_str()
536            .ok_or_else(|| missing("Name"))?
537            .to_string();
538        validate_string_length("name", &name, 1, 64)?;
539        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
540        validate_optional_string_length(
541            "scheduleExpression",
542            body["ScheduleExpression"].as_str(),
543            0,
544            256,
545        )?;
546        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
547        validate_optional_enum(
548            "state",
549            body["State"].as_str(),
550            &[
551                "ENABLED",
552                "DISABLED",
553                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
554            ],
555        )?;
556        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
557        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
558
559        let raw_bus = body["EventBusName"]
560            .as_str()
561            .unwrap_or("default")
562            .to_string();
563
564        let mut state = self.state.write();
565        let event_bus_name = state.resolve_bus_name(&raw_bus);
566
567        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
568            if s.is_empty() {
569                None
570            } else {
571                Some(s.to_string())
572            }
573        });
574        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
575            if s.is_empty() {
576                None
577            } else {
578                Some(s.to_string())
579            }
580        });
581        let description = body["Description"].as_str().map(|s| s.to_string());
582        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
583        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
584
585        // Validate: schedule expressions only on default bus
586        if schedule_expression.is_some() && event_bus_name != "default" {
587            return Err(AwsServiceError::aws_error(
588                StatusCode::BAD_REQUEST,
589                "ValidationException",
590                "ScheduleExpression is supported only on the default event bus.",
591            ));
592        }
593
594        if !state.buses.contains_key(&event_bus_name) {
595            return Err(AwsServiceError::aws_error(
596                StatusCode::BAD_REQUEST,
597                "ResourceNotFoundException",
598                format!("Event bus {event_bus_name} does not exist."),
599            ));
600        }
601
602        let arn = if event_bus_name == "default" {
603            format!(
604                "arn:aws:events:{}:{}:rule/{}",
605                req.region, state.account_id, name
606            )
607        } else {
608            format!(
609                "arn:aws:events:{}:{}:rule/{}/{}",
610                req.region, state.account_id, event_bus_name, name
611            )
612        };
613
614        let key = (event_bus_name.clone(), name.clone());
615        let targets = state
616            .rules
617            .get(&key)
618            .map(|r| r.targets.clone())
619            .unwrap_or_default();
620
621        let tags = parse_tags(&body);
622
623        let rule = EventRule {
624            name: name.clone(),
625            arn: arn.clone(),
626            event_bus_name,
627            event_pattern,
628            schedule_expression,
629            state: rule_state,
630            description,
631            role_arn,
632            managed_by: None,
633            created_by: None,
634            targets,
635            tags,
636            last_fired: None,
637        };
638
639        state.rules.insert(key, rule);
640        Ok(json_resp(json!({ "RuleArn": arn })))
641    }
642
643    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
644        let body = parse_body(req);
645        validate_required("Name", &body["Name"])?;
646        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
647        validate_string_length("name", name, 1, 64)?;
648        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
649        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
650
651        let mut state = self.state.write();
652        let bus_name = state.resolve_bus_name(event_bus_name);
653        let key = (bus_name, name.to_string());
654
655        // Check if rule has targets
656        if let Some(rule) = state.rules.get(&key) {
657            if !rule.targets.is_empty() {
658                return Err(AwsServiceError::aws_error(
659                    StatusCode::BAD_REQUEST,
660                    "ValidationException",
661                    "Rule can't be deleted since it has targets.",
662                ));
663            }
664        }
665
666        state.rules.remove(&key);
667        Ok(json_resp(json!({})))
668    }
669
670    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
671        let body = parse_body(req);
672        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
673        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
674        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
675        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
676        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
677        let name_prefix = body["NamePrefix"].as_str();
678        let limit = body["Limit"].as_u64().map(|n| n as usize);
679        let next_token = body["NextToken"].as_str();
680
681        let state = self.state.read();
682        let bus_name = state.resolve_bus_name(event_bus_name);
683
684        let mut rules: Vec<&EventRule> = state
685            .rules
686            .values()
687            .filter(|r| r.event_bus_name == bus_name)
688            .filter(|r| match name_prefix {
689                Some(prefix) => r.name.starts_with(prefix),
690                None => true,
691            })
692            .collect();
693        rules.sort_by(|a, b| a.name.cmp(&b.name));
694
695        // Pagination
696        let start = next_token
697            .and_then(|t| t.parse::<usize>().ok())
698            .unwrap_or(0)
699            .min(rules.len());
700        let rules_slice = &rules[start..];
701
702        let (page, new_next_token) = if let Some(lim) = limit {
703            if rules_slice.len() > lim {
704                (&rules_slice[..lim], Some((start + lim).to_string()))
705            } else {
706                (rules_slice, None)
707            }
708        } else {
709            (rules_slice, None)
710        };
711
712        let rules_json: Vec<Value> = page
713            .iter()
714            .map(|r| {
715                let mut obj = json!({
716                    "Name": r.name,
717                    "Arn": r.arn,
718                    "EventBusName": r.event_bus_name,
719                    "State": r.state,
720                });
721                if let Some(ref desc) = r.description {
722                    obj["Description"] = json!(desc);
723                }
724                if let Some(ref ep) = r.event_pattern {
725                    obj["EventPattern"] = json!(ep);
726                }
727                if let Some(ref se) = r.schedule_expression {
728                    obj["ScheduleExpression"] = json!(se);
729                }
730                if let Some(ref mb) = r.managed_by {
731                    obj["ManagedBy"] = json!(mb);
732                }
733                obj
734            })
735            .collect();
736
737        let mut resp = json!({ "Rules": rules_json });
738        if let Some(token) = new_next_token {
739            resp["NextToken"] = json!(token);
740        }
741
742        Ok(json_resp(resp))
743    }
744
745    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
746        let body = parse_body(req);
747        validate_required("Name", &body["Name"])?;
748        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
749        validate_string_length("name", name, 1, 64)?;
750        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
751        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
752
753        let state = self.state.read();
754        let bus_name = state.resolve_bus_name(event_bus_name);
755        let key = (bus_name.clone(), name.to_string());
756
757        let rule = state.rules.get(&key).ok_or_else(|| {
758            AwsServiceError::aws_error(
759                StatusCode::BAD_REQUEST,
760                "ResourceNotFoundException",
761                format!("Rule {name} does not exist."),
762            )
763        })?;
764
765        let mut resp = json!({
766            "Name": rule.name,
767            "Arn": rule.arn,
768            "EventBusName": rule.event_bus_name,
769            "State": rule.state,
770        });
771
772        if let Some(ref desc) = rule.description {
773            resp["Description"] = json!(desc);
774        }
775        if let Some(ref ep) = rule.event_pattern {
776            resp["EventPattern"] = json!(ep);
777        }
778        if let Some(ref se) = rule.schedule_expression {
779            resp["ScheduleExpression"] = json!(se);
780        }
781        if let Some(ref role) = rule.role_arn {
782            resp["RoleArn"] = json!(role);
783        }
784        if let Some(ref mb) = rule.managed_by {
785            resp["ManagedBy"] = json!(mb);
786        }
787        if let Some(ref cb) = rule.created_by {
788            resp["CreatedBy"] = json!(cb);
789        }
790        // If non-default bus, set CreatedBy to account_id
791        if rule.event_bus_name != "default" && rule.created_by.is_none() {
792            resp["CreatedBy"] = json!(state.account_id);
793        }
794
795        Ok(json_resp(resp))
796    }
797
798    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
799        let body = parse_body(req);
800        validate_required("Name", &body["Name"])?;
801        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
802        validate_string_length("name", name, 1, 64)?;
803        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
804        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
805
806        let mut state = self.state.write();
807        let bus_name = state.resolve_bus_name(event_bus_name);
808        let key = (bus_name, name.to_string());
809
810        let rule = state.rules.get_mut(&key).ok_or_else(|| {
811            AwsServiceError::aws_error(
812                StatusCode::BAD_REQUEST,
813                "ResourceNotFoundException",
814                format!("Rule {name} does not exist."),
815            )
816        })?;
817
818        rule.state = "ENABLED".to_string();
819        Ok(json_resp(json!({})))
820    }
821
822    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
823        let body = parse_body(req);
824        validate_required("Name", &body["Name"])?;
825        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
826        validate_string_length("name", name, 1, 64)?;
827        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
828        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
829
830        let mut state = self.state.write();
831        let bus_name = state.resolve_bus_name(event_bus_name);
832        let key = (bus_name, name.to_string());
833
834        let rule = state.rules.get_mut(&key).ok_or_else(|| {
835            AwsServiceError::aws_error(
836                StatusCode::BAD_REQUEST,
837                "ResourceNotFoundException",
838                format!("Rule {name} does not exist."),
839            )
840        })?;
841
842        rule.state = "DISABLED".to_string();
843        Ok(json_resp(json!({})))
844    }
845
846    // ─── Target Operations ──────────────────────────────────────────────
847
848    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849        let body = parse_body(req);
850        validate_required("Rule", &body["Rule"])?;
851        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
852        validate_string_length("rule", rule_name, 1, 64)?;
853        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
854        validate_required("Targets", &body["Targets"])?;
855        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
856        let targets = body["Targets"]
857            .as_array()
858            .ok_or_else(|| missing("Targets"))?;
859
860        // Validate targets - check for FIFO SQS without SqsParameters
861        for target in targets {
862            let target_id = target["Id"].as_str().unwrap_or("");
863            let target_arn = target["Arn"].as_str().unwrap_or("");
864
865            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
866                return Err(AwsServiceError::aws_error(
867                    StatusCode::BAD_REQUEST,
868                    "ValidationException",
869                    format!(
870                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
871                    ),
872                ));
873            }
874
875            // Validate ARN format
876            if !target_arn.starts_with("arn:") {
877                return Err(AwsServiceError::aws_error(
878                    StatusCode::BAD_REQUEST,
879                    "ValidationException",
880                    format!(
881                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
882                    ),
883                ));
884            }
885        }
886
887        let mut state = self.state.write();
888        let bus_name = state.resolve_bus_name(event_bus_name);
889        let key = (bus_name.clone(), rule_name.to_string());
890
891        let rule = state.rules.get_mut(&key).ok_or_else(|| {
892            AwsServiceError::aws_error(
893                StatusCode::BAD_REQUEST,
894                "ResourceNotFoundException",
895                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
896            )
897        })?;
898
899        for target in targets {
900            let et = parse_target(target);
901            // Remove existing target with same ID
902            rule.targets.retain(|t| t.id != et.id);
903            rule.targets.push(et);
904        }
905
906        Ok(json_resp(json!({
907            "FailedEntryCount": 0,
908            "FailedEntries": [],
909        })))
910    }
911
912    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
913        let body = parse_body(req);
914        validate_required("Rule", &body["Rule"])?;
915        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
916        validate_string_length("rule", rule_name, 1, 64)?;
917        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
918        validate_required("Ids", &body["Ids"])?;
919        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
920        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
921
922        let target_ids: Vec<String> = ids
923            .iter()
924            .filter_map(|v| v.as_str().map(|s| s.to_string()))
925            .collect();
926
927        let mut state = self.state.write();
928        let bus_name = state.resolve_bus_name(event_bus_name);
929        let key = (bus_name.clone(), rule_name.to_string());
930
931        let rule = state.rules.get_mut(&key).ok_or_else(|| {
932            AwsServiceError::aws_error(
933                StatusCode::BAD_REQUEST,
934                "ResourceNotFoundException",
935                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
936            )
937        })?;
938
939        rule.targets.retain(|t| !target_ids.contains(&t.id));
940
941        Ok(json_resp(json!({
942            "FailedEntryCount": 0,
943            "FailedEntries": [],
944        })))
945    }
946
947    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948        let body = parse_body(req);
949        validate_required("Rule", &body["Rule"])?;
950        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
951        validate_string_length("rule", rule_name, 1, 64)?;
952        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
953        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
954        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
955        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
956        let limit = body["Limit"].as_u64().map(|n| n as usize);
957        let next_token = body["NextToken"].as_str();
958
959        let state = self.state.read();
960        let bus_name = state.resolve_bus_name(event_bus_name);
961        let key = (bus_name, rule_name.to_string());
962
963        let rule = state.rules.get(&key).ok_or_else(|| {
964            AwsServiceError::aws_error(
965                StatusCode::BAD_REQUEST,
966                "ResourceNotFoundException",
967                format!("Rule {rule_name} does not exist."),
968            )
969        })?;
970
971        let all_targets = &rule.targets;
972        let start = next_token
973            .and_then(|t| t.parse::<usize>().ok())
974            .unwrap_or(0)
975            .min(all_targets.len());
976        let slice = &all_targets[start..];
977
978        let (page, new_next_token) = if let Some(lim) = limit {
979            if slice.len() > lim {
980                (&slice[..lim], Some((start + lim).to_string()))
981            } else {
982                (slice, None)
983            }
984        } else {
985            (slice, None)
986        };
987
988        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
989
990        let mut resp = json!({ "Targets": targets });
991        if let Some(token) = new_next_token {
992            resp["NextToken"] = json!(token);
993        }
994
995        Ok(json_resp(resp))
996    }
997
998    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999        let body = parse_body(req);
1000        validate_required("TargetArn", &body["TargetArn"])?;
1001        let target_arn = body["TargetArn"]
1002            .as_str()
1003            .ok_or_else(|| missing("TargetArn"))?;
1004        validate_string_length("targetArn", target_arn, 1, 1600)?;
1005        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1006        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1007        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1008        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1009        let limit = body["Limit"].as_u64().map(|n| n as usize);
1010        let next_token = body["NextToken"].as_str();
1011
1012        let state = self.state.read();
1013        let bus_name = state.resolve_bus_name(event_bus_name);
1014
1015        // Deduplicate rule names
1016        let mut rule_names: Vec<String> = Vec::new();
1017        for rule in state.rules.values() {
1018            if rule.event_bus_name == bus_name
1019                && rule.targets.iter().any(|t| t.arn == target_arn)
1020                && !rule_names.contains(&rule.name)
1021            {
1022                rule_names.push(rule.name.clone());
1023            }
1024        }
1025        rule_names.sort();
1026
1027        let start = next_token
1028            .and_then(|t| t.parse::<usize>().ok())
1029            .unwrap_or(0)
1030            .min(rule_names.len());
1031        let slice = &rule_names[start..];
1032
1033        let (page, new_next_token) = if let Some(lim) = limit {
1034            if slice.len() > lim {
1035                (&slice[..lim], Some((start + lim).to_string()))
1036            } else {
1037                (slice, None)
1038            }
1039        } else {
1040            (slice, None)
1041        };
1042
1043        let mut resp = json!({ "RuleNames": page });
1044        if let Some(token) = new_next_token {
1045            resp["NextToken"] = json!(token);
1046        }
1047
1048        Ok(json_resp(resp))
1049    }
1050
    // ─── Partner Event Sources ────────────────────────────────────────
1052
1053    fn create_partner_event_source(
1054        &self,
1055        req: &AwsRequest,
1056    ) -> Result<AwsResponse, AwsServiceError> {
1057        let body = parse_body(req);
1058        validate_required("Name", &body["Name"])?;
1059        let name = body["Name"]
1060            .as_str()
1061            .ok_or_else(|| missing("Name"))?
1062            .to_string();
1063        validate_string_length("name", &name, 1, 256)?;
1064        validate_required("Account", &body["Account"])?;
1065        let account = body["Account"]
1066            .as_str()
1067            .ok_or_else(|| missing("Account"))?
1068            .to_string();
1069        validate_string_length("account", &account, 12, 12)?;
1070
1071        let mut state = self.state.write();
1072        if state.partner_event_sources.contains_key(&name) {
1073            return Err(AwsServiceError::aws_error(
1074                StatusCode::CONFLICT,
1075                "ResourceAlreadyExistsException",
1076                format!("Partner event source {name} already exists."),
1077            ));
1078        }
1079        let arn = format!(
1080            "arn:aws:events:{}::event-source/aws.partner/{}",
1081            state.region, name
1082        );
1083        let now = Utc::now();
1084        let ps = PartnerEventSource {
1085            name: name.clone(),
1086            arn: arn.clone(),
1087            account,
1088            creation_time: now,
1089            expiration_time: None,
1090            state: "ACTIVE".to_string(),
1091        };
1092        state.partner_event_sources.insert(name.clone(), ps);
1093
1094        Ok(json_resp(json!({ "EventSourceArn": arn })))
1095    }
1096
1097    fn delete_partner_event_source(
1098        &self,
1099        req: &AwsRequest,
1100    ) -> Result<AwsResponse, AwsServiceError> {
1101        let body = parse_body(req);
1102        validate_required("Name", &body["Name"])?;
1103        let name = body["Name"]
1104            .as_str()
1105            .ok_or_else(|| missing("Name"))?
1106            .to_string();
1107        validate_required("Account", &body["Account"])?;
1108        let account = body["Account"]
1109            .as_str()
1110            .ok_or_else(|| missing("Account"))?
1111            .to_string();
1112
1113        let mut state = self.state.write();
1114        match state.partner_event_sources.get(&name) {
1115            Some(ps) if ps.account == account => {
1116                state.partner_event_sources.remove(&name);
1117            }
1118            Some(_) => {
1119                return Err(AwsServiceError::aws_error(
1120                    StatusCode::NOT_FOUND,
1121                    "ResourceNotFoundException",
1122                    format!("Partner event source {name} does not exist for account {account}."),
1123                ));
1124            }
1125            None => {
1126                return Err(AwsServiceError::aws_error(
1127                    StatusCode::NOT_FOUND,
1128                    "ResourceNotFoundException",
1129                    format!("Partner event source {name} does not exist."),
1130                ));
1131            }
1132        }
1133
1134        Ok(json_resp(json!({})))
1135    }
1136
1137    fn describe_partner_event_source(
1138        &self,
1139        req: &AwsRequest,
1140    ) -> Result<AwsResponse, AwsServiceError> {
1141        let body = parse_body(req);
1142        validate_required("Name", &body["Name"])?;
1143        let name = body["Name"]
1144            .as_str()
1145            .ok_or_else(|| missing("Name"))?
1146            .to_string();
1147        validate_string_length("name", &name, 1, 256)?;
1148
1149        let state = self.state.read();
1150        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1151            AwsServiceError::aws_error(
1152                StatusCode::NOT_FOUND,
1153                "ResourceNotFoundException",
1154                format!("Partner event source {name} does not exist."),
1155            )
1156        })?;
1157
1158        Ok(json_resp(json!({
1159            "Arn": ps.arn,
1160            "Name": ps.name,
1161        })))
1162    }
1163
1164    fn list_partner_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1165        let body = parse_body(req);
1166        let name_prefix = body["NamePrefix"].as_str();
1167        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1168        let offset: usize = body["NextToken"]
1169            .as_str()
1170            .map(|t| t.parse().unwrap_or(0))
1171            .unwrap_or(0);
1172
1173        let state = self.state.read();
1174        let filtered: Vec<Value> = state
1175            .partner_event_sources
1176            .values()
1177            .filter(|ps| match name_prefix {
1178                Some(prefix) => ps.name.starts_with(prefix),
1179                None => true,
1180            })
1181            .skip(offset)
1182            .take(limit + 1)
1183            .map(|ps| {
1184                json!({
1185                    "Arn": ps.arn,
1186                    "Name": ps.name,
1187                    "CreationTime": ps.creation_time.timestamp() as f64,
1188                })
1189            })
1190            .collect();
1191
1192        let has_more = filtered.len() > limit;
1193        let sources: Vec<Value> = filtered.into_iter().take(limit).collect();
1194        let mut resp = json!({ "PartnerEventSources": sources });
1195        if has_more {
1196            resp["NextToken"] = json!((offset + limit).to_string());
1197        }
1198
1199        Ok(json_resp(resp))
1200    }
1201
1202    fn list_partner_event_source_accounts(
1203        &self,
1204        req: &AwsRequest,
1205    ) -> Result<AwsResponse, AwsServiceError> {
1206        let body = parse_body(req);
1207        validate_required("EventSourceName", &body["EventSourceName"])?;
1208        let event_source_name = body["EventSourceName"]
1209            .as_str()
1210            .ok_or_else(|| missing("EventSourceName"))?;
1211
1212        let state = self.state.read();
1213        let accounts: Vec<Value> = state
1214            .partner_event_sources
1215            .values()
1216            .filter(|ps| ps.name == event_source_name)
1217            .map(|ps| json!({ "Account": ps.account }))
1218            .collect();
1219
1220        Ok(json_resp(json!({
1221            "PartnerEventSourceAccounts": accounts
1222        })))
1223    }
1224
1225    fn activate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1226        let body = parse_body(req);
1227        validate_required("Name", &body["Name"])?;
1228        let name = body["Name"]
1229            .as_str()
1230            .ok_or_else(|| missing("Name"))?
1231            .to_string();
1232
1233        let mut state = self.state.write();
1234        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1235            AwsServiceError::aws_error(
1236                StatusCode::NOT_FOUND,
1237                "ResourceNotFoundException",
1238                format!("Event source {name} does not exist."),
1239            )
1240        })?;
1241        ps.state = "ACTIVE".to_string();
1242
1243        Ok(json_resp(json!({})))
1244    }
1245
1246    fn deactivate_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1247        let body = parse_body(req);
1248        validate_required("Name", &body["Name"])?;
1249        let name = body["Name"]
1250            .as_str()
1251            .ok_or_else(|| missing("Name"))?
1252            .to_string();
1253
1254        let mut state = self.state.write();
1255        let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
1256            AwsServiceError::aws_error(
1257                StatusCode::NOT_FOUND,
1258                "ResourceNotFoundException",
1259                format!("Event source {name} does not exist."),
1260            )
1261        })?;
1262        ps.state = "INACTIVE".to_string();
1263
1264        Ok(json_resp(json!({})))
1265    }
1266
1267    fn describe_event_source(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1268        let body = parse_body(req);
1269        validate_required("Name", &body["Name"])?;
1270        let name = body["Name"]
1271            .as_str()
1272            .ok_or_else(|| missing("Name"))?
1273            .to_string();
1274
1275        let state = self.state.read();
1276        let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
1277            AwsServiceError::aws_error(
1278                StatusCode::NOT_FOUND,
1279                "ResourceNotFoundException",
1280                format!("Event source {name} does not exist."),
1281            )
1282        })?;
1283
1284        Ok(json_resp(json!({
1285            "Arn": ps.arn,
1286            "Name": ps.name,
1287            "CreatedBy": ps.account,
1288            "CreationTime": ps.creation_time.timestamp() as f64,
1289            "State": ps.state,
1290        })))
1291    }
1292
1293    fn list_event_sources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1294        let body = parse_body(req);
1295        let name_prefix = body["NamePrefix"].as_str();
1296        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1297        let offset: usize = body["NextToken"]
1298            .as_str()
1299            .map(|t| t.parse().unwrap_or(0))
1300            .unwrap_or(0);
1301
1302        let state = self.state.read();
1303        let filtered: Vec<Value> = state
1304            .partner_event_sources
1305            .values()
1306            .filter(|ps| match name_prefix {
1307                Some(prefix) => ps.name.starts_with(prefix),
1308                None => true,
1309            })
1310            .skip(offset)
1311            .take(limit + 1)
1312            .map(|ps| {
1313                json!({
1314                    "Arn": ps.arn,
1315                    "Name": ps.name,
1316                    "CreatedBy": ps.account,
1317                    "CreationTime": ps.creation_time.timestamp() as f64,
1318                    "State": ps.state,
1319                })
1320            })
1321            .collect();
1322
1323        let has_more = filtered.len() > limit;
1324        let sources: Vec<Value> = filtered.into_iter().take(limit).collect();
1325        let mut resp = json!({ "EventSources": sources });
1326        if has_more {
1327            resp["NextToken"] = json!((offset + limit).to_string());
1328        }
1329
1330        Ok(json_resp(resp))
1331    }
1332
1333    fn put_partner_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1334        let body = parse_body(req);
1335        validate_required("Entries", &body["Entries"])?;
1336        let entries = body["Entries"]
1337            .as_array()
1338            .ok_or_else(|| missing("Entries"))?;
1339
1340        let mut result_entries = Vec::new();
1341        for _entry in entries {
1342            let event_id = uuid::Uuid::new_v4().to_string();
1343            result_entries.push(json!({ "EventId": event_id }));
1344        }
1345
1346        Ok(json_resp(json!({
1347            "FailedEntryCount": 0,
1348            "Entries": result_entries,
1349        })))
1350    }
1351
1352    // ─── TestEventPattern ────────────────────────────────────────────────
1353
1354    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355        let body = parse_body(req);
1356        validate_required("EventPattern", &body["EventPattern"])?;
1357        validate_required("Event", &body["Event"])?;
1358        let event_pattern = body["EventPattern"]
1359            .as_str()
1360            .ok_or_else(|| missing("EventPattern"))?;
1361        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1362
1363        // Parse the event JSON
1364        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1365            AwsServiceError::aws_error(
1366                StatusCode::BAD_REQUEST,
1367                "InvalidEventPatternException",
1368                "Event is not valid JSON.",
1369            )
1370        })?;
1371
1372        // Parse the pattern JSON
1373        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1374            AwsServiceError::aws_error(
1375                StatusCode::BAD_REQUEST,
1376                "InvalidEventPatternException",
1377                "Event pattern is not valid JSON.",
1378            )
1379        })?;
1380
1381        let source = event["source"].as_str().unwrap_or("");
1382        let detail_type = event["detail-type"].as_str().unwrap_or("");
1383        let detail = event
1384            .get("detail")
1385            .map(|v| serde_json::to_string(v).unwrap_or_default())
1386            .unwrap_or_else(|| "{}".to_string());
1387        let account = event["account"].as_str().unwrap_or("");
1388        let region = event["region"].as_str().unwrap_or("");
1389        let resources: Vec<String> = event["resources"]
1390            .as_array()
1391            .map(|arr| {
1392                arr.iter()
1393                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1394                    .collect()
1395            })
1396            .unwrap_or_default();
1397
1398        let result = matches_pattern(
1399            Some(event_pattern),
1400            source,
1401            detail_type,
1402            &detail,
1403            account,
1404            region,
1405            &resources,
1406        );
1407
1408        Ok(json_resp(json!({ "Result": result })))
1409    }
1410
1411    // ─── UpdateEventBus ─────────────────────────────────────────────────
1412
1413    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1414        let body = parse_body(req);
1415        let name = body["Name"].as_str().unwrap_or("default");
1416
1417        let mut state = self.state.write();
1418        let bus = state.buses.get_mut(name).ok_or_else(|| {
1419            AwsServiceError::aws_error(
1420                StatusCode::BAD_REQUEST,
1421                "ResourceNotFoundException",
1422                format!("Event bus {name} does not exist."),
1423            )
1424        })?;
1425
1426        if let Some(desc) = body["Description"].as_str() {
1427            bus.description = Some(desc.to_string());
1428        }
1429        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1430            bus.kms_key_identifier = Some(kms.to_string());
1431        }
1432        if let Some(dlc) = body.get("DeadLetterConfig") {
1433            bus.dead_letter_config = Some(dlc.clone());
1434        }
1435        bus.last_modified_time = Utc::now();
1436
1437        let arn = bus.arn.clone();
1438        let bus_name = bus.name.clone();
1439
1440        Ok(json_resp(json!({
1441            "Arn": arn,
1442            "Name": bus_name,
1443        })))
1444    }
1445
1446    // ─── Endpoint Operations ────────────────────────────────────────────
1447
1448    fn create_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1449        let body = parse_body(req);
1450        validate_required("Name", &body["Name"])?;
1451        let name = body["Name"]
1452            .as_str()
1453            .ok_or_else(|| missing("Name"))?
1454            .to_string();
1455        validate_string_length("name", &name, 1, 64)?;
1456        validate_required("RoutingConfig", &body["RoutingConfig"])?;
1457        validate_required("EventBuses", &body["EventBuses"])?;
1458
1459        let description = body["Description"].as_str().map(|s| s.to_string());
1460        let routing_config = body["RoutingConfig"].clone();
1461        let replication_config = body.get("ReplicationConfig").cloned();
1462        let event_buses = body["EventBuses"].as_array().cloned().unwrap_or_default();
1463        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
1464
1465        let mut state = self.state.write();
1466        if state.endpoints.contains_key(&name) {
1467            return Err(AwsServiceError::aws_error(
1468                StatusCode::CONFLICT,
1469                "ResourceAlreadyExistsException",
1470                format!("Endpoint {name} already exists."),
1471            ));
1472        }
1473
1474        let endpoint_id = format!("{}.abc123", name);
1475        let arn = format!(
1476            "arn:aws:events:{}:{}:endpoint/{}",
1477            req.region, state.account_id, name
1478        );
1479        let endpoint_url = format!(
1480            "https://{}.endpoint.events.{}.amazonaws.com",
1481            endpoint_id, req.region
1482        );
1483        let now = Utc::now();
1484
1485        let endpoint = Endpoint {
1486            name: name.clone(),
1487            arn: arn.clone(),
1488            endpoint_id: endpoint_id.clone(),
1489            endpoint_url: Some(endpoint_url),
1490            description,
1491            routing_config: routing_config.clone(),
1492            replication_config: replication_config.clone(),
1493            event_buses: event_buses.clone(),
1494            role_arn: role_arn.clone(),
1495            state: "ACTIVE".to_string(),
1496            creation_time: now,
1497            last_modified_time: now,
1498        };
1499        state.endpoints.insert(name.clone(), endpoint);
1500
1501        let mut resp = json!({
1502            "Name": name,
1503            "Arn": arn,
1504            "State": "ACTIVE",
1505            "RoutingConfig": routing_config,
1506            "EventBuses": event_buses,
1507        });
1508        if let Some(ref rc) = replication_config {
1509            resp["ReplicationConfig"] = rc.clone();
1510        }
1511        if let Some(ref ra) = role_arn {
1512            resp["RoleArn"] = json!(ra);
1513        }
1514
1515        Ok(json_resp(resp))
1516    }
1517
1518    fn delete_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1519        let body = parse_body(req);
1520        validate_required("Name", &body["Name"])?;
1521        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1522
1523        let mut state = self.state.write();
1524        state.endpoints.remove(name).ok_or_else(|| {
1525            AwsServiceError::aws_error(
1526                StatusCode::BAD_REQUEST,
1527                "ResourceNotFoundException",
1528                format!("Endpoint '{name}' does not exist."),
1529            )
1530        })?;
1531
1532        Ok(json_resp(json!({})))
1533    }
1534
1535    fn describe_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1536        let body = parse_body(req);
1537        validate_required("Name", &body["Name"])?;
1538        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1539
1540        let state = self.state.read();
1541        let ep = state.endpoints.get(name).ok_or_else(|| {
1542            AwsServiceError::aws_error(
1543                StatusCode::BAD_REQUEST,
1544                "ResourceNotFoundException",
1545                format!("Endpoint '{name}' does not exist."),
1546            )
1547        })?;
1548
1549        let mut resp = json!({
1550            "Name": ep.name,
1551            "Arn": ep.arn,
1552            "EndpointId": ep.endpoint_id,
1553            "State": ep.state,
1554            "RoutingConfig": ep.routing_config,
1555            "EventBuses": ep.event_buses,
1556            "CreationTime": ep.creation_time.timestamp() as f64,
1557            "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1558        });
1559        if let Some(ref url) = ep.endpoint_url {
1560            resp["EndpointUrl"] = json!(url);
1561        }
1562        if let Some(ref desc) = ep.description {
1563            resp["Description"] = json!(desc);
1564        }
1565        if let Some(ref rc) = ep.replication_config {
1566            resp["ReplicationConfig"] = rc.clone();
1567        }
1568        if let Some(ref ra) = ep.role_arn {
1569            resp["RoleArn"] = json!(ra);
1570        }
1571
1572        Ok(json_resp(resp))
1573    }
1574
1575    fn list_endpoints(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1576        let body = parse_body(req);
1577        let name_prefix = body["NamePrefix"].as_str();
1578        let limit = body["MaxResults"].as_i64().unwrap_or(100) as usize;
1579        let offset: usize = body["NextToken"]
1580            .as_str()
1581            .map(|t| t.parse().unwrap_or(0))
1582            .unwrap_or(0);
1583
1584        let state = self.state.read();
1585        let filtered: Vec<Value> = state
1586            .endpoints
1587            .values()
1588            .filter(|ep| match name_prefix {
1589                Some(prefix) => ep.name.starts_with(prefix),
1590                None => true,
1591            })
1592            .skip(offset)
1593            .take(limit + 1)
1594            .map(|ep| {
1595                let mut obj = json!({
1596                    "Name": ep.name,
1597                    "Arn": ep.arn,
1598                    "EndpointId": ep.endpoint_id,
1599                    "State": ep.state,
1600                    "RoutingConfig": ep.routing_config,
1601                    "EventBuses": ep.event_buses,
1602                    "CreationTime": ep.creation_time.timestamp() as f64,
1603                    "LastModifiedTime": ep.last_modified_time.timestamp() as f64,
1604                });
1605                if let Some(ref url) = ep.endpoint_url {
1606                    obj["EndpointUrl"] = json!(url);
1607                }
1608                obj
1609            })
1610            .collect();
1611
1612        let has_more = filtered.len() > limit;
1613        let endpoints: Vec<Value> = filtered.into_iter().take(limit).collect();
1614        let mut resp = json!({ "Endpoints": endpoints });
1615        if has_more {
1616            resp["NextToken"] = json!((offset + limit).to_string());
1617        }
1618
1619        Ok(json_resp(resp))
1620    }
1621
1622    fn update_endpoint(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623        let body = parse_body(req);
1624        validate_required("Name", &body["Name"])?;
1625        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1626
1627        let mut state = self.state.write();
1628        let ep = state.endpoints.get_mut(name).ok_or_else(|| {
1629            AwsServiceError::aws_error(
1630                StatusCode::BAD_REQUEST,
1631                "ResourceNotFoundException",
1632                format!("Endpoint '{name}' does not exist."),
1633            )
1634        })?;
1635
1636        if let Some(desc) = body["Description"].as_str() {
1637            ep.description = Some(desc.to_string());
1638        }
1639        if !body["RoutingConfig"].is_null() {
1640            ep.routing_config = body["RoutingConfig"].clone();
1641        }
1642        if let Some(rc) = body.get("ReplicationConfig") {
1643            ep.replication_config = Some(rc.clone());
1644        }
1645        if let Some(buses) = body["EventBuses"].as_array() {
1646            ep.event_buses = buses.clone();
1647        }
1648        if let Some(ra) = body["RoleArn"].as_str() {
1649            ep.role_arn = Some(ra.to_string());
1650        }
1651        ep.last_modified_time = Utc::now();
1652
1653        let resp = json!({
1654            "Name": ep.name,
1655            "Arn": ep.arn,
1656            "EndpointId": ep.endpoint_id,
1657            "State": ep.state,
1658            "RoutingConfig": ep.routing_config,
1659            "EventBuses": ep.event_buses,
1660        });
1661
1662        Ok(json_resp(resp))
1663    }
1664
1665    // ─── DeauthorizeConnection ──────────────────────────────────────────
1666
1667    fn deauthorize_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1668        let body = parse_body(req);
1669        validate_required("Name", &body["Name"])?;
1670        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
1671        validate_string_length("name", name, 1, 64)?;
1672
1673        let mut state = self.state.write();
1674        let conn = state.connections.get_mut(name).ok_or_else(|| {
1675            AwsServiceError::aws_error(
1676                StatusCode::BAD_REQUEST,
1677                "ResourceNotFoundException",
1678                format!("Connection '{name}' does not exist."),
1679            )
1680        })?;
1681
1682        conn.connection_state = "DEAUTHORIZING".to_string();
1683        conn.last_modified_time = Utc::now();
1684
1685        let resp = json!({
1686            "ConnectionArn": conn.arn,
1687            "ConnectionState": conn.connection_state,
1688            "CreationTime": conn.creation_time.timestamp() as f64,
1689            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
1690            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
1691        });
1692
1693        Ok(json_resp(resp))
1694    }
1695
1696    // ─── PutEvents ──────────────────────────────────────────────────────
1697
1698    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1699        let body = parse_body(req);
1700        validate_required("Entries", &body["Entries"])?;
1701        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1702        let entries = body["Entries"]
1703            .as_array()
1704            .ok_or_else(|| missing("Entries"))?;
1705
1706        // Validate entries count
1707        if entries.is_empty() {
1708            return Err(AwsServiceError::aws_error(
1709                StatusCode::BAD_REQUEST,
1710                "ValidationException",
1711                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1712            ));
1713        }
1714        if entries.len() > 10 {
1715            return Err(AwsServiceError::aws_error(
1716                StatusCode::BAD_REQUEST,
1717                "ValidationException",
1718                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1719            ));
1720        }
1721
1722        let mut state = self.state.write();
1723        let mut result_entries = Vec::new();
1724        let mut events_to_deliver = Vec::new();
1725        let mut failed_count = 0;
1726
1727        for entry in entries {
1728            let source = entry["Source"].as_str().unwrap_or("").to_string();
1729            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1730            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1731
1732            // Validate required fields
1733            if source.is_empty() {
1734                failed_count += 1;
1735                result_entries.push(json!({
1736                    "ErrorCode": "InvalidArgument",
1737                    "ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
1738                }));
1739                continue;
1740            }
1741            if detail_type.is_empty() {
1742                failed_count += 1;
1743                result_entries.push(json!({
1744                    "ErrorCode": "InvalidArgument",
1745                    "ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
1746                }));
1747                continue;
1748            }
1749            if detail.is_empty() {
1750                failed_count += 1;
1751                result_entries.push(json!({
1752                    "ErrorCode": "InvalidArgument",
1753                    "ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
1754                }));
1755                continue;
1756            }
1757
1758            // Validate Detail is valid JSON
1759            if serde_json::from_str::<Value>(&detail).is_err() {
1760                failed_count += 1;
1761                result_entries.push(json!({
1762                    "ErrorCode": "MalformedDetail",
1763                    "ErrorMessage": "Detail is malformed.",
1764                }));
1765                continue;
1766            }
1767
1768            let event_id = uuid::Uuid::new_v4().to_string();
1769            let raw_bus = entry["EventBusName"]
1770                .as_str()
1771                .unwrap_or("default")
1772                .to_string();
1773            let event_bus_name = state.resolve_bus_name(&raw_bus);
1774            let time = if let Some(s) = entry["Time"].as_str() {
1775                DateTime::parse_from_rfc3339(s)
1776                    .map(|dt| dt.with_timezone(&Utc))
1777                    .unwrap_or_else(|_| Utc::now())
1778            } else if let Some(ts) = entry["Time"].as_f64() {
1779                DateTime::from_timestamp(ts as i64, ((ts.fract()) * 1_000_000_000.0) as u32)
1780                    .unwrap_or_else(Utc::now)
1781            } else if let Some(ts) = entry["Time"].as_i64() {
1782                DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now)
1783            } else {
1784                Utc::now()
1785            };
1786            let resources: Vec<String> = entry["Resources"]
1787                .as_array()
1788                .map(|arr| {
1789                    arr.iter()
1790                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1791                        .collect()
1792                })
1793                .unwrap_or_default();
1794
1795            let event = PutEvent {
1796                event_id: event_id.clone(),
1797                source: source.clone(),
1798                detail_type: detail_type.clone(),
1799                detail: detail.clone(),
1800                event_bus_name: event_bus_name.clone(),
1801                time,
1802                resources: resources.clone(),
1803            };
1804
1805            // Archive matching events
1806            let archive_keys: Vec<String> = state.archives.keys().cloned().collect();
1807            for akey in archive_keys {
1808                let (archive_bus, archive_pattern, archive_enabled) = {
1809                    let a = &state.archives[&akey];
1810                    (
1811                        state.resolve_bus_name(&a.event_source_arn),
1812                        a.event_pattern.clone(),
1813                        a.state == "ENABLED",
1814                    )
1815                };
1816                if archive_bus == event_bus_name && archive_enabled {
1817                    // Check if event matches archive's event pattern
1818                    let pattern_matches = matches_pattern(
1819                        archive_pattern.as_deref(),
1820                        &source,
1821                        &detail_type,
1822                        &detail,
1823                        &req.account_id,
1824                        &req.region,
1825                        &resources,
1826                    );
1827                    if pattern_matches {
1828                        if let Some(archive) = state.archives.get_mut(&akey) {
1829                            archive.event_count += 1;
1830                            archive.size_bytes += detail.len() as i64;
1831                            archive.events.push(event.clone());
1832                        }
1833                    }
1834                }
1835            }
1836
1837            state.events.push(event);
1838
1839            // Find matching rules and their targets
1840            let matching_targets: Vec<EventTarget> = state
1841                .rules
1842                .values()
1843                .filter(|r| {
1844                    r.event_bus_name == event_bus_name
1845                        && r.state == "ENABLED"
1846                        && matches_pattern(
1847                            r.event_pattern.as_deref(),
1848                            &source,
1849                            &detail_type,
1850                            &detail,
1851                            &req.account_id,
1852                            &req.region,
1853                            &resources,
1854                        )
1855                })
1856                .flat_map(|r| r.targets.clone())
1857                .collect();
1858
1859            if !matching_targets.is_empty() {
1860                events_to_deliver.push((
1861                    event_id.clone(),
1862                    source,
1863                    detail_type,
1864                    detail,
1865                    time,
1866                    resources,
1867                    matching_targets,
1868                ));
1869            }
1870
1871            result_entries.push(json!({ "EventId": event_id }));
1872        }
1873
1874        // Drop the lock before delivering
1875        drop(state);
1876
1877        // Deliver to targets
1878        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1879            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1880            let event_json = json!({
1881                "version": "0",
1882                "id": event_id,
1883                "source": source,
1884                "account": req.account_id,
1885                "detail-type": detail_type,
1886                "detail": detail_value,
1887                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1888                "region": req.region,
1889                "resources": resources,
1890            });
1891            let event_str = event_json.to_string();
1892
1893            for target in targets {
1894                let arn = &target.arn;
1895                // Compute the message body, applying InputTransformer if present
1896                let body_str = if let Some(ref transformer) = target.input_transformer {
1897                    apply_input_transformer(transformer, &event_json)
1898                } else if let Some(ref input) = target.input {
1899                    input.clone()
1900                } else if let Some(ref input_path) = target.input_path {
1901                    resolve_json_path(&event_json, input_path)
1902                        .map(|v| v.to_string())
1903                        .unwrap_or_else(|| event_str.clone())
1904                } else {
1905                    event_str.clone()
1906                };
1907
1908                if arn.contains(":sqs:") {
1909                    // Extract FIFO parameters (MessageGroupId)
1910                    let group_id = target
1911                        .sqs_parameters
1912                        .as_ref()
1913                        .and_then(|p| p["MessageGroupId"].as_str())
1914                        .map(|s| s.to_string());
1915                    if group_id.is_some() {
1916                        // FIFO queue: send with group ID but no dedup ID.
1917                        // Queues with content-based dedup will auto-generate one;
1918                        // queues without it will reject the message.
1919                        self.delivery.send_to_sqs_with_attrs(
1920                            arn,
1921                            &body_str,
1922                            &HashMap::new(),
1923                            group_id.as_deref(),
1924                            None,
1925                        );
1926                    } else {
1927                        self.delivery.send_to_sqs(arn, &body_str, &HashMap::new());
1928                    }
1929                } else if arn.contains(":sns:") {
1930                    self.delivery
1931                        .publish_to_sns(arn, &body_str, Some(&detail_type));
1932                } else if arn.contains(":lambda:") {
1933                    tracing::info!(
1934                        function_arn = %arn,
1935                        payload = %body_str,
1936                        "EventBridge delivering to Lambda function"
1937                    );
1938                    let now = Utc::now();
1939                    let mut state = self.state.write();
1940                    state
1941                        .lambda_invocations
1942                        .push(crate::state::LambdaInvocation {
1943                            function_arn: arn.clone(),
1944                            payload: body_str.clone(),
1945                            timestamp: now,
1946                        });
1947                    drop(state);
1948                    // Record in Lambda state for cross-service visibility
1949                    if let Some(ref ls) = self.lambda_state {
1950                        ls.write().invocations.push(LambdaInvocation {
1951                            function_arn: arn.clone(),
1952                            payload: body_str.clone(),
1953                            timestamp: now,
1954                            source: "aws:events".to_string(),
1955                        });
1956                    }
1957                } else if arn.contains(":logs:") {
1958                    tracing::info!(
1959                        log_group_arn = %arn,
1960                        payload = %body_str,
1961                        "EventBridge delivering to CloudWatch Logs"
1962                    );
1963                    let now = Utc::now();
1964                    let mut state = self.state.write();
1965                    state.log_deliveries.push(crate::state::LogDelivery {
1966                        log_group_arn: arn.clone(),
1967                        payload: body_str.clone(),
1968                        timestamp: now,
1969                    });
1970                    drop(state);
1971                    // Write event to CloudWatch Logs state
1972                    if let Some(ref log_state) = self.logs_state {
1973                        deliver_to_logs(log_state, arn, &body_str, now);
1974                    }
1975                } else if arn.contains(":states:") {
1976                    tracing::info!(
1977                        state_machine_arn = %arn,
1978                        payload = %body_str,
1979                        "EventBridge delivering to Step Functions (stub)"
1980                    );
1981                    let mut state = self.state.write();
1982                    state
1983                        .step_function_executions
1984                        .push(crate::state::StepFunctionExecution {
1985                            state_machine_arn: arn.clone(),
1986                            payload: body_str.clone(),
1987                            timestamp: Utc::now(),
1988                        });
1989                } else if arn.starts_with("https://") || arn.starts_with("http://") {
1990                    // HTTP/API destination target — fire-and-forget POST
1991                    let url = arn.clone();
1992                    let payload = body_str.clone();
1993                    tokio::spawn(async move {
1994                        let client = reqwest::Client::new();
1995                        let result = client
1996                            .post(&url)
1997                            .header("Content-Type", "application/json")
1998                            .body(payload)
1999                            .send()
2000                            .await;
2001                        if let Err(e) = result {
2002                            tracing::warn!(
2003                                endpoint = %url,
2004                                error = %e,
2005                                "EventBridge HTTP target delivery failed"
2006                            );
2007                        }
2008                    });
2009                }
2010            }
2011        }
2012
2013        let resp = json!({
2014            "FailedEntryCount": failed_count,
2015            "Entries": result_entries,
2016        });
2017
2018        Ok(json_resp(resp))
2019    }
2020
2021    // ─── Tagging ────────────────────────────────────────────────────────
2022
2023    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2024        let body = parse_body(req);
2025        validate_required("ResourceARN", &body["ResourceARN"])?;
2026        let arn = body["ResourceARN"]
2027            .as_str()
2028            .ok_or_else(|| missing("ResourceARN"))?;
2029        validate_string_length("resourceARN", arn, 1, 1600)?;
2030        validate_required("Tags", &body["Tags"])?;
2031        let tags = body["Tags"].as_array().ok_or_else(|| missing("Tags"))?;
2032
2033        let mut state = self.state.write();
2034
2035        let tag_map = find_tags_mut(&mut state, arn)?;
2036
2037        for tag in tags {
2038            if let (Some(key), Some(val)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2039                tag_map.insert(key.to_string(), val.to_string());
2040            }
2041        }
2042
2043        Ok(json_resp(json!({})))
2044    }
2045
2046    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2047        let body = parse_body(req);
2048        validate_required("ResourceARN", &body["ResourceARN"])?;
2049        let arn = body["ResourceARN"]
2050            .as_str()
2051            .ok_or_else(|| missing("ResourceARN"))?;
2052        validate_string_length("resourceARN", arn, 1, 1600)?;
2053        validate_required("TagKeys", &body["TagKeys"])?;
2054        let tag_keys = body["TagKeys"]
2055            .as_array()
2056            .ok_or_else(|| missing("TagKeys"))?;
2057
2058        let mut state = self.state.write();
2059        let tag_map = find_tags_mut(&mut state, arn)?;
2060
2061        for key in tag_keys {
2062            if let Some(k) = key.as_str() {
2063                tag_map.remove(k);
2064            }
2065        }
2066
2067        Ok(json_resp(json!({})))
2068    }
2069
2070    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2071        let body = parse_body(req);
2072        validate_required("ResourceARN", &body["ResourceARN"])?;
2073        let arn = body["ResourceARN"]
2074            .as_str()
2075            .ok_or_else(|| missing("ResourceARN"))?;
2076        validate_string_length("resourceARN", arn, 1, 1600)?;
2077
2078        let state = self.state.read();
2079        let tag_map = find_tags(&state, arn)?;
2080
2081        let tags: Vec<Value> = tag_map
2082            .iter()
2083            .map(|(k, v)| json!({"Key": k, "Value": v}))
2084            .collect();
2085
2086        Ok(json_resp(json!({ "Tags": tags })))
2087    }
2088
2089    // ─── Archive Operations ─────────────────────────────────────────────
2090
2091    fn create_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2092        let body = parse_body(req);
2093        validate_required("ArchiveName", &body["ArchiveName"])?;
2094        let name = body["ArchiveName"]
2095            .as_str()
2096            .ok_or_else(|| missing("ArchiveName"))?
2097            .to_string();
2098        validate_string_length("archiveName", &name, 1, 48)?;
2099        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2100        let event_source_arn = body["EventSourceArn"]
2101            .as_str()
2102            .ok_or_else(|| missing("EventSourceArn"))?
2103            .to_string();
2104        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2105        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2106        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2107        if let Some(rd) = body["RetentionDays"].as_i64() {
2108            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2109        }
2110        let description = body["Description"].as_str().map(|s| s.to_string());
2111        let event_pattern = body["EventPattern"].as_str().map(|s| s.to_string());
2112        let retention_days = body["RetentionDays"].as_i64().unwrap_or(0);
2113
2114        // Validate event pattern if provided
2115        if let Some(ref pattern) = event_pattern {
2116            validate_event_pattern(pattern)?;
2117        }
2118
2119        let mut state = self.state.write();
2120
2121        // Validate event bus exists
2122        let bus_name = state.resolve_bus_name(&event_source_arn);
2123        if !state.buses.contains_key(&bus_name) {
2124            return Err(AwsServiceError::aws_error(
2125                StatusCode::BAD_REQUEST,
2126                "ResourceNotFoundException",
2127                format!("Event bus {bus_name} does not exist."),
2128            ));
2129        }
2130
2131        // Check duplicate
2132        if state.archives.contains_key(&name) {
2133            return Err(AwsServiceError::aws_error(
2134                StatusCode::BAD_REQUEST,
2135                "ResourceAlreadyExistsException",
2136                format!("Archive {name} already exists."),
2137            ));
2138        }
2139
2140        let now = Utc::now();
2141        let arn = format!(
2142            "arn:aws:events:{}:{}:archive/{}",
2143            req.region, state.account_id, name
2144        );
2145
2146        let archive = Archive {
2147            name: name.clone(),
2148            arn: arn.clone(),
2149            event_source_arn: event_source_arn.clone(),
2150            description,
2151            event_pattern: event_pattern.clone(),
2152            retention_days,
2153            state: "ENABLED".to_string(),
2154            creation_time: now,
2155            event_count: 0,
2156            size_bytes: 0,
2157            events: Vec::new(),
2158        };
2159        state.archives.insert(name.clone(), archive);
2160
2161        // Create the archive rule
2162        let rule_name = format!("Events-Archive-{name}");
2163        let rule_arn = format!(
2164            "arn:aws:events:{}:{}:rule/{}",
2165            req.region, state.account_id, rule_name
2166        );
2167        // Merge archive event pattern with replay-name filter
2168        let rule_event_pattern = {
2169            let mut merged = if let Some(ref ep) = event_pattern {
2170                serde_json::from_str::<Value>(ep).unwrap_or_else(|_| json!({}))
2171            } else {
2172                json!({})
2173            };
2174            if let Some(obj) = merged.as_object_mut() {
2175                obj.insert("replay-name".to_string(), json!([{"exists": false}]));
2176            }
2177            serde_json::to_string(&merged).unwrap_or_default()
2178        };
2179
2180        // Build the archive target with InputTransformer
2181        let archive_target = EventTarget {
2182            id: name.clone(),
2183            arn: format!("arn:aws:events:{}:::", req.region),
2184            input: None,
2185            input_path: None,
2186            input_transformer: Some(json!({
2187                "InputPathsMap": {},
2188                "InputTemplate": format!(
2189                    "{{\"archive-arn\": \"{}\", \"event\": <aws.events.event.json>, \"ingestion-time\": <aws.events.event.ingestion-time>}}",
2190                    arn
2191                )
2192            })),
2193            sqs_parameters: None,
2194        };
2195
2196        let archive_rule = EventRule {
2197            name: rule_name.clone(),
2198            arn: rule_arn,
2199            event_bus_name: bus_name.clone(),
2200            event_pattern: Some(rule_event_pattern),
2201            schedule_expression: None,
2202            state: "ENABLED".to_string(),
2203            description: None,
2204            role_arn: None,
2205            managed_by: Some("prod.vhs.events.aws.internal".to_string()),
2206            created_by: Some(state.account_id.clone()),
2207            targets: vec![archive_target],
2208            tags: HashMap::new(),
2209            last_fired: None,
2210        };
2211        let key = (bus_name, rule_name);
2212        state.rules.insert(key, archive_rule);
2213
2214        Ok(json_resp(json!({
2215            "ArchiveArn": arn,
2216            "CreationTime": now.timestamp() as f64,
2217            "State": "ENABLED",
2218        })))
2219    }
2220
2221    fn describe_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2222        let body = parse_body(req);
2223        validate_required("ArchiveName", &body["ArchiveName"])?;
2224        let name = body["ArchiveName"]
2225            .as_str()
2226            .ok_or_else(|| missing("ArchiveName"))?;
2227        validate_string_length("archiveName", name, 1, 48)?;
2228
2229        let state = self.state.read();
2230        let archive = state.archives.get(name).ok_or_else(|| {
2231            AwsServiceError::aws_error(
2232                StatusCode::BAD_REQUEST,
2233                "ResourceNotFoundException",
2234                format!("Archive {name} does not exist."),
2235            )
2236        })?;
2237
2238        let mut resp = json!({
2239            "ArchiveArn": archive.arn,
2240            "ArchiveName": archive.name,
2241            "CreationTime": archive.creation_time.timestamp() as f64,
2242            "EventCount": archive.event_count,
2243            "EventSourceArn": archive.event_source_arn,
2244            "RetentionDays": archive.retention_days,
2245            "SizeBytes": archive.size_bytes,
2246            "State": archive.state,
2247        });
2248        if let Some(ref desc) = archive.description {
2249            resp["Description"] = json!(desc);
2250        }
2251        if let Some(ref ep) = archive.event_pattern {
2252            resp["EventPattern"] = json!(ep);
2253        }
2254
2255        Ok(json_resp(resp))
2256    }
2257
2258    fn list_archives(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2259        let body = parse_body(req);
2260        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 48)?;
2261        validate_optional_string_length(
2262            "eventSourceArn",
2263            body["EventSourceArn"].as_str(),
2264            1,
2265            1600,
2266        )?;
2267        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2268        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2269        let name_prefix = body["NamePrefix"].as_str();
2270        let source_arn = body["EventSourceArn"].as_str();
2271        let archive_state = body["State"].as_str();
2272
2273        // Validate at most one filter
2274        let filter_count = [
2275            name_prefix.is_some(),
2276            source_arn.is_some(),
2277            archive_state.is_some(),
2278        ]
2279        .iter()
2280        .filter(|&&x| x)
2281        .count();
2282        if filter_count > 1 {
2283            return Err(AwsServiceError::aws_error(
2284                StatusCode::BAD_REQUEST,
2285                "ValidationException",
2286                "At most one filter is allowed for ListArchives. Use either : State, EventSourceArn, or NamePrefix.",
2287            ));
2288        }
2289
2290        // Validate state
2291        if let Some(s) = archive_state {
2292            let valid = [
2293                "ENABLED",
2294                "DISABLED",
2295                "CREATING",
2296                "UPDATING",
2297                "CREATE_FAILED",
2298                "UPDATE_FAILED",
2299            ];
2300            if !valid.contains(&s) {
2301                return Err(AwsServiceError::aws_error(
2302                    StatusCode::BAD_REQUEST,
2303                    "ValidationException",
2304                    format!(
2305                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [ENABLED, DISABLED, CREATING, UPDATING, CREATE_FAILED, UPDATE_FAILED]",
2306                        s
2307                    ),
2308                ));
2309            }
2310        }
2311
2312        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2313        let offset: usize = body["NextToken"]
2314            .as_str()
2315            .and_then(|t| t.parse().ok())
2316            .unwrap_or(0);
2317
2318        let state = self.state.read();
2319        let filtered: Vec<Value> = state
2320            .archives
2321            .values()
2322            .filter(|a| {
2323                if let Some(prefix) = name_prefix {
2324                    a.name.starts_with(prefix)
2325                } else if let Some(arn) = source_arn {
2326                    a.event_source_arn == arn
2327                } else if let Some(s) = archive_state {
2328                    a.state == s
2329                } else {
2330                    true
2331                }
2332            })
2333            .skip(offset)
2334            .take(limit + 1)
2335            .map(|a| {
2336                json!({
2337                    "ArchiveName": a.name,
2338                    "CreationTime": a.creation_time.timestamp() as f64,
2339                    "EventCount": a.event_count,
2340                    "EventSourceArn": a.event_source_arn,
2341                    "RetentionDays": a.retention_days,
2342                    "SizeBytes": a.size_bytes,
2343                    "State": a.state,
2344                })
2345            })
2346            .collect();
2347
2348        let has_more = filtered.len() > limit;
2349        let archives: Vec<Value> = filtered.into_iter().take(limit).collect();
2350        let mut resp = json!({ "Archives": archives });
2351        if has_more {
2352            resp["NextToken"] = json!((offset + limit).to_string());
2353        }
2354
2355        Ok(json_resp(resp))
2356    }
2357
2358    fn update_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2359        let body = parse_body(req);
2360        validate_required("ArchiveName", &body["ArchiveName"])?;
2361        let name = body["ArchiveName"]
2362            .as_str()
2363            .ok_or_else(|| missing("ArchiveName"))?;
2364        validate_string_length("archiveName", name, 1, 48)?;
2365        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2366        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
2367        if let Some(rd) = body["RetentionDays"].as_i64() {
2368            validate_range_i64("retentionDays", rd, 0, i64::MAX)?;
2369        }
2370
2371        // Validate event pattern if provided
2372        if let Some(pattern) = body["EventPattern"].as_str() {
2373            validate_event_pattern(pattern)?;
2374        }
2375
2376        let mut state = self.state.write();
2377        let archive = state.archives.get_mut(name).ok_or_else(|| {
2378            AwsServiceError::aws_error(
2379                StatusCode::BAD_REQUEST,
2380                "ResourceNotFoundException",
2381                format!("Archive {name} does not exist."),
2382            )
2383        })?;
2384
2385        if let Some(desc) = body["Description"].as_str() {
2386            archive.description = Some(desc.to_string());
2387        }
2388        if let Some(pattern) = body["EventPattern"].as_str() {
2389            archive.event_pattern = Some(pattern.to_string());
2390        }
2391        if let Some(days) = body["RetentionDays"].as_i64() {
2392            archive.retention_days = days;
2393        }
2394
2395        Ok(json_resp(json!({
2396            "ArchiveArn": archive.arn,
2397            "CreationTime": archive.creation_time.timestamp() as f64,
2398            "State": archive.state,
2399        })))
2400    }
2401
2402    fn delete_archive(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2403        let body = parse_body(req);
2404        validate_required("ArchiveName", &body["ArchiveName"])?;
2405        let name = body["ArchiveName"]
2406            .as_str()
2407            .ok_or_else(|| missing("ArchiveName"))?;
2408        validate_string_length("archiveName", name, 1, 48)?;
2409
2410        let mut state = self.state.write();
2411        if !state.archives.contains_key(name) {
2412            return Err(AwsServiceError::aws_error(
2413                StatusCode::BAD_REQUEST,
2414                "ResourceNotFoundException",
2415                format!("Archive {name} does not exist."),
2416            ));
2417        }
2418
2419        state.archives.remove(name);
2420
2421        // Remove the archive rule
2422        let rule_name = format!("Events-Archive-{name}");
2423        state.rules.retain(|k, _| k.1 != rule_name);
2424
2425        Ok(json_resp(json!({})))
2426    }
2427
2428    // ─── Connection Operations ──────────────────────────────────────────
2429
2430    fn create_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2431        let body = parse_body(req);
2432        validate_required("Name", &body["Name"])?;
2433        let name = body["Name"]
2434            .as_str()
2435            .ok_or_else(|| missing("Name"))?
2436            .to_string();
2437        validate_string_length("name", &name, 1, 64)?;
2438        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2439        validate_required("AuthorizationType", &body["AuthorizationType"])?;
2440        let description = body["Description"].as_str().map(|s| s.to_string());
2441        let auth_type = body["AuthorizationType"]
2442            .as_str()
2443            .ok_or_else(|| missing("AuthorizationType"))?
2444            .to_string();
2445        validate_enum(
2446            "authorizationType",
2447            &auth_type,
2448            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2449        )?;
2450        validate_optional_string_length(
2451            "kmsKeyIdentifier",
2452            body["KmsKeyIdentifier"].as_str(),
2453            0,
2454            2048,
2455        )?;
2456        validate_required("AuthParameters", &body["AuthParameters"])?;
2457        let auth_params = body["AuthParameters"].clone();
2458
2459        let mut state = self.state.write();
2460        let now = Utc::now();
2461        let conn_uuid = uuid::Uuid::new_v4();
2462        let arn = format!(
2463            "arn:aws:events:{}:{}:connection/{}/{}",
2464            req.region, state.account_id, name, conn_uuid
2465        );
2466        let secret_arn = format!(
2467            "arn:aws:secretsmanager:{}:{}:secret:events!connection/{}/{}",
2468            req.region, state.account_id, name, conn_uuid
2469        );
2470
2471        let conn = Connection {
2472            name: name.clone(),
2473            arn: arn.clone(),
2474            description,
2475            authorization_type: auth_type.clone(),
2476            auth_parameters: auth_params,
2477            connection_state: "AUTHORIZED".to_string(),
2478            secret_arn: secret_arn.clone(),
2479            creation_time: now,
2480            last_modified_time: now,
2481            last_authorized_time: now,
2482        };
2483        state.connections.insert(name, conn);
2484
2485        Ok(json_resp(json!({
2486            "ConnectionArn": arn,
2487            "ConnectionState": "AUTHORIZED",
2488            "CreationTime": now.timestamp() as f64,
2489            "LastModifiedTime": now.timestamp() as f64,
2490        })))
2491    }
2492
2493    fn describe_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2494        let body = parse_body(req);
2495        validate_required("Name", &body["Name"])?;
2496        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2497        validate_string_length("name", name, 1, 64)?;
2498
2499        let state = self.state.read();
2500        let conn = state.connections.get(name).ok_or_else(|| {
2501            AwsServiceError::aws_error(
2502                StatusCode::BAD_REQUEST,
2503                "ResourceNotFoundException",
2504                format!("Connection '{name}' does not exist."),
2505            )
2506        })?;
2507
2508        // Build auth parameters response - strip secrets
2509        let auth_params_response =
2510            build_auth_params_response(&conn.authorization_type, &conn.auth_parameters);
2511
2512        let mut resp = json!({
2513            "ConnectionArn": conn.arn,
2514            "Name": conn.name,
2515            "AuthorizationType": conn.authorization_type,
2516            "AuthParameters": auth_params_response,
2517            "ConnectionState": conn.connection_state,
2518            "SecretArn": conn.secret_arn,
2519            "CreationTime": conn.creation_time.timestamp() as f64,
2520            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2521            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2522        });
2523        if let Some(ref desc) = conn.description {
2524            resp["Description"] = json!(desc);
2525        }
2526
2527        Ok(json_resp(resp))
2528    }
2529
2530    fn list_connections(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531        let body = parse_body(req);
2532        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2533        validate_optional_enum(
2534            "connectionState",
2535            body["ConnectionState"].as_str(),
2536            &[
2537                "CREATING",
2538                "UPDATING",
2539                "DELETING",
2540                "AUTHORIZED",
2541                "DEAUTHORIZED",
2542                "AUTHORIZING",
2543                "DEAUTHORIZING",
2544                "ACTIVE",
2545                "FAILED_CONNECTIVITY",
2546            ],
2547        )?;
2548        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2549        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2550
2551        let name_prefix = body["NamePrefix"].as_str();
2552        let connection_state = body["ConnectionState"].as_str();
2553        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2554        let offset: usize = body["NextToken"]
2555            .as_str()
2556            .and_then(|t| t.parse().ok())
2557            .unwrap_or(0);
2558
2559        let state = self.state.read();
2560        let filtered: Vec<Value> = state
2561            .connections
2562            .values()
2563            .filter(|c| {
2564                if let Some(prefix) = name_prefix {
2565                    if !c.name.starts_with(prefix) {
2566                        return false;
2567                    }
2568                }
2569                if let Some(cs) = connection_state {
2570                    if c.connection_state != cs {
2571                        return false;
2572                    }
2573                }
2574                true
2575            })
2576            .skip(offset)
2577            .take(limit + 1)
2578            .map(|c| {
2579                json!({
2580                    "ConnectionArn": c.arn,
2581                    "Name": c.name,
2582                    "AuthorizationType": c.authorization_type,
2583                    "ConnectionState": c.connection_state,
2584                    "CreationTime": c.creation_time.timestamp() as f64,
2585                    "LastModifiedTime": c.last_modified_time.timestamp() as f64,
2586                    "LastAuthorizedTime": c.last_authorized_time.timestamp() as f64,
2587                })
2588            })
2589            .collect();
2590
2591        let has_more = filtered.len() > limit;
2592        let conns: Vec<Value> = filtered.into_iter().take(limit).collect();
2593        let mut resp = json!({ "Connections": conns });
2594        if has_more {
2595            resp["NextToken"] = json!((offset + limit).to_string());
2596        }
2597
2598        Ok(json_resp(resp))
2599    }
2600
2601    fn update_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2602        let body = parse_body(req);
2603        validate_required("Name", &body["Name"])?;
2604        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2605        validate_string_length("name", name, 1, 64)?;
2606        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2607        validate_optional_enum(
2608            "authorizationType",
2609            body["AuthorizationType"].as_str(),
2610            &["BASIC", "OAUTH_CLIENT_CREDENTIALS", "API_KEY"],
2611        )?;
2612
2613        let mut state = self.state.write();
2614        let conn = state.connections.get_mut(name).ok_or_else(|| {
2615            AwsServiceError::aws_error(
2616                StatusCode::BAD_REQUEST,
2617                "ResourceNotFoundException",
2618                format!("Connection '{name}' does not exist."),
2619            )
2620        })?;
2621
2622        if let Some(desc) = body["Description"].as_str() {
2623            conn.description = Some(desc.to_string());
2624        }
2625        if let Some(auth_type) = body["AuthorizationType"].as_str() {
2626            conn.authorization_type = auth_type.to_string();
2627        }
2628        if body.get("AuthParameters").is_some() {
2629            conn.auth_parameters = body["AuthParameters"].clone();
2630        }
2631        conn.last_modified_time = Utc::now();
2632
2633        Ok(json_resp(json!({
2634            "ConnectionArn": conn.arn,
2635            "ConnectionState": conn.connection_state,
2636            "CreationTime": conn.creation_time.timestamp() as f64,
2637            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2638            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2639        })))
2640    }
2641
2642    fn delete_connection(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2643        let body = parse_body(req);
2644        validate_required("Name", &body["Name"])?;
2645        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2646        validate_string_length("name", name, 1, 64)?;
2647
2648        let mut state = self.state.write();
2649        let conn = state.connections.remove(name).ok_or_else(|| {
2650            AwsServiceError::aws_error(
2651                StatusCode::BAD_REQUEST,
2652                "ResourceNotFoundException",
2653                format!("Connection '{name}' does not exist."),
2654            )
2655        })?;
2656
2657        Ok(json_resp(json!({
2658            "ConnectionArn": conn.arn,
2659            "ConnectionState": conn.connection_state,
2660            "CreationTime": conn.creation_time.timestamp() as f64,
2661            "LastModifiedTime": conn.last_modified_time.timestamp() as f64,
2662            "LastAuthorizedTime": conn.last_authorized_time.timestamp() as f64,
2663        })))
2664    }
2665
2666    // ─── API Destination Operations ─────────────────────────────────────
2667
2668    fn create_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2669        let body = parse_body(req);
2670        validate_required("Name", &body["Name"])?;
2671        let name = body["Name"]
2672            .as_str()
2673            .ok_or_else(|| missing("Name"))?
2674            .to_string();
2675        validate_string_length("name", &name, 1, 64)?;
2676        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2677        validate_required("ConnectionArn", &body["ConnectionArn"])?;
2678        let description = body["Description"].as_str().map(|s| s.to_string());
2679        let connection_arn = body["ConnectionArn"]
2680            .as_str()
2681            .ok_or_else(|| missing("ConnectionArn"))?
2682            .to_string();
2683        validate_string_length("connectionArn", &connection_arn, 1, 1600)?;
2684        validate_required("InvocationEndpoint", &body["InvocationEndpoint"])?;
2685        let endpoint = body["InvocationEndpoint"]
2686            .as_str()
2687            .ok_or_else(|| missing("InvocationEndpoint"))?
2688            .to_string();
2689        validate_string_length("invocationEndpoint", &endpoint, 1, 2048)?;
2690        validate_required("HttpMethod", &body["HttpMethod"])?;
2691        let http_method = body["HttpMethod"]
2692            .as_str()
2693            .ok_or_else(|| missing("HttpMethod"))?
2694            .to_string();
2695        validate_enum(
2696            "httpMethod",
2697            &http_method,
2698            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2699        )?;
2700        let rate_limit = body["InvocationRateLimitPerSecond"].as_i64();
2701        if let Some(r) = rate_limit {
2702            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2703        }
2704
2705        let mut state = self.state.write();
2706        let now = Utc::now();
2707        let dest_uuid = uuid::Uuid::new_v4();
2708        let arn = format!(
2709            "arn:aws:events:{}:{}:api-destination/{}/{}",
2710            req.region, state.account_id, name, dest_uuid
2711        );
2712
2713        let dest = ApiDestination {
2714            name: name.clone(),
2715            arn: arn.clone(),
2716            description,
2717            connection_arn,
2718            invocation_endpoint: endpoint,
2719            http_method,
2720            invocation_rate_limit_per_second: rate_limit,
2721            state: "ACTIVE".to_string(),
2722            creation_time: now,
2723            last_modified_time: now,
2724        };
2725        state.api_destinations.insert(name, dest);
2726
2727        Ok(json_resp(json!({
2728            "ApiDestinationArn": arn,
2729            "ApiDestinationState": "ACTIVE",
2730            "CreationTime": now.timestamp() as f64,
2731            "LastModifiedTime": now.timestamp() as f64,
2732        })))
2733    }
2734
2735    fn describe_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2736        let body = parse_body(req);
2737        validate_required("Name", &body["Name"])?;
2738        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2739        validate_string_length("name", name, 1, 64)?;
2740
2741        let state = self.state.read();
2742        let dest = state.api_destinations.get(name).ok_or_else(|| {
2743            AwsServiceError::aws_error(
2744                StatusCode::BAD_REQUEST,
2745                "ResourceNotFoundException",
2746                format!("An api-destination '{name}' does not exist."),
2747            )
2748        })?;
2749
2750        let mut resp = json!({
2751            "ApiDestinationArn": dest.arn,
2752            "Name": dest.name,
2753            "ConnectionArn": dest.connection_arn,
2754            "InvocationEndpoint": dest.invocation_endpoint,
2755            "HttpMethod": dest.http_method,
2756            "ApiDestinationState": dest.state,
2757            "CreationTime": dest.creation_time.timestamp() as f64,
2758            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2759        });
2760        if let Some(ref desc) = dest.description {
2761            resp["Description"] = json!(desc);
2762        }
2763        if let Some(rate) = dest.invocation_rate_limit_per_second {
2764            resp["InvocationRateLimitPerSecond"] = json!(rate);
2765        }
2766
2767        Ok(json_resp(resp))
2768    }
2769
2770    fn list_api_destinations(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2771        let body = parse_body(req);
2772        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
2773        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2774        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
2775        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2776
2777        let name_prefix = body["NamePrefix"].as_str();
2778        let connection_arn = body["ConnectionArn"].as_str();
2779        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2780        let offset: usize = body["NextToken"]
2781            .as_str()
2782            .and_then(|t| t.parse().ok())
2783            .unwrap_or(0);
2784
2785        let state = self.state.read();
2786        let filtered: Vec<Value> = state
2787            .api_destinations
2788            .values()
2789            .filter(|d| {
2790                if let Some(prefix) = name_prefix {
2791                    if !d.name.starts_with(prefix) {
2792                        return false;
2793                    }
2794                }
2795                if let Some(arn) = connection_arn {
2796                    if d.connection_arn != arn {
2797                        return false;
2798                    }
2799                }
2800                true
2801            })
2802            .skip(offset)
2803            .take(limit + 1)
2804            .map(|d| {
2805                let mut obj = json!({
2806                    "ApiDestinationArn": d.arn,
2807                    "Name": d.name,
2808                    "ConnectionArn": d.connection_arn,
2809                    "InvocationEndpoint": d.invocation_endpoint,
2810                    "HttpMethod": d.http_method,
2811                    "ApiDestinationState": d.state,
2812                    "CreationTime": d.creation_time.timestamp() as f64,
2813                    "LastModifiedTime": d.last_modified_time.timestamp() as f64,
2814                });
2815                if let Some(rate) = d.invocation_rate_limit_per_second {
2816                    obj["InvocationRateLimitPerSecond"] = json!(rate);
2817                }
2818                obj
2819            })
2820            .collect();
2821
2822        let has_more = filtered.len() > limit;
2823        let dests: Vec<Value> = filtered.into_iter().take(limit).collect();
2824        let mut resp = json!({ "ApiDestinations": dests });
2825        if has_more {
2826            resp["NextToken"] = json!((offset + limit).to_string());
2827        }
2828
2829        Ok(json_resp(resp))
2830    }
2831
2832    fn update_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2833        let body = parse_body(req);
2834        validate_required("Name", &body["Name"])?;
2835        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2836        validate_string_length("name", name, 1, 64)?;
2837        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2838        validate_optional_string_length("connectionArn", body["ConnectionArn"].as_str(), 1, 1600)?;
2839        validate_optional_string_length(
2840            "invocationEndpoint",
2841            body["InvocationEndpoint"].as_str(),
2842            1,
2843            2048,
2844        )?;
2845        validate_optional_enum(
2846            "httpMethod",
2847            body["HttpMethod"].as_str(),
2848            &["POST", "GET", "HEAD", "OPTIONS", "PUT", "PATCH", "DELETE"],
2849        )?;
2850        if let Some(r) = body["InvocationRateLimitPerSecond"].as_i64() {
2851            validate_range_i64("invocationRateLimitPerSecond", r, 1, i64::MAX)?;
2852        }
2853
2854        let mut state = self.state.write();
2855        let dest = state.api_destinations.get_mut(name).ok_or_else(|| {
2856            AwsServiceError::aws_error(
2857                StatusCode::BAD_REQUEST,
2858                "ResourceNotFoundException",
2859                format!("An api-destination '{name}' does not exist."),
2860            )
2861        })?;
2862
2863        if let Some(desc) = body["Description"].as_str() {
2864            dest.description = Some(desc.to_string());
2865        }
2866        if let Some(endpoint) = body["InvocationEndpoint"].as_str() {
2867            dest.invocation_endpoint = endpoint.to_string();
2868        }
2869        if let Some(method) = body["HttpMethod"].as_str() {
2870            dest.http_method = method.to_string();
2871        }
2872        if let Some(rate) = body["InvocationRateLimitPerSecond"].as_i64() {
2873            dest.invocation_rate_limit_per_second = Some(rate);
2874        }
2875        if let Some(conn) = body["ConnectionArn"].as_str() {
2876            dest.connection_arn = conn.to_string();
2877        }
2878        dest.last_modified_time = Utc::now();
2879
2880        Ok(json_resp(json!({
2881            "ApiDestinationArn": dest.arn,
2882            "ApiDestinationState": dest.state,
2883            "CreationTime": dest.creation_time.timestamp() as f64,
2884            "LastModifiedTime": dest.last_modified_time.timestamp() as f64,
2885        })))
2886    }
2887
2888    fn delete_api_destination(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2889        let body = parse_body(req);
2890        validate_required("Name", &body["Name"])?;
2891        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
2892        validate_string_length("name", name, 1, 64)?;
2893
2894        let mut state = self.state.write();
2895        if !state.api_destinations.contains_key(name) {
2896            return Err(AwsServiceError::aws_error(
2897                StatusCode::BAD_REQUEST,
2898                "ResourceNotFoundException",
2899                format!("An api-destination '{name}' does not exist."),
2900            ));
2901        }
2902        state.api_destinations.remove(name);
2903
2904        Ok(json_resp(json!({})))
2905    }
2906
2907    // ─── Replay Operations ──────────────────────────────────────────────
2908
2909    fn start_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2910        let body = parse_body(req);
2911        validate_required("ReplayName", &body["ReplayName"])?;
2912        let name = body["ReplayName"]
2913            .as_str()
2914            .ok_or_else(|| missing("ReplayName"))?
2915            .to_string();
2916        validate_string_length("replayName", &name, 1, 64)?;
2917        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
2918        validate_required("EventSourceArn", &body["EventSourceArn"])?;
2919        let description = body["Description"].as_str().map(|s| s.to_string());
2920        let event_source_arn = body["EventSourceArn"]
2921            .as_str()
2922            .ok_or_else(|| missing("EventSourceArn"))?
2923            .to_string();
2924        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
2925        validate_required("EventStartTime", &body["EventStartTime"])?;
2926        validate_required("EventEndTime", &body["EventEndTime"])?;
2927        validate_required("Destination", &body["Destination"])?;
2928        let destination = body["Destination"].clone();
2929        let event_start_time_f = body["EventStartTime"].as_f64();
2930        let event_end_time_f = body["EventEndTime"].as_f64();
2931
2932        let event_start_time = event_start_time_f
2933            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2934            .unwrap_or_else(Utc::now);
2935        let event_end_time = event_end_time_f
2936            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
2937            .unwrap_or_else(Utc::now);
2938
2939        // Validate destination ARN
2940        let dest_arn = destination["Arn"].as_str().unwrap_or("");
2941        if !dest_arn.contains(":event-bus/") {
2942            return Err(AwsServiceError::aws_error(
2943                StatusCode::BAD_REQUEST,
2944                "ValidationException",
2945                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
2946            ));
2947        }
2948
2949        let mut state = self.state.write();
2950
2951        // Validate event bus exists
2952        let bus_name = state.resolve_bus_name(dest_arn);
2953        if !state.buses.contains_key(&bus_name) {
2954            return Err(AwsServiceError::aws_error(
2955                StatusCode::BAD_REQUEST,
2956                "ResourceNotFoundException",
2957                format!("Event bus {bus_name} does not exist."),
2958            ));
2959        }
2960
2961        // Validate archive exists
2962        let archive_name = event_source_arn
2963            .rsplit_once("archive/")
2964            .map(|(_, n)| n.to_string())
2965            .unwrap_or_default();
2966        if !state.archives.contains_key(&archive_name) {
2967            return Err(AwsServiceError::aws_error(
2968                StatusCode::BAD_REQUEST,
2969                "ValidationException",
2970                format!(
2971                    "Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
2972                ),
2973            ));
2974        }
2975
2976        // Validate archive bus matches destination bus
2977        let archive = state.archives.get(&archive_name).unwrap();
2978        let archive_bus = state.resolve_bus_name(&archive.event_source_arn);
2979        if archive_bus != bus_name {
2980            return Err(AwsServiceError::aws_error(
2981                StatusCode::BAD_REQUEST,
2982                "ValidationException",
2983                "Parameter Destination.Arn is not valid. Reason: Cross event bus replay is not permitted.",
2984            ));
2985        }
2986
2987        // Validate end time after start time
2988        if event_end_time <= event_start_time {
2989            return Err(AwsServiceError::aws_error(
2990                StatusCode::BAD_REQUEST,
2991                "ValidationException",
2992                "Parameter EventEndTime is not valid. Reason: EventStartTime must be before EventEndTime.",
2993            ));
2994        }
2995
2996        // Check duplicate
2997        if state.replays.contains_key(&name) {
2998            return Err(AwsServiceError::aws_error(
2999                StatusCode::BAD_REQUEST,
3000                "ResourceAlreadyExistsException",
3001                format!("Replay {name} already exists."),
3002            ));
3003        }
3004
3005        let now = Utc::now();
3006        let arn = format!(
3007            "arn:aws:events:{}:{}:replay/{}",
3008            req.region, state.account_id, name
3009        );
3010
3011        let replay = Replay {
3012            name: name.clone(),
3013            arn: arn.clone(),
3014            description,
3015            event_source_arn,
3016            destination,
3017            event_start_time,
3018            event_end_time,
3019            state: "COMPLETED".to_string(), // Mock completes immediately
3020            replay_start_time: now,
3021            replay_end_time: Some(now),
3022        };
3023        state.replays.insert(name, replay);
3024
3025        Ok(json_resp(json!({
3026            "ReplayArn": arn,
3027            "ReplayStartTime": now.timestamp() as f64,
3028            "State": "STARTING",
3029        })))
3030    }
3031
3032    fn describe_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3033        let body = parse_body(req);
3034        validate_required("ReplayName", &body["ReplayName"])?;
3035        let name = body["ReplayName"]
3036            .as_str()
3037            .ok_or_else(|| missing("ReplayName"))?;
3038        validate_string_length("replayName", name, 1, 64)?;
3039
3040        let state = self.state.read();
3041        let replay = state.replays.get(name).ok_or_else(|| {
3042            AwsServiceError::aws_error(
3043                StatusCode::BAD_REQUEST,
3044                "ResourceNotFoundException",
3045                format!("Replay {name} does not exist."),
3046            )
3047        })?;
3048
3049        let mut resp = json!({
3050            "Destination": replay.destination,
3051            "EventSourceArn": replay.event_source_arn,
3052            "EventStartTime": replay.event_start_time.timestamp() as f64,
3053            "EventEndTime": replay.event_end_time.timestamp() as f64,
3054            "ReplayArn": replay.arn,
3055            "ReplayName": replay.name,
3056            "ReplayStartTime": replay.replay_start_time.timestamp() as f64,
3057            "State": replay.state,
3058        });
3059        if let Some(ref desc) = replay.description {
3060            resp["Description"] = json!(desc);
3061        }
3062        if let Some(ref end) = replay.replay_end_time {
3063            resp["ReplayEndTime"] = json!(end.timestamp() as f64);
3064        }
3065
3066        Ok(json_resp(resp))
3067    }
3068
3069    fn list_replays(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3070        let body = parse_body(req);
3071        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
3072        validate_optional_string_length(
3073            "eventSourceArn",
3074            body["EventSourceArn"].as_str(),
3075            1,
3076            1600,
3077        )?;
3078        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
3079        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
3080        let name_prefix = body["NamePrefix"].as_str();
3081        let source_arn = body["EventSourceArn"].as_str();
3082        let replay_state = body["State"].as_str();
3083
3084        // Validate at most one filter
3085        let filter_count = [
3086            name_prefix.is_some(),
3087            source_arn.is_some(),
3088            replay_state.is_some(),
3089        ]
3090        .iter()
3091        .filter(|&&x| x)
3092        .count();
3093        if filter_count > 1 {
3094            return Err(AwsServiceError::aws_error(
3095                StatusCode::BAD_REQUEST,
3096                "ValidationException",
3097                "At most one filter is allowed for ListReplays. Use either : State, EventSourceArn, or NamePrefix.",
3098            ));
3099        }
3100
3101        // Validate state
3102        if let Some(s) = replay_state {
3103            let valid = [
3104                "CANCELLED",
3105                "CANCELLING",
3106                "COMPLETED",
3107                "FAILED",
3108                "RUNNING",
3109                "STARTING",
3110            ];
3111            if !valid.contains(&s) {
3112                return Err(AwsServiceError::aws_error(
3113                    StatusCode::BAD_REQUEST,
3114                    "ValidationException",
3115                    format!(
3116                        "1 validation error detected: Value '{}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [CANCELLED, CANCELLING, COMPLETED, FAILED, RUNNING, STARTING]",
3117                        s
3118                    ),
3119                ));
3120            }
3121        }
3122
3123        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
3124        let offset: usize = body["NextToken"]
3125            .as_str()
3126            .and_then(|t| t.parse().ok())
3127            .unwrap_or(0);
3128
3129        let state = self.state.read();
3130        let filtered: Vec<Value> = state
3131            .replays
3132            .values()
3133            .filter(|r| {
3134                if let Some(prefix) = name_prefix {
3135                    r.name.starts_with(prefix)
3136                } else if let Some(arn) = source_arn {
3137                    r.event_source_arn == arn
3138                } else if let Some(s) = replay_state {
3139                    r.state == s
3140                } else {
3141                    true
3142                }
3143            })
3144            .skip(offset)
3145            .take(limit + 1)
3146            .map(|r| {
3147                let mut obj = json!({
3148                    "EventSourceArn": r.event_source_arn,
3149                    "EventStartTime": r.event_start_time.timestamp() as f64,
3150                    "EventEndTime": r.event_end_time.timestamp() as f64,
3151                    "ReplayName": r.name,
3152                    "ReplayStartTime": r.replay_start_time.timestamp() as f64,
3153                    "State": r.state,
3154                });
3155                if let Some(ref end) = r.replay_end_time {
3156                    obj["ReplayEndTime"] = json!(end.timestamp() as f64);
3157                }
3158                obj
3159            })
3160            .collect();
3161
3162        let has_more = filtered.len() > limit;
3163        let replays: Vec<Value> = filtered.into_iter().take(limit).collect();
3164        let mut resp = json!({ "Replays": replays });
3165        if has_more {
3166            resp["NextToken"] = json!((offset + limit).to_string());
3167        }
3168
3169        Ok(json_resp(resp))
3170    }
3171
3172    fn cancel_replay(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3173        let body = parse_body(req);
3174        validate_required("ReplayName", &body["ReplayName"])?;
3175        let name = body["ReplayName"]
3176            .as_str()
3177            .ok_or_else(|| missing("ReplayName"))?;
3178        validate_string_length("replayName", name, 1, 64)?;
3179
3180        let mut state = self.state.write();
3181        let replay = state.replays.get_mut(name).ok_or_else(|| {
3182            AwsServiceError::aws_error(
3183                StatusCode::BAD_REQUEST,
3184                "ResourceNotFoundException",
3185                format!("Replay {name} does not exist."),
3186            )
3187        })?;
3188
3189        // Can only cancel STARTING or RUNNING replays (or COMPLETED in our mock)
3190        if replay.state == "CANCELLED" || replay.state == "CANCELLING" {
3191            return Err(AwsServiceError::aws_error(
3192                StatusCode::BAD_REQUEST,
3193                "IllegalStatusException",
3194                format!("Replay {name} is not in a valid state for this operation."),
3195            ));
3196        }
3197
3198        let arn = replay.arn.clone();
3199        replay.state = "CANCELLED".to_string();
3200
3201        Ok(json_resp(json!({
3202            "ReplayArn": arn,
3203            "State": "CANCELLING",
3204        })))
3205    }
3206}
3207
3208// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
3209
3210fn find_tags_mut<'a>(
3211    state: &'a mut crate::state::EventBridgeState,
3212    arn: &str,
3213) -> Result<&'a mut HashMap<String, String>, AwsServiceError> {
3214    // Check buses
3215    for bus in state.buses.values_mut() {
3216        if bus.arn == arn {
3217            return Ok(&mut bus.tags);
3218        }
3219    }
3220    // Check rules
3221    for rule in state.rules.values_mut() {
3222        if rule.arn == arn {
3223            return Ok(&mut rule.tags);
3224        }
3225    }
3226
3227    // Parse ARN to give better error messages
3228    let error_msg = if arn.contains(":rule/") {
3229        // Extract rule name and bus from ARN
3230        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3231        if let Some(rule_path) = parts.first() {
3232            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3233                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3234            } else {
3235                format!("Rule {} does not exist on EventBus default.", rule_path)
3236            }
3237        } else {
3238            format!("Resource {arn} not found.")
3239        }
3240    } else {
3241        format!("Resource {arn} not found.")
3242    };
3243
3244    Err(AwsServiceError::aws_error(
3245        StatusCode::BAD_REQUEST,
3246        "ResourceNotFoundException",
3247        error_msg,
3248    ))
3249}
3250
3251fn find_tags<'a>(
3252    state: &'a crate::state::EventBridgeState,
3253    arn: &str,
3254) -> Result<&'a HashMap<String, String>, AwsServiceError> {
3255    for bus in state.buses.values() {
3256        if bus.arn == arn {
3257            return Ok(&bus.tags);
3258        }
3259    }
3260    for rule in state.rules.values() {
3261        if rule.arn == arn {
3262            return Ok(&rule.tags);
3263        }
3264    }
3265
3266    let error_msg = if arn.contains(":rule/") {
3267        let parts: Vec<&str> = arn.rsplitn(2, ":rule/").collect();
3268        if let Some(rule_path) = parts.first() {
3269            if let Some((bus, rule_name)) = rule_path.rsplit_once('/') {
3270                format!("Rule {rule_name} does not exist on EventBus {bus}.")
3271            } else {
3272                format!("Rule {} does not exist on EventBus default.", rule_path)
3273            }
3274        } else {
3275            format!("Resource {arn} not found.")
3276        }
3277    } else {
3278        format!("Resource {arn} not found.")
3279    };
3280
3281    Err(AwsServiceError::aws_error(
3282        StatusCode::BAD_REQUEST,
3283        "ResourceNotFoundException",
3284        error_msg,
3285    ))
3286}
3287
3288// ─── Event Pattern Validation ────────────────────────────────────────
3289
3290fn validate_event_pattern(pattern: &str) -> Result<(), AwsServiceError> {
3291    let parsed: Value = serde_json::from_str(pattern).map_err(|_| {
3292        AwsServiceError::aws_error(
3293            StatusCode::BAD_REQUEST,
3294            "InvalidEventPatternException",
3295            "Event pattern is not valid. Reason: Invalid JSON",
3296        )
3297    })?;
3298
3299    validate_pattern_values(&parsed, "")?;
3300    Ok(())
3301}
3302
3303fn validate_pattern_values(value: &Value, path: &str) -> Result<(), AwsServiceError> {
3304    match value {
3305        Value::Object(obj) => {
3306            for (key, val) in obj {
3307                let new_path = if path.is_empty() {
3308                    key.clone()
3309                } else {
3310                    format!("{path}.{key}")
3311                };
3312                match val {
3313                    Value::Object(_) => validate_pattern_values(val, &new_path)?,
3314                    Value::Array(_) => {} // Arrays are fine at leaf level
3315                    _ => {
3316                        return Err(AwsServiceError::aws_error(
3317                            StatusCode::BAD_REQUEST,
3318                            "InvalidEventPatternException",
3319                            format!(
3320                                "Event pattern is not valid. Reason: '{}' must be an object or an array",
3321                                key
3322                            ),
3323                        ));
3324                    }
3325                }
3326            }
3327            Ok(())
3328        }
3329        _ => Ok(()),
3330    }
3331}
3332
3333// ─── Connection Auth Params Response Builder ────────────────────────
3334
3335fn build_auth_params_response(auth_type: &str, params: &Value) -> Value {
3336    match auth_type {
3337        "API_KEY" => {
3338            let mut resp = json!({});
3339            if let Some(api_key) = params.get("ApiKeyAuthParameters") {
3340                resp["ApiKeyAuthParameters"] = json!({
3341                    "ApiKeyName": api_key["ApiKeyName"],
3342                });
3343            }
3344            resp
3345        }
3346        "BASIC" => {
3347            let mut resp = json!({});
3348            if let Some(basic) = params.get("BasicAuthParameters") {
3349                resp["BasicAuthParameters"] = json!({
3350                    "Username": basic["Username"],
3351                });
3352            }
3353            resp
3354        }
3355        "OAUTH_CLIENT_CREDENTIALS" => {
3356            let mut resp = json!({});
3357            if let Some(oauth) = params.get("OAuthParameters") {
3358                resp["OAuthParameters"] = json!({
3359                    "AuthorizationEndpoint": oauth["AuthorizationEndpoint"],
3360                    "HttpMethod": oauth["HttpMethod"],
3361                    "ClientParameters": {
3362                        "ClientID": oauth.get("ClientParameters").and_then(|c| c.get("ClientID")),
3363                    },
3364                });
3365            }
3366            resp
3367        }
3368        _ => params.clone(),
3369    }
3370}
3371
3372// ─── Event Pattern Matching ─────────────────────────────────────────
3373
3374/// Match an event against an EventBridge event pattern.
3375fn matches_pattern(
3376    pattern_json: Option<&str>,
3377    source: &str,
3378    detail_type: &str,
3379    detail: &str,
3380    account: &str,
3381    region: &str,
3382    resources: &[String],
3383) -> bool {
3384    let pattern_json = match pattern_json {
3385        Some(p) => p,
3386        None => return true,
3387    };
3388
3389    let pattern: Value = match serde_json::from_str(pattern_json) {
3390        Ok(v) => v,
3391        Err(_) => return false,
3392    };
3393
3394    let pattern_obj = match pattern.as_object() {
3395        Some(o) => o,
3396        None => return false,
3397    };
3398
3399    let detail_value: Value = serde_json::from_str(detail).unwrap_or(json!({}));
3400    let event = json!({
3401        "source": source,
3402        "detail-type": detail_type,
3403        "detail": detail_value,
3404        "account": account,
3405        "region": region,
3406        "resources": resources,
3407    });
3408
3409    for (key, pattern_value) in pattern_obj {
3410        let event_value = &event[key];
3411        if !matches_value(pattern_value, event_value) {
3412            return false;
3413        }
3414    }
3415
3416    true
3417}
3418
3419fn matches_value(pattern: &Value, event_value: &Value) -> bool {
3420    match pattern {
3421        Value::Object(obj) => {
3422            for (key, sub_pattern) in obj {
3423                let sub_value = &event_value[key];
3424                if !matches_value(sub_pattern, sub_value) {
3425                    return false;
3426                }
3427            }
3428            true
3429        }
3430        Value::Array(arr) => arr.iter().any(|elem| matches_single(elem, event_value)),
3431        _ => false,
3432    }
3433}
3434
3435fn matches_single(pattern_elem: &Value, event_value: &Value) -> bool {
3436    match pattern_elem {
3437        Value::Object(obj) => {
3438            if let Some(prefix_val) = obj.get("prefix") {
3439                if let (Some(prefix), Some(actual)) = (prefix_val.as_str(), event_value.as_str()) {
3440                    return actual.starts_with(prefix);
3441                }
3442                return false;
3443            }
3444            if let Some(exists_val) = obj.get("exists") {
3445                let should_exist = exists_val.as_bool().unwrap_or(true);
3446                let does_exist = !event_value.is_null();
3447                return should_exist == does_exist;
3448            }
3449            if let Some(anything_but_val) = obj.get("anything-but") {
3450                return match anything_but_val {
3451                    Value::String(s) => event_value.as_str() != Some(s.as_str()),
3452                    Value::Array(arr) => !arr.iter().any(|v| values_equal(v, event_value)),
3453                    Value::Number(_) => event_value != anything_but_val,
3454                    _ => true,
3455                };
3456            }
3457            if let Some(numeric_val) = obj.get("numeric") {
3458                return matches_numeric(numeric_val, event_value);
3459            }
3460            false
3461        }
3462        _ => values_equal(pattern_elem, event_value),
3463    }
3464}
3465
3466fn matches_numeric(numeric_arr: &Value, event_value: &Value) -> bool {
3467    let arr = match numeric_arr.as_array() {
3468        Some(a) => a,
3469        None => return false,
3470    };
3471    let actual = match event_value.as_f64() {
3472        Some(n) => n,
3473        None => return false,
3474    };
3475    let mut i = 0;
3476    while i + 1 < arr.len() {
3477        let op = match arr[i].as_str() {
3478            Some(s) => s,
3479            None => return false,
3480        };
3481        let threshold = match arr[i + 1].as_f64() {
3482            Some(n) => n,
3483            None => return false,
3484        };
3485        let ok = match op {
3486            ">" => actual > threshold,
3487            ">=" => actual >= threshold,
3488            "<" => actual < threshold,
3489            "<=" => actual <= threshold,
3490            "=" => (actual - threshold).abs() < f64::EPSILON,
3491            _ => return false,
3492        };
3493        if !ok {
3494            return false;
3495        }
3496        i += 2;
3497    }
3498    true
3499}
3500
3501fn values_equal(a: &Value, b: &Value) -> bool {
3502    a == b
3503}
3504
3505/// Resolve a simple JSON path like `$.detail.name` against an event JSON value.
3506fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
3507    let path = path.strip_prefix('$').unwrap_or(path);
3508    let mut current = event;
3509    for segment in path.split('.') {
3510        if segment.is_empty() {
3511            continue;
3512        }
3513        current = current.get(segment)?;
3514    }
3515    Some(current.clone())
3516}
3517
3518/// Apply an EventBridge InputTransformer to an event.
3519fn apply_input_transformer(transformer: &Value, event: &Value) -> String {
3520    let input_paths_map = transformer
3521        .get("InputPathsMap")
3522        .and_then(|v| v.as_object())
3523        .cloned()
3524        .unwrap_or_default();
3525    let template = transformer
3526        .get("InputTemplate")
3527        .and_then(|v| v.as_str())
3528        .unwrap_or("")
3529        .to_string();
3530
3531    // Resolve all input paths
3532    let mut resolved: HashMap<String, Value> = HashMap::new();
3533    for (var_name, path_val) in &input_paths_map {
3534        if let Some(path_str) = path_val.as_str() {
3535            if let Some(val) = resolve_json_path(event, path_str) {
3536                resolved.insert(var_name.clone(), val);
3537            }
3538        }
3539    }
3540
3541    // Replace <varName> placeholders in template
3542    let mut result = template;
3543    for (var_name, val) in &resolved {
3544        let placeholder = format!("<{var_name}>");
3545        let replacement = match val {
3546            Value::String(s) => s.clone(),
3547            other => other.to_string(),
3548        };
3549        result = result.replace(&placeholder, &replacement);
3550    }
3551
3552    result
3553}
3554
3555fn missing(name: &str) -> AwsServiceError {
3556    AwsServiceError::aws_error(
3557        StatusCode::BAD_REQUEST,
3558        "ValidationException",
3559        format!("The request must contain the parameter {name}"),
3560    )
3561}
3562
3563/// Deliver an EventBridge event to CloudWatch Logs by writing a log event
3564/// to the appropriate log group and stream.
3565pub fn deliver_to_logs(
3566    logs_state: &SharedLogsState,
3567    log_group_arn: &str,
3568    payload: &str,
3569    timestamp: chrono::DateTime<chrono::Utc>,
3570) {
3571    // Extract log group name from ARN: arn:aws:logs:region:account:log-group:NAME
3572    // or just the name if it's not an ARN
3573    let group_name = if log_group_arn.contains(":log-group:") {
3574        log_group_arn
3575            .split(":log-group:")
3576            .nth(1)
3577            .unwrap_or(log_group_arn)
3578            .trim_end_matches(":*")
3579    } else {
3580        log_group_arn
3581    };
3582
3583    let stream_name = "events".to_string();
3584    let ts_millis = timestamp.timestamp_millis();
3585
3586    let mut state = logs_state.write();
3587    let region = state.region.clone();
3588    let account_id = state.account_id.clone();
3589
3590    // Auto-create log group and stream if they don't exist
3591    let group = state
3592        .log_groups
3593        .entry(group_name.to_string())
3594        .or_insert_with(|| fakecloud_logs::state::LogGroup {
3595            name: group_name.to_string(),
3596            arn: format!("arn:aws:logs:{region}:{account_id}:log-group:{group_name}"),
3597            creation_time: ts_millis,
3598            retention_in_days: None,
3599            kms_key_id: None,
3600            tags: HashMap::new(),
3601            log_streams: HashMap::new(),
3602            stored_bytes: 0,
3603            subscription_filters: Vec::new(),
3604            data_protection_policy: None,
3605            index_policies: Vec::new(),
3606            transformer: None,
3607            deletion_protection: false,
3608        });
3609
3610    let stream = group
3611        .log_streams
3612        .entry(stream_name.clone())
3613        .or_insert_with(|| fakecloud_logs::state::LogStream {
3614            name: stream_name,
3615            arn: format!("{}:log-stream:events", group.arn),
3616            creation_time: ts_millis,
3617            first_event_timestamp: None,
3618            last_event_timestamp: None,
3619            last_ingestion_time: None,
3620            upload_sequence_token: "1".to_string(),
3621            events: Vec::new(),
3622        });
3623
3624    stream.events.push(fakecloud_logs::state::LogEvent {
3625        timestamp: ts_millis,
3626        message: payload.to_string(),
3627        ingestion_time: ts_millis,
3628    });
3629    stream.last_event_timestamp = Some(ts_millis);
3630    stream.last_ingestion_time = Some(ts_millis);
3631    if stream.first_event_timestamp.is_none() {
3632        stream.first_event_timestamp = Some(ts_millis);
3633    }
3634}
3635
3636#[cfg(test)]
3637mod tests {
3638    use super::*;
3639
3640    /// Test helper that calls matches_pattern with default account/region/resources
3641    fn test_matches(
3642        pattern_json: Option<&str>,
3643        source: &str,
3644        detail_type: &str,
3645        detail: &str,
3646    ) -> bool {
3647        matches_pattern(
3648            pattern_json,
3649            source,
3650            detail_type,
3651            detail,
3652            "123456789012",
3653            "us-east-1",
3654            &[],
3655        )
3656    }
3657
3658    #[test]
3659    fn pattern_matches_source() {
3660        assert!(test_matches(
3661            Some(r#"{"source": ["my.app"]}"#),
3662            "my.app",
3663            "OrderPlaced",
3664            "{}"
3665        ));
3666        assert!(!test_matches(
3667            Some(r#"{"source": ["other.app"]}"#),
3668            "my.app",
3669            "OrderPlaced",
3670            "{}"
3671        ));
3672    }
3673
3674    #[test]
3675    fn pattern_matches_detail_type() {
3676        assert!(test_matches(
3677            Some(r#"{"detail-type": ["OrderPlaced"]}"#),
3678            "my.app",
3679            "OrderPlaced",
3680            "{}"
3681        ));
3682        assert!(!test_matches(
3683            Some(r#"{"detail-type": ["OrderShipped"]}"#),
3684            "my.app",
3685            "OrderPlaced",
3686            "{}"
3687        ));
3688    }
3689
3690    #[test]
3691    fn pattern_matches_detail_field() {
3692        assert!(test_matches(
3693            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3694            "my.app",
3695            "StatusChange",
3696            r#"{"status": "ACTIVE"}"#
3697        ));
3698        assert!(!test_matches(
3699            Some(r#"{"detail": {"status": ["ACTIVE"]}}"#),
3700            "my.app",
3701            "StatusChange",
3702            r#"{"status": "INACTIVE"}"#
3703        ));
3704    }
3705
3706    #[test]
3707    fn no_pattern_matches_everything() {
3708        assert!(test_matches(None, "any", "any", "{}"));
3709    }
3710
3711    #[test]
3712    fn combined_pattern() {
3713        let pattern = r#"{"source": ["orders"], "detail-type": ["OrderPlaced"]}"#;
3714        assert!(test_matches(Some(pattern), "orders", "OrderPlaced", "{}"));
3715        assert!(!test_matches(Some(pattern), "orders", "OrderShipped", "{}"));
3716        assert!(!test_matches(Some(pattern), "other", "OrderPlaced", "{}"));
3717    }
3718
3719    #[test]
3720    fn nested_detail_pattern() {
3721        let pattern = r#"{"detail": {"order": {"status": ["PLACED"]}}}"#;
3722        assert!(test_matches(
3723            Some(pattern),
3724            "my.app",
3725            "OrderEvent",
3726            r#"{"order": {"status": "PLACED", "id": "123"}}"#
3727        ));
3728        assert!(!test_matches(
3729            Some(pattern),
3730            "my.app",
3731            "OrderEvent",
3732            r#"{"order": {"status": "SHIPPED", "id": "123"}}"#
3733        ));
3734        assert!(!test_matches(
3735            Some(pattern),
3736            "my.app",
3737            "OrderEvent",
3738            r#"{"order": {"id": "123"}}"#
3739        ));
3740    }
3741
3742    #[test]
3743    fn deeply_nested_detail_pattern() {
3744        let pattern = r#"{"detail": {"a": {"b": {"c": ["deep"]}}}}"#;
3745        assert!(test_matches(
3746            Some(pattern),
3747            "src",
3748            "type",
3749            r#"{"a": {"b": {"c": "deep"}}}"#
3750        ));
3751        assert!(!test_matches(
3752            Some(pattern),
3753            "src",
3754            "type",
3755            r#"{"a": {"b": {"c": "shallow"}}}"#
3756        ));
3757    }
3758
3759    #[test]
3760    fn prefix_matcher() {
3761        let pattern = r#"{"source": [{"prefix": "com.myapp"}]}"#;
3762        assert!(test_matches(
3763            Some(pattern),
3764            "com.myapp.orders",
3765            "OrderPlaced",
3766            "{}"
3767        ));
3768        assert!(test_matches(
3769            Some(pattern),
3770            "com.myapp",
3771            "OrderPlaced",
3772            "{}"
3773        ));
3774        assert!(!test_matches(
3775            Some(pattern),
3776            "com.other",
3777            "OrderPlaced",
3778            "{}"
3779        ));
3780    }
3781
3782    #[test]
3783    fn prefix_matcher_in_detail() {
3784        let pattern = r#"{"detail": {"region": [{"prefix": "us-"}]}}"#;
3785        assert!(test_matches(
3786            Some(pattern),
3787            "src",
3788            "type",
3789            r#"{"region": "us-east-1"}"#
3790        ));
3791        assert!(!test_matches(
3792            Some(pattern),
3793            "src",
3794            "type",
3795            r#"{"region": "eu-west-1"}"#
3796        ));
3797    }
3798
3799    #[test]
3800    fn exists_matcher() {
3801        let pattern = r#"{"detail": {"error": [{"exists": true}]}}"#;
3802        assert!(test_matches(
3803            Some(pattern),
3804            "src",
3805            "type",
3806            r#"{"error": "something broke"}"#
3807        ));
3808        assert!(!test_matches(
3809            Some(pattern),
3810            "src",
3811            "type",
3812            r#"{"status": "ok"}"#
3813        ));
3814
3815        let pattern = r#"{"detail": {"error": [{"exists": false}]}}"#;
3816        assert!(test_matches(
3817            Some(pattern),
3818            "src",
3819            "type",
3820            r#"{"status": "ok"}"#
3821        ));
3822        assert!(!test_matches(
3823            Some(pattern),
3824            "src",
3825            "type",
3826            r#"{"error": "something broke"}"#
3827        ));
3828    }
3829
3830    #[test]
3831    fn anything_but_matcher() {
3832        let pattern = r#"{"source": [{"anything-but": "internal"}]}"#;
3833        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
3834        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
3835
3836        let pattern = r#"{"source": [{"anything-but": ["internal", "test"]}]}"#;
3837        assert!(test_matches(Some(pattern), "external", "Event", "{}"));
3838        assert!(!test_matches(Some(pattern), "internal", "Event", "{}"));
3839        assert!(!test_matches(Some(pattern), "test", "Event", "{}"));
3840    }
3841
3842    #[test]
3843    fn anything_but_in_detail() {
3844        let pattern = r#"{"detail": {"env": [{"anything-but": "prod"}]}}"#;
3845        assert!(test_matches(
3846            Some(pattern),
3847            "src",
3848            "type",
3849            r#"{"env": "staging"}"#
3850        ));
3851        assert!(!test_matches(
3852            Some(pattern),
3853            "src",
3854            "type",
3855            r#"{"env": "prod"}"#
3856        ));
3857    }
3858
3859    #[test]
3860    fn numeric_greater_than() {
3861        let pattern = r#"{"detail": {"count": [{"numeric": [">", 100]}]}}"#;
3862        assert!(test_matches(
3863            Some(pattern),
3864            "src",
3865            "type",
3866            r#"{"count": 150}"#
3867        ));
3868        assert!(!test_matches(
3869            Some(pattern),
3870            "src",
3871            "type",
3872            r#"{"count": 100}"#
3873        ));
3874        assert!(!test_matches(
3875            Some(pattern),
3876            "src",
3877            "type",
3878            r#"{"count": 50}"#
3879        ));
3880    }
3881
3882    #[test]
3883    fn numeric_less_than() {
3884        let pattern = r#"{"detail": {"count": [{"numeric": ["<", 10]}]}}"#;
3885        assert!(test_matches(
3886            Some(pattern),
3887            "src",
3888            "type",
3889            r#"{"count": 5}"#
3890        ));
3891        assert!(!test_matches(
3892            Some(pattern),
3893            "src",
3894            "type",
3895            r#"{"count": 10}"#
3896        ));
3897        assert!(!test_matches(
3898            Some(pattern),
3899            "src",
3900            "type",
3901            r#"{"count": 15}"#
3902        ));
3903    }
3904
3905    #[test]
3906    fn numeric_range() {
3907        let pattern = r#"{"detail": {"count": [{"numeric": [">=", 50, "<", 200]}]}}"#;
3908        assert!(test_matches(
3909            Some(pattern),
3910            "src",
3911            "type",
3912            r#"{"count": 50}"#
3913        ));
3914        assert!(test_matches(
3915            Some(pattern),
3916            "src",
3917            "type",
3918            r#"{"count": 100}"#
3919        ));
3920        assert!(!test_matches(
3921            Some(pattern),
3922            "src",
3923            "type",
3924            r#"{"count": 200}"#
3925        ));
3926        assert!(!test_matches(
3927            Some(pattern),
3928            "src",
3929            "type",
3930            r#"{"count": 49}"#
3931        ));
3932    }
3933
3934    #[test]
3935    fn mixed_matchers_and_literals() {
3936        let pattern = r#"{"source": ["exact.match", {"prefix": "com.myapp"}]}"#;
3937        assert!(test_matches(Some(pattern), "exact.match", "Event", "{}"));
3938        assert!(test_matches(
3939            Some(pattern),
3940            "com.myapp.orders",
3941            "Event",
3942            "{}"
3943        ));
3944        assert!(!test_matches(Some(pattern), "other.source", "Event", "{}"));
3945    }
3946
3947    // ---- list_connections / list_api_destinations filtering & pagination ----
3948
3949    use crate::state::EventBridgeState;
3950    use fakecloud_core::delivery::DeliveryBus;
3951    use parking_lot::RwLock;
3952
3953    fn make_service() -> EventBridgeService {
3954        let state = Arc::new(RwLock::new(EventBridgeState::new(
3955            "123456789012",
3956            "us-east-1",
3957        )));
3958        let delivery = Arc::new(DeliveryBus::new());
3959        EventBridgeService::new(state, delivery)
3960    }
3961
3962    fn make_request(action: &str, body: Value) -> AwsRequest {
3963        AwsRequest {
3964            service: "events".to_string(),
3965            action: action.to_string(),
3966            region: "us-east-1".to_string(),
3967            account_id: "123456789012".to_string(),
3968            request_id: "test-id".to_string(),
3969            headers: http::HeaderMap::new(),
3970            query_params: HashMap::new(),
3971            body: serde_json::to_vec(&body).unwrap().into(),
3972            path_segments: vec![],
3973            raw_path: "/".to_string(),
3974            method: http::Method::POST,
3975            is_query_protocol: false,
3976            access_key_id: None,
3977        }
3978    }
3979
3980    fn create_connection(svc: &EventBridgeService, name: &str) {
3981        let req = make_request(
3982            "CreateConnection",
3983            json!({
3984                "Name": name,
3985                "AuthorizationType": "API_KEY",
3986                "AuthParameters": {
3987                    "ApiKeyAuthParameters": {
3988                        "ApiKeyName": "x-api-key",
3989                        "ApiKeyValue": "secret"
3990                    }
3991                }
3992            }),
3993        );
3994        svc.create_connection(&req).unwrap();
3995    }
3996
3997    fn create_api_destination(svc: &EventBridgeService, name: &str, conn_name: &str) {
3998        let conn_arn_field = {
3999            let state = svc.state.read();
4000            state.connections.get(conn_name).unwrap().arn.clone()
4001        };
4002        let req = make_request(
4003            "CreateApiDestination",
4004            json!({
4005                "Name": name,
4006                "ConnectionArn": conn_arn_field,
4007                "InvocationEndpoint": "https://example.com",
4008                "HttpMethod": "POST"
4009            }),
4010        );
4011        svc.create_api_destination(&req).unwrap();
4012    }
4013
4014    // -- ListConnections tests --
4015
4016    #[test]
4017    fn list_connections_returns_all_by_default() {
4018        let svc = make_service();
4019        create_connection(&svc, "conn-alpha");
4020        create_connection(&svc, "conn-beta");
4021        create_connection(&svc, "conn-gamma");
4022
4023        let req = make_request("ListConnections", json!({}));
4024        let resp = svc.list_connections(&req).unwrap();
4025        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4026        assert_eq!(body["Connections"].as_array().unwrap().len(), 3);
4027        assert!(body["NextToken"].is_null());
4028    }
4029
4030    #[test]
4031    fn list_connections_name_prefix_filter() {
4032        let svc = make_service();
4033        create_connection(&svc, "prod-conn-1");
4034        create_connection(&svc, "prod-conn-2");
4035        create_connection(&svc, "dev-conn-1");
4036
4037        let req = make_request("ListConnections", json!({ "NamePrefix": "prod-" }));
4038        let resp = svc.list_connections(&req).unwrap();
4039        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4040        let names: Vec<&str> = body["Connections"]
4041            .as_array()
4042            .unwrap()
4043            .iter()
4044            .map(|c| c["Name"].as_str().unwrap())
4045            .collect();
4046        assert_eq!(names.len(), 2);
4047        assert!(names.iter().all(|n| n.starts_with("prod-")));
4048    }
4049
4050    #[test]
4051    fn list_connections_state_filter() {
4052        let svc = make_service();
4053        create_connection(&svc, "conn-a");
4054        create_connection(&svc, "conn-b");
4055
4056        // All connections start as AUTHORIZED; change one
4057        {
4058            let mut state = svc.state.write();
4059            state
4060                .connections
4061                .get_mut("conn-b")
4062                .unwrap()
4063                .connection_state = "DEAUTHORIZED".to_string();
4064        }
4065
4066        let req = make_request(
4067            "ListConnections",
4068            json!({ "ConnectionState": "AUTHORIZED" }),
4069        );
4070        let resp = svc.list_connections(&req).unwrap();
4071        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4072        let conns = body["Connections"].as_array().unwrap();
4073        assert_eq!(conns.len(), 1);
4074        assert_eq!(conns[0]["Name"].as_str().unwrap(), "conn-a");
4075    }
4076
4077    #[test]
4078    fn list_connections_pagination() {
4079        let svc = make_service();
4080        for i in 0..5 {
4081            create_connection(&svc, &format!("conn-{i:02}"));
4082        }
4083
4084        // First page: limit 2
4085        let req = make_request("ListConnections", json!({ "Limit": 2 }));
4086        let resp = svc.list_connections(&req).unwrap();
4087        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4088        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4089        let token = body["NextToken"].as_str().unwrap();
4090        assert_eq!(token, "2");
4091
4092        // Second page
4093        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4094        let resp = svc.list_connections(&req).unwrap();
4095        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4096        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4097        let token = body["NextToken"].as_str().unwrap();
4098        assert_eq!(token, "4");
4099
4100        // Third page (only 1 remaining)
4101        let req = make_request("ListConnections", json!({ "Limit": 2, "NextToken": token }));
4102        let resp = svc.list_connections(&req).unwrap();
4103        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4104        assert_eq!(body["Connections"].as_array().unwrap().len(), 1);
4105        assert!(body["NextToken"].is_null());
4106    }
4107
4108    #[test]
4109    fn list_connections_pagination_with_filter() {
4110        let svc = make_service();
4111        for i in 0..4 {
4112            create_connection(&svc, &format!("prod-{i:02}"));
4113        }
4114        create_connection(&svc, "dev-00");
4115
4116        let req = make_request(
4117            "ListConnections",
4118            json!({ "NamePrefix": "prod-", "Limit": 2 }),
4119        );
4120        let resp = svc.list_connections(&req).unwrap();
4121        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4122        assert_eq!(body["Connections"].as_array().unwrap().len(), 2);
4123        assert!(body["NextToken"].as_str().is_some());
4124    }
4125
4126    // -- ListApiDestinations tests --
4127
4128    #[test]
4129    fn list_api_destinations_returns_all_by_default() {
4130        let svc = make_service();
4131        create_connection(&svc, "my-conn");
4132        create_api_destination(&svc, "dest-alpha", "my-conn");
4133        create_api_destination(&svc, "dest-beta", "my-conn");
4134
4135        let req = make_request("ListApiDestinations", json!({}));
4136        let resp = svc.list_api_destinations(&req).unwrap();
4137        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4138        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4139        assert!(body["NextToken"].is_null());
4140    }
4141
4142    #[test]
4143    fn list_api_destinations_name_prefix_filter() {
4144        let svc = make_service();
4145        create_connection(&svc, "my-conn");
4146        create_api_destination(&svc, "prod-dest-1", "my-conn");
4147        create_api_destination(&svc, "prod-dest-2", "my-conn");
4148        create_api_destination(&svc, "dev-dest-1", "my-conn");
4149
4150        let req = make_request("ListApiDestinations", json!({ "NamePrefix": "prod-" }));
4151        let resp = svc.list_api_destinations(&req).unwrap();
4152        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4153        let names: Vec<&str> = body["ApiDestinations"]
4154            .as_array()
4155            .unwrap()
4156            .iter()
4157            .map(|d| d["Name"].as_str().unwrap())
4158            .collect();
4159        assert_eq!(names.len(), 2);
4160        assert!(names.iter().all(|n| n.starts_with("prod-")));
4161    }
4162
4163    #[test]
4164    fn list_api_destinations_connection_arn_filter() {
4165        let svc = make_service();
4166        create_connection(&svc, "conn-a");
4167        create_connection(&svc, "conn-b");
4168        create_api_destination(&svc, "dest-1", "conn-a");
4169        create_api_destination(&svc, "dest-2", "conn-b");
4170        create_api_destination(&svc, "dest-3", "conn-a");
4171
4172        let conn_a_arn = {
4173            let state = svc.state.read();
4174            state.connections.get("conn-a").unwrap().arn.clone()
4175        };
4176
4177        let req = make_request(
4178            "ListApiDestinations",
4179            json!({ "ConnectionArn": conn_a_arn }),
4180        );
4181        let resp = svc.list_api_destinations(&req).unwrap();
4182        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4183        let names: Vec<&str> = body["ApiDestinations"]
4184            .as_array()
4185            .unwrap()
4186            .iter()
4187            .map(|d| d["Name"].as_str().unwrap())
4188            .collect();
4189        assert_eq!(names.len(), 2);
4190        assert!(names.contains(&"dest-1"));
4191        assert!(names.contains(&"dest-3"));
4192    }
4193
4194    #[test]
4195    fn list_api_destinations_pagination() {
4196        let svc = make_service();
4197        create_connection(&svc, "my-conn");
4198        for i in 0..5 {
4199            create_api_destination(&svc, &format!("dest-{i:02}"), "my-conn");
4200        }
4201
4202        // First page
4203        let req = make_request("ListApiDestinations", json!({ "Limit": 2 }));
4204        let resp = svc.list_api_destinations(&req).unwrap();
4205        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4206        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4207        let token = body["NextToken"].as_str().unwrap();
4208        assert_eq!(token, "2");
4209
4210        // Second page
4211        let req = make_request(
4212            "ListApiDestinations",
4213            json!({ "Limit": 2, "NextToken": token }),
4214        );
4215        let resp = svc.list_api_destinations(&req).unwrap();
4216        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4217        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 2);
4218        let token = body["NextToken"].as_str().unwrap();
4219        assert_eq!(token, "4");
4220
4221        // Last page
4222        let req = make_request(
4223            "ListApiDestinations",
4224            json!({ "Limit": 2, "NextToken": token }),
4225        );
4226        let resp = svc.list_api_destinations(&req).unwrap();
4227        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4228        assert_eq!(body["ApiDestinations"].as_array().unwrap().len(), 1);
4229        assert!(body["NextToken"].is_null());
4230    }
4231
4232    // -- ListEventBuses pagination tests --
4233
4234    fn create_event_bus(svc: &EventBridgeService, name: &str) {
4235        let req = make_request("CreateEventBus", json!({ "Name": name }));
4236        svc.create_event_bus(&req).unwrap();
4237    }
4238
4239    #[test]
4240    fn list_event_buses_pagination() {
4241        let svc = make_service();
4242        // "default" bus already exists, create 4 more
4243        for i in 0..4 {
4244            create_event_bus(&svc, &format!("bus-{i:02}"));
4245        }
4246
4247        // First page: limit 2
4248        let req = make_request("ListEventBuses", json!({ "Limit": 2 }));
4249        let resp = svc.list_event_buses(&req).unwrap();
4250        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4251        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4252        let token = body["NextToken"].as_str().unwrap();
4253        assert_eq!(token, "2");
4254
4255        // Second page
4256        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4257        let resp = svc.list_event_buses(&req).unwrap();
4258        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4259        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 2);
4260        let token = body["NextToken"].as_str().unwrap();
4261        assert_eq!(token, "4");
4262
4263        // Third page (only 1 remaining)
4264        let req = make_request("ListEventBuses", json!({ "Limit": 2, "NextToken": token }));
4265        let resp = svc.list_event_buses(&req).unwrap();
4266        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4267        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 1);
4268        assert!(body["NextToken"].is_null());
4269    }
4270
4271    #[test]
4272    fn list_event_buses_no_pagination_returns_all() {
4273        let svc = make_service();
4274        create_event_bus(&svc, "bus-alpha");
4275        create_event_bus(&svc, "bus-beta");
4276
4277        let req = make_request("ListEventBuses", json!({}));
4278        let resp = svc.list_event_buses(&req).unwrap();
4279        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4280        // default + 2 custom = 3
4281        assert_eq!(body["EventBuses"].as_array().unwrap().len(), 3);
4282        assert!(body["NextToken"].is_null());
4283    }
4284
4285    // -- PutEvents EndpointId tests --
4286
4287    #[test]
4288    fn put_events_never_includes_endpoint_id_in_response() {
4289        let svc = make_service();
4290        // Even when EndpointId is provided in the request, it must not appear in the response
4291        let req = make_request(
4292            "PutEvents",
4293            json!({
4294                "EndpointId": "my-endpoint.abc123",
4295                "Entries": [{
4296                    "Source": "my.source",
4297                    "DetailType": "MyType",
4298                    "Detail": "{}",
4299                    "EventBusName": "default"
4300                }]
4301            }),
4302        );
4303        let resp = svc.put_events(&req).unwrap();
4304        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4305        assert!(
4306            !body.as_object().unwrap().contains_key("EndpointId"),
4307            "EndpointId should never be in the PutEvents response"
4308        );
4309        assert_eq!(body["FailedEntryCount"], 0);
4310    }
4311
4312    // -- ListArchives pagination tests --
4313
4314    fn create_archive(svc: &EventBridgeService, name: &str) {
4315        let req = make_request(
4316            "CreateArchive",
4317            json!({
4318                "ArchiveName": name,
4319                "EventSourceArn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4320            }),
4321        );
4322        svc.create_archive(&req).unwrap();
4323    }
4324
4325    #[test]
4326    fn list_archives_pagination() {
4327        let svc = make_service();
4328        for i in 0..5 {
4329            create_archive(&svc, &format!("archive-{i:02}"));
4330        }
4331
4332        // First page: limit 2
4333        let req = make_request("ListArchives", json!({ "Limit": 2 }));
4334        let resp = svc.list_archives(&req).unwrap();
4335        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4336        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4337        let token = body["NextToken"].as_str().unwrap();
4338        assert_eq!(token, "2");
4339
4340        // Second page
4341        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4342        let resp = svc.list_archives(&req).unwrap();
4343        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4344        assert_eq!(body["Archives"].as_array().unwrap().len(), 2);
4345        let token = body["NextToken"].as_str().unwrap();
4346        assert_eq!(token, "4");
4347
4348        // Third page (only 1 remaining)
4349        let req = make_request("ListArchives", json!({ "Limit": 2, "NextToken": token }));
4350        let resp = svc.list_archives(&req).unwrap();
4351        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4352        assert_eq!(body["Archives"].as_array().unwrap().len(), 1);
4353        assert!(body["NextToken"].is_null());
4354    }
4355
4356    // -- ListReplays pagination tests --
4357
4358    fn create_replay(svc: &EventBridgeService, name: &str) {
4359        // Need an archive first for the replay's event source
4360        let archive_arn = {
4361            let state = svc.state.read();
4362            if state.archives.contains_key("replay-archive") {
4363                state.archives["replay-archive"].arn.clone()
4364            } else {
4365                drop(state);
4366                create_archive(svc, "replay-archive");
4367                svc.state.read().archives["replay-archive"].arn.clone()
4368            }
4369        };
4370        let req = make_request(
4371            "StartReplay",
4372            json!({
4373                "ReplayName": name,
4374                "EventSourceArn": archive_arn,
4375                "EventStartTime": 1000000.0,
4376                "EventEndTime": 2000000.0,
4377                "Destination": {
4378                    "Arn": "arn:aws:events:us-east-1:123456789012:event-bus/default"
4379                }
4380            }),
4381        );
4382        svc.start_replay(&req).unwrap();
4383    }
4384
4385    #[test]
4386    fn list_replays_pagination() {
4387        let svc = make_service();
4388        for i in 0..5 {
4389            create_replay(&svc, &format!("replay-{i:02}"));
4390        }
4391
4392        // First page: limit 2
4393        let req = make_request("ListReplays", json!({ "Limit": 2 }));
4394        let resp = svc.list_replays(&req).unwrap();
4395        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4396        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4397        let token = body["NextToken"].as_str().unwrap();
4398        assert_eq!(token, "2");
4399
4400        // Second page
4401        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4402        let resp = svc.list_replays(&req).unwrap();
4403        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4404        assert_eq!(body["Replays"].as_array().unwrap().len(), 2);
4405        let token = body["NextToken"].as_str().unwrap();
4406        assert_eq!(token, "4");
4407
4408        // Third page (only 1 remaining)
4409        let req = make_request("ListReplays", json!({ "Limit": 2, "NextToken": token }));
4410        let resp = svc.list_replays(&req).unwrap();
4411        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4412        assert_eq!(body["Replays"].as_array().unwrap().len(), 1);
4413        assert!(body["NextToken"].is_null());
4414    }
4415
4416    #[test]
4417    fn list_event_buses_invalid_next_token_returns_error() {
4418        let svc = make_service();
4419
4420        let req = make_request("ListEventBuses", json!({ "NextToken": "not-a-number" }));
4421        let result = svc.list_event_buses(&req);
4422        assert!(
4423            result.is_err(),
4424            "non-numeric NextToken should return an error"
4425        );
4426    }
4427
4428    // ---- TestEventPattern tests ----
4429
4430    #[test]
4431    fn test_event_pattern_match() {
4432        let svc = make_service();
4433        let req = make_request(
4434            "TestEventPattern",
4435            json!({
4436                "EventPattern": r#"{"source": ["my.app"]}"#,
4437                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4438            }),
4439        );
4440        let resp = svc.test_event_pattern(&req).unwrap();
4441        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4442        assert_eq!(body["Result"], true);
4443    }
4444
4445    #[test]
4446    fn test_event_pattern_no_match() {
4447        let svc = make_service();
4448        let req = make_request(
4449            "TestEventPattern",
4450            json!({
4451                "EventPattern": r#"{"source": ["other.app"]}"#,
4452                "Event": r#"{"source": "my.app", "detail-type": "Test", "detail": {}}"#
4453            }),
4454        );
4455        let resp = svc.test_event_pattern(&req).unwrap();
4456        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4457        assert_eq!(body["Result"], false);
4458    }
4459
4460    #[test]
4461    fn test_event_pattern_detail_match() {
4462        let svc = make_service();
4463        let req = make_request(
4464            "TestEventPattern",
4465            json!({
4466                "EventPattern": r#"{"detail": {"status": ["PLACED"]}}"#,
4467                "Event": r#"{"source": "my.app", "detail-type": "Order", "detail": {"status": "PLACED", "id": "123"}}"#
4468            }),
4469        );
4470        let resp = svc.test_event_pattern(&req).unwrap();
4471        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4472        assert_eq!(body["Result"], true);
4473    }
4474
4475    // ---- UpdateEventBus tests ----
4476
4477    #[test]
4478    fn update_event_bus_description() {
4479        let svc = make_service();
4480        create_event_bus(&svc, "my-bus");
4481
4482        let req = make_request(
4483            "UpdateEventBus",
4484            json!({ "Name": "my-bus", "Description": "Updated desc" }),
4485        );
4486        let resp = svc.update_event_bus(&req).unwrap();
4487        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4488        assert_eq!(body["Name"], "my-bus");
4489
4490        // Verify via describe
4491        let req = make_request("DescribeEventBus", json!({ "Name": "my-bus" }));
4492        let resp = svc.describe_event_bus(&req).unwrap();
4493        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4494        assert_eq!(body["Description"], "Updated desc");
4495    }
4496
4497    #[test]
4498    fn update_event_bus_not_found() {
4499        let svc = make_service();
4500        let req = make_request(
4501            "UpdateEventBus",
4502            json!({ "Name": "ghost-bus", "Description": "nope" }),
4503        );
4504        assert!(svc.update_event_bus(&req).is_err());
4505    }
4506
4507    // ---- Endpoint CRUD tests ----
4508
4509    fn create_endpoint_helper(svc: &EventBridgeService, name: &str) {
4510        let req = make_request(
4511            "CreateEndpoint",
4512            json!({
4513                "Name": name,
4514                "RoutingConfig": {
4515                    "FailoverConfig": {
4516                        "Primary": { "HealthCheck": "" },
4517                        "Secondary": { "Route": "us-west-2" }
4518                    }
4519                },
4520                "EventBuses": [
4521                    { "EventBusArn": "arn:aws:events:us-east-1:123456789012:event-bus/default" }
4522                ]
4523            }),
4524        );
4525        svc.create_endpoint(&req).unwrap();
4526    }
4527
4528    #[test]
4529    fn endpoint_create_describe_delete() {
4530        let svc = make_service();
4531        create_endpoint_helper(&svc, "my-endpoint");
4532
4533        // Describe
4534        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4535        let resp = svc.describe_endpoint(&req).unwrap();
4536        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4537        assert_eq!(body["Name"], "my-endpoint");
4538        assert_eq!(body["State"], "ACTIVE");
4539        assert!(body["EndpointId"].as_str().unwrap().contains("my-endpoint"));
4540
4541        // Delete
4542        let req = make_request("DeleteEndpoint", json!({ "Name": "my-endpoint" }));
4543        svc.delete_endpoint(&req).unwrap();
4544
4545        // Verify gone
4546        let req = make_request("DescribeEndpoint", json!({ "Name": "my-endpoint" }));
4547        assert!(svc.describe_endpoint(&req).is_err());
4548    }
4549
4550    #[test]
4551    fn endpoint_list_and_update() {
4552        let svc = make_service();
4553        create_endpoint_helper(&svc, "ep-alpha");
4554        create_endpoint_helper(&svc, "ep-beta");
4555
4556        // List all
4557        let req = make_request("ListEndpoints", json!({}));
4558        let resp = svc.list_endpoints(&req).unwrap();
4559        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4560        assert_eq!(body["Endpoints"].as_array().unwrap().len(), 2);
4561
4562        // Update
4563        let req = make_request(
4564            "UpdateEndpoint",
4565            json!({ "Name": "ep-alpha", "Description": "updated" }),
4566        );
4567        let resp = svc.update_endpoint(&req).unwrap();
4568        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4569        assert_eq!(body["Name"], "ep-alpha");
4570
4571        // Verify description
4572        let req = make_request("DescribeEndpoint", json!({ "Name": "ep-alpha" }));
4573        let resp = svc.describe_endpoint(&req).unwrap();
4574        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4575        assert_eq!(body["Description"], "updated");
4576    }
4577
4578    #[test]
4579    fn endpoint_duplicate_fails() {
4580        let svc = make_service();
4581        create_endpoint_helper(&svc, "dup-ep");
4582        let req = make_request(
4583            "CreateEndpoint",
4584            json!({
4585                "Name": "dup-ep",
4586                "RoutingConfig": {},
4587                "EventBuses": []
4588            }),
4589        );
4590        assert!(svc.create_endpoint(&req).is_err());
4591    }
4592
4593    // ---- DeauthorizeConnection tests ----
4594
4595    #[test]
4596    fn deauthorize_connection_sets_state() {
4597        let svc = make_service();
4598        create_connection(&svc, "deauth-conn");
4599
4600        let req = make_request("DeauthorizeConnection", json!({ "Name": "deauth-conn" }));
4601        let resp = svc.deauthorize_connection(&req).unwrap();
4602        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4603        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4604        assert!(body["ConnectionArn"]
4605            .as_str()
4606            .unwrap()
4607            .contains("deauth-conn"));
4608
4609        // Verify via describe
4610        let req = make_request("DescribeConnection", json!({ "Name": "deauth-conn" }));
4611        let resp = svc.describe_connection(&req).unwrap();
4612        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4613        assert_eq!(body["ConnectionState"], "DEAUTHORIZING");
4614    }
4615
4616    #[test]
4617    fn deauthorize_connection_not_found() {
4618        let svc = make_service();
4619        let req = make_request("DeauthorizeConnection", json!({ "Name": "ghost-conn" }));
4620        assert!(svc.deauthorize_connection(&req).is_err());
4621    }
4622
4623    // ---- Partner event source tests ----
4624
4625    #[test]
4626    fn partner_event_source_crud() {
4627        let svc = make_service();
4628
4629        // Create
4630        let req = make_request(
4631            "CreatePartnerEventSource",
4632            json!({ "Name": "partner/test", "Account": "123456789012" }),
4633        );
4634        svc.create_partner_event_source(&req).unwrap();
4635
4636        // Describe
4637        let req = make_request(
4638            "DescribePartnerEventSource",
4639            json!({ "Name": "partner/test" }),
4640        );
4641        let resp = svc.describe_partner_event_source(&req).unwrap();
4642        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4643        assert_eq!(body["Name"], "partner/test");
4644
4645        // List
4646        let req = make_request("ListPartnerEventSources", json!({}));
4647        let resp = svc.list_partner_event_sources(&req).unwrap();
4648        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4649        assert_eq!(body["PartnerEventSources"].as_array().unwrap().len(), 1);
4650
4651        // ListPartnerEventSourceAccounts
4652        let req = make_request(
4653            "ListPartnerEventSourceAccounts",
4654            json!({ "EventSourceName": "partner/test" }),
4655        );
4656        let resp = svc.list_partner_event_source_accounts(&req).unwrap();
4657        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4658        assert_eq!(
4659            body["PartnerEventSourceAccounts"].as_array().unwrap().len(),
4660            1
4661        );
4662
4663        // DescribeEventSource
4664        let req = make_request("DescribeEventSource", json!({ "Name": "partner/test" }));
4665        let resp = svc.describe_event_source(&req).unwrap();
4666        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4667        assert_eq!(body["Name"], "partner/test");
4668        assert_eq!(body["State"], "ACTIVE");
4669
4670        // ListEventSources
4671        let req = make_request("ListEventSources", json!({}));
4672        let resp = svc.list_event_sources(&req).unwrap();
4673        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4674        assert_eq!(body["EventSources"].as_array().unwrap().len(), 1);
4675
4676        // Delete
4677        let req = make_request(
4678            "DeletePartnerEventSource",
4679            json!({ "Name": "partner/test", "Account": "123456789012" }),
4680        );
4681        svc.delete_partner_event_source(&req).unwrap();
4682
4683        // Verify gone
4684        let req = make_request(
4685            "DescribePartnerEventSource",
4686            json!({ "Name": "partner/test" }),
4687        );
4688        assert!(svc.describe_partner_event_source(&req).is_err());
4689    }
4690
4691    #[test]
4692    fn activate_deactivate_event_source() {
4693        let svc = make_service();
4694
4695        // Create a partner event source first
4696        let req = make_request(
4697            "CreatePartnerEventSource",
4698            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4699        );
4700        svc.create_partner_event_source(&req).unwrap();
4701
4702        // Deactivate it
4703        let req = make_request(
4704            "DeactivateEventSource",
4705            json!({ "Name": "aws.partner/test" }),
4706        );
4707        svc.deactivate_event_source(&req).unwrap();
4708        {
4709            let state = svc.state.read();
4710            assert_eq!(
4711                state.partner_event_sources["aws.partner/test"].state,
4712                "INACTIVE"
4713            );
4714        }
4715
4716        // Activate it
4717        let req = make_request("ActivateEventSource", json!({ "Name": "aws.partner/test" }));
4718        svc.activate_event_source(&req).unwrap();
4719        {
4720            let state = svc.state.read();
4721            assert_eq!(
4722                state.partner_event_sources["aws.partner/test"].state,
4723                "ACTIVE"
4724            );
4725        }
4726
4727        // Not-found returns error
4728        let req = make_request("ActivateEventSource", json!({ "Name": "nonexistent" }));
4729        assert!(svc.activate_event_source(&req).is_err());
4730
4731        let req = make_request("DeactivateEventSource", json!({ "Name": "nonexistent" }));
4732        assert!(svc.deactivate_event_source(&req).is_err());
4733    }
4734
4735    #[test]
4736    fn delete_partner_event_source_verifies_account() {
4737        let svc = make_service();
4738
4739        // Create a partner event source
4740        let req = make_request(
4741            "CreatePartnerEventSource",
4742            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4743        );
4744        svc.create_partner_event_source(&req).unwrap();
4745
4746        // Deleting with wrong account fails
4747        let req = make_request(
4748            "DeletePartnerEventSource",
4749            json!({ "Name": "aws.partner/test", "Account": "999999999999" }),
4750        );
4751        assert!(svc.delete_partner_event_source(&req).is_err());
4752        // Source still exists
4753        assert!(svc
4754            .state
4755            .read()
4756            .partner_event_sources
4757            .contains_key("aws.partner/test"));
4758
4759        // Deleting with correct account succeeds
4760        let req = make_request(
4761            "DeletePartnerEventSource",
4762            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4763        );
4764        svc.delete_partner_event_source(&req).unwrap();
4765        assert!(!svc
4766            .state
4767            .read()
4768            .partner_event_sources
4769            .contains_key("aws.partner/test"));
4770
4771        // Deleting non-existent source returns error
4772        let req = make_request(
4773            "DeletePartnerEventSource",
4774            json!({ "Name": "aws.partner/test", "Account": "123456789012" }),
4775        );
4776        assert!(svc.delete_partner_event_source(&req).is_err());
4777    }
4778
4779    #[test]
4780    fn put_partner_events() {
4781        let svc = make_service();
4782        let req = make_request(
4783            "PutPartnerEvents",
4784            json!({
4785                "Entries": [
4786                    { "Source": "partner.app", "DetailType": "Test", "Detail": "{}" }
4787                ]
4788            }),
4789        );
4790        let resp = svc.put_partner_events(&req).unwrap();
4791        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4792        assert_eq!(body["FailedEntryCount"], 0);
4793        assert_eq!(body["Entries"].as_array().unwrap().len(), 1);
4794        assert!(body["Entries"][0]["EventId"].as_str().is_some());
4795    }
4796}