Skip to main content

fakecloud_eventbridge/
service.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use http::StatusCode;
4use serde_json::{json, Value};
5
6use std::collections::{BTreeMap, HashMap};
7use std::sync::Arc;
8
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::delivery::DeliveryBus;
13use fakecloud_core::pagination::paginate;
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::*;
16use fakecloud_persistence::SnapshotStore;
17
18use fakecloud_lambda::runtime::ContainerRuntime;
19use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
20use fakecloud_logs::SharedLogsState;
21
22use crate::state::{
23    ApiDestination, Archive, Connection, Endpoint, EventBridgeSnapshot, EventBridgeState, EventBus,
24    EventRule, EventTarget, PartnerEventSource, PutEvent, Replay, SharedEventBridgeState,
25    EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
26};
27
28pub struct EventBridgeService {
29    state: SharedEventBridgeState,
30    delivery: Arc<DeliveryBus>,
31    lambda_state: Option<SharedLambdaState>,
32    logs_state: Option<SharedLogsState>,
33    container_runtime: Option<Arc<ContainerRuntime>>,
34    snapshot_store: Option<Arc<dyn SnapshotStore>>,
35    snapshot_lock: Arc<AsyncMutex<()>>,
36}
37
38impl EventBridgeService {
39    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
40        Self {
41            state,
42            delivery,
43            lambda_state: None,
44            logs_state: None,
45            container_runtime: None,
46            snapshot_store: None,
47            snapshot_lock: Arc::new(AsyncMutex::new(())),
48        }
49    }
50
51    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
52        self.lambda_state = Some(lambda_state);
53        self
54    }
55
56    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
57        self.logs_state = Some(logs_state);
58        self
59    }
60
61    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
62        self.container_runtime = Some(runtime);
63        self
64    }
65
66    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
67        self.snapshot_store = Some(store);
68        self
69    }
70
71    /// Persist current state as a snapshot. Held across the
72    /// clone-serialize-write sequence to prevent stale-last writes,
73    /// with serde + file I/O offloaded to the blocking pool.
74    async fn save_snapshot(&self) {
75        let Some(store) = self.snapshot_store.clone() else {
76            return;
77        };
78        let _guard = self.snapshot_lock.lock().await;
79        let snapshot = EventBridgeSnapshot {
80            schema_version: EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION,
81            accounts: Some(self.state.read().clone()),
82            state: None,
83        };
84        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
85            let bytes = serde_json::to_vec(&snapshot)
86                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
87            store.save(&bytes)
88        })
89        .await;
90        match join {
91            Ok(Ok(())) => {}
92            Ok(Err(err)) => tracing::error!(%err, "failed to write eventbridge snapshot"),
93            Err(err) => tracing::error!(%err, "eventbridge snapshot task panicked"),
94        }
95    }
96}
97
98#[async_trait]
99impl AwsService for EventBridgeService {
100    fn service_name(&self) -> &str {
101        "events"
102    }
103
104    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
105        let mutates = is_mutating_action(req.action.as_str());
106        let result = match req.action.as_str() {
107            "CreateEventBus" => self.create_event_bus(&req),
108            "DeleteEventBus" => self.delete_event_bus(&req),
109            "ListEventBuses" => self.list_event_buses(&req),
110            "DescribeEventBus" => self.describe_event_bus(&req),
111            "PutRule" => self.put_rule(&req),
112            "DeleteRule" => self.delete_rule(&req),
113            "ListRules" => self.list_rules(&req),
114            "DescribeRule" => self.describe_rule(&req),
115            "EnableRule" => self.enable_rule(&req),
116            "DisableRule" => self.disable_rule(&req),
117            "PutTargets" => self.put_targets(&req),
118            "RemoveTargets" => self.remove_targets(&req),
119            "ListTargetsByRule" => self.list_targets_by_rule(&req),
120            "ListRuleNamesByTarget" => self.list_rule_names_by_target(&req),
121            "PutEvents" => self.put_events(&req),
122            "PutPermission" => self.put_permission(&req),
123            "RemovePermission" => self.remove_permission(&req),
124            "TagResource" => self.tag_resource(&req),
125            "UntagResource" => self.untag_resource(&req),
126            "ListTagsForResource" => self.list_tags_for_resource(&req),
127            "CreateArchive" => self.create_archive(&req),
128            "DescribeArchive" => self.describe_archive(&req),
129            "ListArchives" => self.list_archives(&req),
130            "UpdateArchive" => self.update_archive(&req),
131            "DeleteArchive" => self.delete_archive(&req),
132            "CreateConnection" => self.create_connection(&req),
133            "DescribeConnection" => self.describe_connection(&req),
134            "ListConnections" => self.list_connections(&req),
135            "UpdateConnection" => self.update_connection(&req),
136            "DeleteConnection" => self.delete_connection(&req),
137            "CreateApiDestination" => self.create_api_destination(&req),
138            "DescribeApiDestination" => self.describe_api_destination(&req),
139            "ListApiDestinations" => self.list_api_destinations(&req),
140            "UpdateApiDestination" => self.update_api_destination(&req),
141            "DeleteApiDestination" => self.delete_api_destination(&req),
142            "StartReplay" => self.start_replay(&req),
143            "DescribeReplay" => self.describe_replay(&req),
144            "ListReplays" => self.list_replays(&req),
145            "CancelReplay" => self.cancel_replay(&req),
146            "CreatePartnerEventSource" => self.create_partner_event_source(&req),
147            "DeletePartnerEventSource" => self.delete_partner_event_source(&req),
148            "DescribePartnerEventSource" => self.describe_partner_event_source(&req),
149            "ListPartnerEventSources" => self.list_partner_event_sources(&req),
150            "ListPartnerEventSourceAccounts" => self.list_partner_event_source_accounts(&req),
151            "ActivateEventSource" => self.activate_event_source(&req),
152            "DeactivateEventSource" => self.deactivate_event_source(&req),
153            "DescribeEventSource" => self.describe_event_source(&req),
154            "ListEventSources" => self.list_event_sources(&req),
155            "PutPartnerEvents" => self.put_partner_events(&req),
156            "TestEventPattern" => self.test_event_pattern(&req),
157            "UpdateEventBus" => self.update_event_bus(&req),
158            "CreateEndpoint" => self.create_endpoint(&req),
159            "DeleteEndpoint" => self.delete_endpoint(&req),
160            "DescribeEndpoint" => self.describe_endpoint(&req),
161            "ListEndpoints" => self.list_endpoints(&req),
162            "UpdateEndpoint" => self.update_endpoint(&req),
163            "DeauthorizeConnection" => self.deauthorize_connection(&req),
164            _ => Err(AwsServiceError::action_not_implemented(
165                "events",
166                &req.action,
167            )),
168        };
169        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
170            self.save_snapshot().await;
171        }
172        result
173    }
174
175    fn supported_actions(&self) -> &[&str] {
176        &[
177            "CreateEventBus",
178            "DeleteEventBus",
179            "ListEventBuses",
180            "DescribeEventBus",
181            "PutRule",
182            "DeleteRule",
183            "ListRules",
184            "DescribeRule",
185            "EnableRule",
186            "DisableRule",
187            "PutTargets",
188            "RemoveTargets",
189            "ListTargetsByRule",
190            "ListRuleNamesByTarget",
191            "PutEvents",
192            "PutPermission",
193            "RemovePermission",
194            "TagResource",
195            "UntagResource",
196            "ListTagsForResource",
197            "CreateArchive",
198            "DescribeArchive",
199            "ListArchives",
200            "UpdateArchive",
201            "DeleteArchive",
202            "CreateConnection",
203            "DescribeConnection",
204            "ListConnections",
205            "UpdateConnection",
206            "DeleteConnection",
207            "CreateApiDestination",
208            "DescribeApiDestination",
209            "ListApiDestinations",
210            "UpdateApiDestination",
211            "DeleteApiDestination",
212            "StartReplay",
213            "DescribeReplay",
214            "ListReplays",
215            "CancelReplay",
216            "CreatePartnerEventSource",
217            "DeletePartnerEventSource",
218            "DescribePartnerEventSource",
219            "ListPartnerEventSources",
220            "ListPartnerEventSourceAccounts",
221            "ActivateEventSource",
222            "DeactivateEventSource",
223            "DescribeEventSource",
224            "ListEventSources",
225            "PutPartnerEvents",
226            "TestEventPattern",
227            "UpdateEventBus",
228            "CreateEndpoint",
229            "DeleteEndpoint",
230            "DescribeEndpoint",
231            "ListEndpoints",
232            "UpdateEndpoint",
233            "DeauthorizeConnection",
234        ]
235    }
236}
237
238// ─── Event Bus Operations ───────────────────────────────────────────
239impl EventBridgeService {
240    fn create_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
241        let body = req.json_body();
242        validate_required("Name", &body["Name"])?;
243        let name = body["Name"]
244            .as_str()
245            .ok_or_else(|| missing("Name"))?
246            .to_string();
247        validate_string_length("name", &name, 1, 256)?;
248        validate_optional_string_length(
249            "eventSourceName",
250            body["EventSourceName"].as_str(),
251            1,
252            256,
253        )?;
254        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
255        validate_optional_string_length(
256            "kmsKeyIdentifier",
257            body["KmsKeyIdentifier"].as_str(),
258            0,
259            2048,
260        )?;
261
262        // Validate name doesn't contain '/' (unless partner bus)
263        if name.contains('/') && !name.starts_with("aws.partner/") {
264            return Err(AwsServiceError::aws_error(
265                StatusCode::BAD_REQUEST,
266                "ValidationException",
267                "Event bus name must not contain '/'.",
268            ));
269        }
270
271        // Partner event bus validation
272        if name.starts_with("aws.partner/") {
273            let event_source = body["EventSourceName"].as_str().unwrap_or("");
274            let accounts_r = self.state.read();
275            let empty_r = EventBridgeState::new(&req.account_id, &req.region);
276            let state_r = accounts_r.get(&req.account_id).unwrap_or(&empty_r);
277            let has_source = state_r.partner_event_sources.contains_key(event_source);
278            drop(accounts_r);
279            if !has_source {
280                return Err(AwsServiceError::aws_error(
281                    StatusCode::BAD_REQUEST,
282                    "ResourceNotFoundException",
283                    format!("Event source {event_source} does not exist."),
284                ));
285            }
286        }
287
288        let mut accounts = self.state.write();
289        let state = accounts.get_or_create(&req.account_id);
290
291        if state.buses.contains_key(&name) {
292            return Err(AwsServiceError::aws_error(
293                StatusCode::BAD_REQUEST,
294                "ResourceAlreadyExistsException",
295                format!("Event bus {name} already exists."),
296            ));
297        }
298
299        let arn = format!(
300            "arn:aws:events:{}:{}:event-bus/{}",
301            req.region, state.account_id, name
302        );
303        let now = Utc::now();
304        let description = body["Description"].as_str().map(|s| s.to_string());
305        let kms_key_identifier = body["KmsKeyIdentifier"].as_str().map(|s| s.to_string());
306        let dead_letter_config = body.get("DeadLetterConfig").cloned();
307
308        let tags = parse_tags(&body);
309
310        let bus = EventBus {
311            name: name.clone(),
312            arn: arn.clone(),
313            tags,
314            policy: None,
315            description,
316            kms_key_identifier,
317            dead_letter_config,
318            creation_time: now,
319            last_modified_time: now,
320        };
321        state.buses.insert(name, bus);
322
323        Ok(AwsResponse::ok_json(json!({ "EventBusArn": arn })))
324    }
325
326    fn delete_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
327        let body = req.json_body();
328        validate_required("Name", &body["Name"])?;
329        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
330        validate_string_length("name", name, 1, 256)?;
331
332        if name == "default" {
333            return Err(AwsServiceError::aws_error(
334                StatusCode::BAD_REQUEST,
335                "ValidationException",
336                format!("Cannot delete event bus {name}."),
337            ));
338        }
339
340        let mut accounts = self.state.write();
341        let state = accounts.get_or_create(&req.account_id);
342        state.buses.remove(name);
343        state.rules.retain(|k, _| k.0 != name);
344
345        Ok(AwsResponse::ok_json(json!({})))
346    }
347
348    fn list_event_buses(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
349        let body = req.json_body();
350        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
351        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
352        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
353        let name_prefix = body["NamePrefix"].as_str();
354        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
355        if let Some(t) = body["NextToken"].as_str() {
356            t.parse::<usize>().map_err(|_| {
357                AwsServiceError::aws_error(
358                    StatusCode::BAD_REQUEST,
359                    "InvalidNextTokenException",
360                    format!("Invalid NextToken value: '{t}'"),
361                )
362            })?;
363        }
364
365        let accounts = self.state.read();
366        let empty = EventBridgeState::new(&req.account_id, &req.region);
367        let state = accounts.get(&req.account_id).unwrap_or(&empty);
368        let filtered: Vec<&_> = state
369            .buses
370            .values()
371            .filter(|b| match name_prefix {
372                Some(prefix) => b.name.starts_with(prefix),
373                None => true,
374            })
375            .collect();
376
377        let (page, next_token) = paginate(&filtered, body["NextToken"].as_str(), limit);
378        let buses: Vec<Value> = page
379            .iter()
380            .map(|b| json!({ "Name": b.name, "Arn": b.arn }))
381            .collect();
382        let mut resp = json!({ "EventBuses": buses });
383        if let Some(token) = next_token {
384            resp["NextToken"] = json!(token);
385        }
386
387        Ok(AwsResponse::ok_json(resp))
388    }
389
390    fn describe_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
391        let body = req.json_body();
392        validate_optional_string_length("name", body["Name"].as_str(), 1, 1600)?;
393        let name = body["Name"].as_str().unwrap_or("default");
394
395        let accounts = self.state.read();
396        let empty = EventBridgeState::new(&req.account_id, &req.region);
397        let state = accounts.get(&req.account_id).unwrap_or(&empty);
398        let bus = state.buses.get(name).ok_or_else(|| {
399            AwsServiceError::aws_error(
400                StatusCode::BAD_REQUEST,
401                "ResourceNotFoundException",
402                format!("Event bus {name} does not exist."),
403            )
404        })?;
405
406        let mut resp = json!({
407            "Name": bus.name,
408            "Arn": bus.arn,
409            "CreationTime": bus.creation_time.timestamp() as f64,
410            "LastModifiedTime": bus.last_modified_time.timestamp() as f64,
411        });
412
413        if let Some(ref policy) = bus.policy {
414            resp["Policy"] = Value::String(serde_json::to_string(policy).unwrap());
415        }
416        if let Some(ref desc) = bus.description {
417            resp["Description"] = json!(desc);
418        }
419        if let Some(ref kms) = bus.kms_key_identifier {
420            resp["KmsKeyIdentifier"] = json!(kms);
421        }
422        if let Some(ref dlc) = bus.dead_letter_config {
423            resp["DeadLetterConfig"] = dlc.clone();
424        }
425
426        Ok(AwsResponse::ok_json(resp))
427    }
428
429    // ─── Permission Operations ──────────────────────────────────────────
430
431    fn put_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
432        let body = req.json_body();
433        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
434        validate_optional_string_length("action", body["Action"].as_str(), 1, 64)?;
435        validate_optional_string_length("principal", body["Principal"].as_str(), 1, 12)?;
436        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
437        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
438
439        let mut accounts = self.state.write();
440        let state = accounts.get_or_create(&req.account_id);
441
442        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
443            AwsServiceError::aws_error(
444                StatusCode::BAD_REQUEST,
445                "ResourceNotFoundException",
446                format!("Event bus {event_bus_name} does not exist."),
447            )
448        })?;
449
450        // Check if Policy is provided (new-style)
451        if let Some(policy_str) = body["Policy"].as_str() {
452            if let Ok(policy) = serde_json::from_str::<Value>(policy_str) {
453                bus.policy = Some(policy);
454                return Ok(AwsResponse::ok_json(json!({})));
455            }
456        }
457
458        // Old-style: Action, Principal, StatementId
459        let action = body["Action"].as_str().unwrap_or("");
460        let principal = body["Principal"].as_str().unwrap_or("");
461        let statement_id = body["StatementId"].as_str().unwrap_or("");
462
463        // Validate action
464        if action != "events:PutEvents" {
465            return Err(AwsServiceError::aws_error(
466                StatusCode::BAD_REQUEST,
467                "ValidationException",
468                "Provided value in parameter 'action' is not supported.",
469            ));
470        }
471
472        let statement = json!({
473            "Sid": statement_id,
474            "Effect": "Allow",
475            "Principal": { "AWS": Arn::global("iam", principal, "root").to_string() },
476            "Action": action,
477            "Resource": bus.arn,
478        });
479
480        let policy = bus.policy.get_or_insert_with(|| {
481            json!({
482                "Version": "2012-10-17",
483                "Statement": [],
484            })
485        });
486
487        if let Some(stmts) = policy["Statement"].as_array_mut() {
488            stmts.push(statement);
489        }
490
491        Ok(AwsResponse::ok_json(json!({})))
492    }
493
494    fn remove_permission(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495        let body = req.json_body();
496        validate_optional_string_length("statementId", body["StatementId"].as_str(), 1, 64)?;
497        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 256)?;
498        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
499        let statement_id = body["StatementId"].as_str().unwrap_or("");
500        let remove_all = body["RemoveAllPermissions"].as_bool().unwrap_or(false);
501
502        let mut accounts = self.state.write();
503        let state = accounts.get_or_create(&req.account_id);
504
505        let bus = state.buses.get_mut(event_bus_name).ok_or_else(|| {
506            AwsServiceError::aws_error(
507                StatusCode::BAD_REQUEST,
508                "ResourceNotFoundException",
509                format!("Event bus {event_bus_name} does not exist."),
510            )
511        })?;
512
513        if remove_all {
514            bus.policy = None;
515            return Ok(AwsResponse::ok_json(json!({})));
516        }
517
518        let policy = bus.policy.as_mut().ok_or_else(|| {
519            AwsServiceError::aws_error(
520                StatusCode::BAD_REQUEST,
521                "ResourceNotFoundException",
522                "EventBus does not have a policy.",
523            )
524        })?;
525
526        if let Some(stmts) = policy["Statement"].as_array_mut() {
527            let before = stmts.len();
528            stmts.retain(|s| s["Sid"].as_str() != Some(statement_id));
529            if stmts.len() == before {
530                return Err(AwsServiceError::aws_error(
531                    StatusCode::BAD_REQUEST,
532                    "ResourceNotFoundException",
533                    "Statement with the provided id does not exist.",
534                ));
535            }
536            if stmts.is_empty() {
537                bus.policy = None;
538            }
539        }
540
541        Ok(AwsResponse::ok_json(json!({})))
542    }
543
544    // ─── Rule Operations ────────────────────────────────────────────────
545
546    fn put_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
547        let body = req.json_body();
548        validate_required("Name", &body["Name"])?;
549        let name = body["Name"]
550            .as_str()
551            .ok_or_else(|| missing("Name"))?
552            .to_string();
553        validate_string_length("name", &name, 1, 64)?;
554        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
555        validate_optional_string_length(
556            "scheduleExpression",
557            body["ScheduleExpression"].as_str(),
558            0,
559            256,
560        )?;
561        validate_optional_string_length("eventPattern", body["EventPattern"].as_str(), 0, 4096)?;
562        validate_optional_enum(
563            "state",
564            body["State"].as_str(),
565            &[
566                "ENABLED",
567                "DISABLED",
568                "ENABLED_WITH_ALL_CLOUDTRAIL_MANAGEMENT_EVENTS",
569            ],
570        )?;
571        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
572        validate_optional_string_length("roleArn", body["RoleArn"].as_str(), 1, 1600)?;
573
574        let raw_bus = body["EventBusName"]
575            .as_str()
576            .unwrap_or("default")
577            .to_string();
578
579        let mut accounts = self.state.write();
580        let state = accounts.get_or_create(&req.account_id);
581        let event_bus_name = state.resolve_bus_name(&raw_bus);
582
583        let event_pattern = body["EventPattern"].as_str().and_then(|s| {
584            if s.is_empty() {
585                None
586            } else {
587                Some(s.to_string())
588            }
589        });
590        let schedule_expression = body["ScheduleExpression"].as_str().and_then(|s| {
591            if s.is_empty() {
592                None
593            } else {
594                Some(s.to_string())
595            }
596        });
597        let description = body["Description"].as_str().map(|s| s.to_string());
598        let role_arn = body["RoleArn"].as_str().map(|s| s.to_string());
599        let rule_state = body["State"].as_str().unwrap_or("ENABLED").to_string();
600
601        // Validate: schedule expressions only on default bus
602        if schedule_expression.is_some() && event_bus_name != "default" {
603            return Err(AwsServiceError::aws_error(
604                StatusCode::BAD_REQUEST,
605                "ValidationException",
606                "ScheduleExpression is supported only on the default event bus.",
607            ));
608        }
609
610        if !state.buses.contains_key(&event_bus_name) {
611            return Err(AwsServiceError::aws_error(
612                StatusCode::BAD_REQUEST,
613                "ResourceNotFoundException",
614                format!("Event bus {event_bus_name} does not exist."),
615            ));
616        }
617
618        let arn = if event_bus_name == "default" {
619            format!(
620                "arn:aws:events:{}:{}:rule/{}",
621                req.region, state.account_id, name
622            )
623        } else {
624            format!(
625                "arn:aws:events:{}:{}:rule/{}/{}",
626                req.region, state.account_id, event_bus_name, name
627            )
628        };
629
630        let key = (event_bus_name.clone(), name.clone());
631        let targets = state
632            .rules
633            .get(&key)
634            .map(|r| r.targets.clone())
635            .unwrap_or_default();
636
637        let tags = parse_tags(&body);
638
639        let rule = EventRule {
640            name: name.clone(),
641            arn: arn.clone(),
642            event_bus_name,
643            event_pattern,
644            schedule_expression,
645            state: rule_state,
646            description,
647            role_arn,
648            managed_by: None,
649            created_by: None,
650            targets,
651            tags,
652            last_fired: None,
653        };
654
655        state.rules.insert(key, rule);
656        Ok(AwsResponse::ok_json(json!({ "RuleArn": arn })))
657    }
658
659    fn delete_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
660        let body = req.json_body();
661        validate_required("Name", &body["Name"])?;
662        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
663        validate_string_length("name", name, 1, 64)?;
664        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
665        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
666
667        let mut accounts = self.state.write();
668        let state = accounts.get_or_create(&req.account_id);
669        let bus_name = state.resolve_bus_name(event_bus_name);
670        let key = (bus_name, name.to_string());
671
672        // Check if rule has targets
673        if let Some(rule) = state.rules.get(&key) {
674            if !rule.targets.is_empty() {
675                return Err(AwsServiceError::aws_error(
676                    StatusCode::BAD_REQUEST,
677                    "ValidationException",
678                    "Rule can't be deleted since it has targets.",
679                ));
680            }
681        }
682
683        state.rules.remove(&key);
684        Ok(AwsResponse::ok_json(json!({})))
685    }
686
687    fn list_rules(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
688        let body = req.json_body();
689        validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 64)?;
690        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
691        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
692        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
693        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
694        let name_prefix = body["NamePrefix"].as_str();
695        let limit = body["Limit"].as_u64().map(|n| n as usize);
696        let next_token = body["NextToken"].as_str();
697
698        let accounts = self.state.read();
699        let empty = EventBridgeState::new(&req.account_id, &req.region);
700        let state = accounts.get(&req.account_id).unwrap_or(&empty);
701        let bus_name = state.resolve_bus_name(event_bus_name);
702
703        let mut rules: Vec<&EventRule> = state
704            .rules
705            .values()
706            .filter(|r| r.event_bus_name == bus_name)
707            .filter(|r| match name_prefix {
708                Some(prefix) => r.name.starts_with(prefix),
709                None => true,
710            })
711            .collect();
712        rules.sort_by(|a, b| a.name.cmp(&b.name));
713
714        // Pagination
715        let start = next_token
716            .and_then(|t| t.parse::<usize>().ok())
717            .unwrap_or(0)
718            .min(rules.len());
719        let rules_slice = &rules[start..];
720
721        let (page, new_next_token) = if let Some(lim) = limit {
722            if rules_slice.len() > lim {
723                (&rules_slice[..lim], Some((start + lim).to_string()))
724            } else {
725                (rules_slice, None)
726            }
727        } else {
728            (rules_slice, None)
729        };
730
731        let rules_json: Vec<Value> = page
732            .iter()
733            .map(|r| {
734                let mut obj = json!({
735                    "Name": r.name,
736                    "Arn": r.arn,
737                    "EventBusName": r.event_bus_name,
738                    "State": r.state,
739                });
740                if let Some(ref desc) = r.description {
741                    obj["Description"] = json!(desc);
742                }
743                if let Some(ref ep) = r.event_pattern {
744                    obj["EventPattern"] = json!(ep);
745                }
746                if let Some(ref se) = r.schedule_expression {
747                    obj["ScheduleExpression"] = json!(se);
748                }
749                if let Some(ref role) = r.role_arn {
750                    obj["RoleArn"] = json!(role);
751                }
752                if let Some(ref mb) = r.managed_by {
753                    obj["ManagedBy"] = json!(mb);
754                }
755                obj
756            })
757            .collect();
758
759        let mut resp = json!({ "Rules": rules_json });
760        if let Some(token) = new_next_token {
761            resp["NextToken"] = json!(token);
762        }
763
764        Ok(AwsResponse::ok_json(resp))
765    }
766
767    fn describe_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
768        let body = req.json_body();
769        validate_required("Name", &body["Name"])?;
770        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
771        validate_string_length("name", name, 1, 64)?;
772        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
773        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
774
775        let accounts = self.state.read();
776        let empty = EventBridgeState::new(&req.account_id, &req.region);
777        let state = accounts.get(&req.account_id).unwrap_or(&empty);
778        let bus_name = state.resolve_bus_name(event_bus_name);
779        let key = (bus_name.clone(), name.to_string());
780
781        let rule = state.rules.get(&key).ok_or_else(|| {
782            AwsServiceError::aws_error(
783                StatusCode::BAD_REQUEST,
784                "ResourceNotFoundException",
785                format!("Rule {name} does not exist."),
786            )
787        })?;
788
789        let mut resp = json!({
790            "Name": rule.name,
791            "Arn": rule.arn,
792            "EventBusName": rule.event_bus_name,
793            "State": rule.state,
794        });
795
796        if let Some(ref desc) = rule.description {
797            resp["Description"] = json!(desc);
798        }
799        if let Some(ref ep) = rule.event_pattern {
800            resp["EventPattern"] = json!(ep);
801        }
802        if let Some(ref se) = rule.schedule_expression {
803            resp["ScheduleExpression"] = json!(se);
804        }
805        if let Some(ref role) = rule.role_arn {
806            resp["RoleArn"] = json!(role);
807        }
808        if let Some(ref mb) = rule.managed_by {
809            resp["ManagedBy"] = json!(mb);
810        }
811        if let Some(ref cb) = rule.created_by {
812            resp["CreatedBy"] = json!(cb);
813        }
814        // If non-default bus, set CreatedBy to account_id
815        if rule.event_bus_name != "default" && rule.created_by.is_none() {
816            resp["CreatedBy"] = json!(state.account_id);
817        }
818
819        Ok(AwsResponse::ok_json(resp))
820    }
821
822    fn enable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
823        let body = req.json_body();
824        validate_required("Name", &body["Name"])?;
825        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
826        validate_string_length("name", name, 1, 64)?;
827        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
828        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
829
830        let mut accounts = self.state.write();
831        let state = accounts.get_or_create(&req.account_id);
832        let bus_name = state.resolve_bus_name(event_bus_name);
833        let key = (bus_name, name.to_string());
834
835        let rule = state.rules.get_mut(&key).ok_or_else(|| {
836            AwsServiceError::aws_error(
837                StatusCode::BAD_REQUEST,
838                "ResourceNotFoundException",
839                format!("Rule {name} does not exist."),
840            )
841        })?;
842
843        rule.state = "ENABLED".to_string();
844        Ok(AwsResponse::ok_json(json!({})))
845    }
846
847    fn disable_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
848        let body = req.json_body();
849        validate_required("Name", &body["Name"])?;
850        let name = body["Name"].as_str().ok_or_else(|| missing("Name"))?;
851        validate_string_length("name", name, 1, 64)?;
852        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
853        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
854
855        let mut accounts = self.state.write();
856        let state = accounts.get_or_create(&req.account_id);
857        let bus_name = state.resolve_bus_name(event_bus_name);
858        let key = (bus_name, name.to_string());
859
860        let rule = state.rules.get_mut(&key).ok_or_else(|| {
861            AwsServiceError::aws_error(
862                StatusCode::BAD_REQUEST,
863                "ResourceNotFoundException",
864                format!("Rule {name} does not exist."),
865            )
866        })?;
867
868        rule.state = "DISABLED".to_string();
869        Ok(AwsResponse::ok_json(json!({})))
870    }
871
872    // ─── Target Operations ──────────────────────────────────────────────
873
874    fn put_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
875        let body = req.json_body();
876        validate_required("Rule", &body["Rule"])?;
877        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
878        validate_string_length("rule", rule_name, 1, 64)?;
879        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
880        validate_required("Targets", &body["Targets"])?;
881        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
882        let targets = body["Targets"]
883            .as_array()
884            .ok_or_else(|| missing("Targets"))?;
885
886        // Validate targets - check for FIFO SQS without SqsParameters
887        for target in targets {
888            let target_id = target["Id"].as_str().unwrap_or("");
889            let target_arn = target["Arn"].as_str().unwrap_or("");
890
891            if target_arn.ends_with(".fifo") && target.get("SqsParameters").is_none() {
892                return Err(AwsServiceError::aws_error(
893                    StatusCode::BAD_REQUEST,
894                    "ValidationException",
895                    format!(
896                        "Parameter(s) SqsParameters must be specified for target: {target_id}."
897                    ),
898                ));
899            }
900
901            // Validate ARN format
902            if !target_arn.starts_with("arn:") {
903                return Err(AwsServiceError::aws_error(
904                    StatusCode::BAD_REQUEST,
905                    "ValidationException",
906                    format!(
907                        "Parameter {target_arn} is not valid. Reason: Provided Arn is not in correct format."
908                    ),
909                ));
910            }
911        }
912
913        let mut accounts = self.state.write();
914        let state = accounts.get_or_create(&req.account_id);
915        let bus_name = state.resolve_bus_name(event_bus_name);
916        let key = (bus_name.clone(), rule_name.to_string());
917
918        let rule = state.rules.get_mut(&key).ok_or_else(|| {
919            AwsServiceError::aws_error(
920                StatusCode::BAD_REQUEST,
921                "ResourceNotFoundException",
922                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
923            )
924        })?;
925
926        for target in targets {
927            let et = parse_target(target);
928            // Remove existing target with same ID
929            rule.targets.retain(|t| t.id != et.id);
930            rule.targets.push(et);
931        }
932
933        Ok(AwsResponse::ok_json(json!({
934            "FailedEntryCount": 0,
935            "FailedEntries": [],
936        })))
937    }
938
939    fn remove_targets(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
940        let body = req.json_body();
941        validate_required("Rule", &body["Rule"])?;
942        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
943        validate_string_length("rule", rule_name, 1, 64)?;
944        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
945        validate_required("Ids", &body["Ids"])?;
946        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
947        let ids = body["Ids"].as_array().ok_or_else(|| missing("Ids"))?;
948
949        let target_ids: Vec<String> = ids
950            .iter()
951            .filter_map(|v| v.as_str().map(|s| s.to_string()))
952            .collect();
953
954        let mut accounts = self.state.write();
955        let state = accounts.get_or_create(&req.account_id);
956        let bus_name = state.resolve_bus_name(event_bus_name);
957        let key = (bus_name.clone(), rule_name.to_string());
958
959        let rule = state.rules.get_mut(&key).ok_or_else(|| {
960            AwsServiceError::aws_error(
961                StatusCode::BAD_REQUEST,
962                "ResourceNotFoundException",
963                format!("Rule {rule_name} does not exist on EventBus {bus_name}."),
964            )
965        })?;
966
967        rule.targets.retain(|t| !target_ids.contains(&t.id));
968
969        Ok(AwsResponse::ok_json(json!({
970            "FailedEntryCount": 0,
971            "FailedEntries": [],
972        })))
973    }
974
975    fn list_targets_by_rule(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
976        let body = req.json_body();
977        validate_required("Rule", &body["Rule"])?;
978        let rule_name = body["Rule"].as_str().ok_or_else(|| missing("Rule"))?;
979        validate_string_length("rule", rule_name, 1, 64)?;
980        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
981        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
982        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
983        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
984        let limit = body["Limit"].as_u64().map(|n| n as usize);
985        let next_token = body["NextToken"].as_str();
986
987        let accounts = self.state.read();
988        let empty = EventBridgeState::new(&req.account_id, &req.region);
989        let state = accounts.get(&req.account_id).unwrap_or(&empty);
990        let bus_name = state.resolve_bus_name(event_bus_name);
991        let key = (bus_name, rule_name.to_string());
992
993        let rule = state.rules.get(&key).ok_or_else(|| {
994            AwsServiceError::aws_error(
995                StatusCode::BAD_REQUEST,
996                "ResourceNotFoundException",
997                format!("Rule {rule_name} does not exist."),
998            )
999        })?;
1000
1001        let all_targets = &rule.targets;
1002        let start = next_token
1003            .and_then(|t| t.parse::<usize>().ok())
1004            .unwrap_or(0)
1005            .min(all_targets.len());
1006        let slice = &all_targets[start..];
1007
1008        let (page, new_next_token) = if let Some(lim) = limit {
1009            if slice.len() > lim {
1010                (&slice[..lim], Some((start + lim).to_string()))
1011            } else {
1012                (slice, None)
1013            }
1014        } else {
1015            (slice, None)
1016        };
1017
1018        let targets: Vec<Value> = page.iter().map(target_to_json).collect();
1019
1020        let mut resp = json!({ "Targets": targets });
1021        if let Some(token) = new_next_token {
1022            resp["NextToken"] = json!(token);
1023        }
1024
1025        Ok(AwsResponse::ok_json(resp))
1026    }
1027
1028    fn list_rule_names_by_target(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1029        let body = req.json_body();
1030        validate_required("TargetArn", &body["TargetArn"])?;
1031        let target_arn = body["TargetArn"]
1032            .as_str()
1033            .ok_or_else(|| missing("TargetArn"))?;
1034        validate_string_length("targetArn", target_arn, 1, 1600)?;
1035        validate_optional_string_length("eventBusName", body["EventBusName"].as_str(), 1, 1600)?;
1036        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
1037        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1038        let event_bus_name = body["EventBusName"].as_str().unwrap_or("default");
1039        let limit = body["Limit"].as_u64().map(|n| n as usize);
1040        let next_token = body["NextToken"].as_str();
1041
1042        let accounts = self.state.read();
1043        let empty = EventBridgeState::new(&req.account_id, &req.region);
1044        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1045        let bus_name = state.resolve_bus_name(event_bus_name);
1046
1047        // Deduplicate rule names
1048        let mut rule_names: Vec<String> = Vec::new();
1049        for rule in state.rules.values() {
1050            if rule.event_bus_name == bus_name
1051                && rule.targets.iter().any(|t| t.arn == target_arn)
1052                && !rule_names.contains(&rule.name)
1053            {
1054                rule_names.push(rule.name.clone());
1055            }
1056        }
1057        rule_names.sort();
1058
1059        let start = next_token
1060            .and_then(|t| t.parse::<usize>().ok())
1061            .unwrap_or(0)
1062            .min(rule_names.len());
1063        let slice = &rule_names[start..];
1064
1065        let (page, new_next_token) = if let Some(lim) = limit {
1066            if slice.len() > lim {
1067                (&slice[..lim], Some((start + lim).to_string()))
1068            } else {
1069                (slice, None)
1070            }
1071        } else {
1072            (slice, None)
1073        };
1074
1075        let mut resp = json!({ "RuleNames": page });
1076        if let Some(token) = new_next_token {
1077            resp["NextToken"] = json!(token);
1078        }
1079
1080        Ok(AwsResponse::ok_json(resp))
1081    }
1082
1083    // ─── Partner Event Sources ────────────���───────────────────────────
1084
1085    fn test_event_pattern(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1086        let body = req.json_body();
1087        validate_required("EventPattern", &body["EventPattern"])?;
1088        validate_required("Event", &body["Event"])?;
1089        let event_pattern = body["EventPattern"]
1090            .as_str()
1091            .ok_or_else(|| missing("EventPattern"))?;
1092        let event_str = body["Event"].as_str().ok_or_else(|| missing("Event"))?;
1093
1094        // Parse the event JSON
1095        let event: Value = serde_json::from_str(event_str).map_err(|_| {
1096            AwsServiceError::aws_error(
1097                StatusCode::BAD_REQUEST,
1098                "InvalidEventPatternException",
1099                "Event is not valid JSON.",
1100            )
1101        })?;
1102
1103        // Parse the pattern JSON
1104        let _pattern: Value = serde_json::from_str(event_pattern).map_err(|_| {
1105            AwsServiceError::aws_error(
1106                StatusCode::BAD_REQUEST,
1107                "InvalidEventPatternException",
1108                "Event pattern is not valid JSON.",
1109            )
1110        })?;
1111
1112        let source = event["source"].as_str().unwrap_or("");
1113        let detail_type = event["detail-type"].as_str().unwrap_or("");
1114        let detail = event
1115            .get("detail")
1116            .map(|v| serde_json::to_string(v).unwrap_or_default())
1117            .unwrap_or_else(|| "{}".to_string());
1118        let account = event["account"].as_str().unwrap_or("");
1119        let region = event["region"].as_str().unwrap_or("");
1120        let resources: Vec<String> = event["resources"]
1121            .as_array()
1122            .map(|arr| {
1123                arr.iter()
1124                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1125                    .collect()
1126            })
1127            .unwrap_or_default();
1128
1129        let result = matches_pattern(
1130            Some(event_pattern),
1131            source,
1132            detail_type,
1133            &detail,
1134            account,
1135            region,
1136            &resources,
1137        );
1138
1139        Ok(AwsResponse::ok_json(json!({ "Result": result })))
1140    }
1141
1142    // ─── UpdateEventBus ─────────────────────────────────────────────────
1143
1144    fn update_event_bus(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1145        let body = req.json_body();
1146        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1147        validate_optional_string_length(
1148            "kmsKeyIdentifier",
1149            body["KmsKeyIdentifier"].as_str(),
1150            0,
1151            2048,
1152        )?;
1153        let name = body["Name"].as_str().unwrap_or("default");
1154
1155        let mut accounts = self.state.write();
1156        let state = accounts.get_or_create(&req.account_id);
1157        let bus = state.buses.get_mut(name).ok_or_else(|| {
1158            AwsServiceError::aws_error(
1159                StatusCode::BAD_REQUEST,
1160                "ResourceNotFoundException",
1161                format!("Event bus {name} does not exist."),
1162            )
1163        })?;
1164
1165        if let Some(desc) = body["Description"].as_str() {
1166            bus.description = Some(desc.to_string());
1167        }
1168        if let Some(kms) = body["KmsKeyIdentifier"].as_str() {
1169            bus.kms_key_identifier = Some(kms.to_string());
1170        }
1171        if let Some(dlc) = body.get("DeadLetterConfig") {
1172            bus.dead_letter_config = Some(dlc.clone());
1173        }
1174        bus.last_modified_time = Utc::now();
1175
1176        let arn = bus.arn.clone();
1177        let bus_name = bus.name.clone();
1178
1179        Ok(AwsResponse::ok_json(json!({
1180            "Arn": arn,
1181            "Name": bus_name,
1182        })))
1183    }
1184
1185    // ─── Endpoint Operations ────────────────────────────────────────────
1186
1187    fn put_events(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1188        let body = req.json_body();
1189        validate_required("Entries", &body["Entries"])?;
1190        validate_optional_string_length("endpointId", body["EndpointId"].as_str(), 1, 50)?;
1191        let entries = body["Entries"]
1192            .as_array()
1193            .ok_or_else(|| missing("Entries"))?;
1194
1195        // Validate entries count
1196        if entries.is_empty() {
1197            return Err(AwsServiceError::aws_error(
1198                StatusCode::BAD_REQUEST,
1199                "ValidationException",
1200                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length greater than or equal to 1",
1201            ));
1202        }
1203        if entries.len() > 10 {
1204            return Err(AwsServiceError::aws_error(
1205                StatusCode::BAD_REQUEST,
1206                "ValidationException",
1207                "1 validation error detected: Value '[PutEventsRequestEntry]' at 'entries' failed to satisfy constraint: Member must have length less than or equal to 10",
1208            ));
1209        }
1210
1211        let mut accounts = self.state.write();
1212        let state = accounts.get_or_create(&req.account_id);
1213        let mut result_entries = Vec::new();
1214        let mut events_to_deliver = Vec::new();
1215        let mut failed_count = 0;
1216
1217        for entry in entries {
1218            let source = entry["Source"].as_str().unwrap_or("").to_string();
1219            let detail_type = entry["DetailType"].as_str().unwrap_or("").to_string();
1220            let detail = entry["Detail"].as_str().unwrap_or("").to_string();
1221
1222            if let Err(error) = validate_put_events_entry(&source, &detail_type, &detail) {
1223                failed_count += 1;
1224                result_entries.push(error);
1225                continue;
1226            }
1227
1228            let event_id = uuid::Uuid::new_v4().to_string();
1229            let raw_bus = entry["EventBusName"]
1230                .as_str()
1231                .unwrap_or("default")
1232                .to_string();
1233            let event_bus_name = state.resolve_bus_name(&raw_bus);
1234
1235            // Bus resource-policy gate. AWS evaluates the bus's
1236            // resource policy against cross-account callers; same-account
1237            // callers always have access. The policy itself is JSON
1238            // stored as serde_json::Value so the IAM evaluator parses
1239            // it the same way it parses an S3 bucket policy.
1240            let caller_account = req
1241                .principal
1242                .as_ref()
1243                .map(|p| p.account_id.as_str())
1244                .unwrap_or(req.account_id.as_str());
1245            if caller_account != req.account_id {
1246                let bus_policy_value = state
1247                    .buses
1248                    .get(&event_bus_name)
1249                    .and_then(|b| b.policy.clone());
1250                if let Some(policy_value) = bus_policy_value {
1251                    let policy_json = serde_json::to_string(&policy_value).unwrap_or_default();
1252                    let policy_doc = fakecloud_iam::evaluator::PolicyDocument::parse(&policy_json);
1253                    let bus_arn = state
1254                        .buses
1255                        .get(&event_bus_name)
1256                        .map(|b| b.arn.clone())
1257                        .unwrap_or_default();
1258                    let principal =
1259                        req.principal
1260                            .clone()
1261                            .unwrap_or_else(|| fakecloud_core::auth::Principal {
1262                                arn: format!("arn:aws:iam::{caller_account}:root"),
1263                                user_id: caller_account.to_string(),
1264                                account_id: caller_account.to_string(),
1265                                principal_type: fakecloud_core::auth::PrincipalType::Root,
1266                                source_identity: None,
1267                                tags: None,
1268                            });
1269                    let context = fakecloud_iam::evaluator::RequestContext {
1270                        aws_principal_arn: Some(principal.arn.clone()),
1271                        aws_principal_account: Some(principal.account_id.clone()),
1272                        ..Default::default()
1273                    };
1274                    let eval_req = fakecloud_iam::evaluator::EvalRequest {
1275                        principal: &principal,
1276                        action: "events:PutEvents".to_string(),
1277                        resource: bus_arn,
1278                        context,
1279                    };
1280                    let decision = fakecloud_iam::evaluator::evaluate_resource_policy_only(
1281                        &policy_doc,
1282                        &eval_req,
1283                    );
1284                    if !matches!(decision, fakecloud_iam::evaluator::Decision::Allow) {
1285                        failed_count += 1;
1286                        result_entries.push(json!({
1287                            "ErrorCode": "AccessDeniedException",
1288                            "ErrorMessage": format!(
1289                                "User '{}' is not authorized to put events on event bus '{}'",
1290                                principal.arn, event_bus_name
1291                            ),
1292                        }));
1293                        continue;
1294                    }
1295                }
1296            }
1297
1298            let time = parse_put_events_time(&entry["Time"]);
1299            let resources: Vec<String> = entry["Resources"]
1300                .as_array()
1301                .map(|arr| {
1302                    arr.iter()
1303                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1304                        .collect()
1305                })
1306                .unwrap_or_default();
1307
1308            let event = PutEvent {
1309                event_id: event_id.clone(),
1310                source: source.clone(),
1311                detail_type: detail_type.clone(),
1312                detail: detail.clone(),
1313                event_bus_name: event_bus_name.clone(),
1314                time,
1315                resources: resources.clone(),
1316            };
1317
1318            archive_matching_event(
1319                state,
1320                &event,
1321                &event_bus_name,
1322                &source,
1323                &detail_type,
1324                &detail,
1325                &req.account_id,
1326                &req.region,
1327                &resources,
1328            );
1329
1330            state.events.push(event);
1331
1332            // Find matching rules and their targets
1333            let matching_targets: Vec<EventTarget> = state
1334                .rules
1335                .values()
1336                .filter(|r| {
1337                    r.event_bus_name == event_bus_name
1338                        && r.state == "ENABLED"
1339                        && matches_pattern(
1340                            r.event_pattern.as_deref(),
1341                            &source,
1342                            &detail_type,
1343                            &detail,
1344                            &req.account_id,
1345                            &req.region,
1346                            &resources,
1347                        )
1348                })
1349                .flat_map(|r| r.targets.clone())
1350                .collect();
1351
1352            if !matching_targets.is_empty() {
1353                events_to_deliver.push((
1354                    event_id.clone(),
1355                    source,
1356                    detail_type,
1357                    detail,
1358                    time,
1359                    resources,
1360                    matching_targets,
1361                ));
1362            }
1363
1364            result_entries.push(json!({ "EventId": event_id }));
1365        }
1366
1367        // Drop the lock before delivering
1368        drop(accounts);
1369
1370        // Deliver to targets — single-target dispatch lives in the
1371        // shared helper so cross-service callers (delivery.rs) honor the
1372        // same target shape (SQS/SNS/Lambda/Logs/Kinesis/StepFunctions/
1373        // ApiDestination/HTTP) and the same InputTransformer rules.
1374        for (event_id, source, detail_type, detail, time, resources, targets) in events_to_deliver {
1375            let detail_value: Value = serde_json::from_str(&detail).unwrap_or(json!({}));
1376            let event_json = json!({
1377                "version": "0",
1378                "id": event_id,
1379                "source": source,
1380                "account": req.account_id,
1381                "detail-type": detail_type,
1382                "detail": detail_value,
1383                "time": time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1384                "region": req.region,
1385                "resources": resources,
1386            });
1387
1388            let ctx = EventDispatchContext {
1389                state: &self.state,
1390                delivery: &self.delivery,
1391                lambda_state: self.lambda_state.as_ref(),
1392                logs_state: self.logs_state.as_ref(),
1393                container_runtime: &self.container_runtime,
1394                account_id: &req.account_id,
1395                region: &req.region,
1396            };
1397            for target in targets {
1398                dispatch_event_target(&ctx, &target, &event_json, &event_id, &detail_type);
1399            }
1400        }
1401
1402        let resp = json!({
1403            "FailedEntryCount": failed_count,
1404            "Entries": result_entries,
1405        });
1406
1407        Ok(AwsResponse::ok_json(resp))
1408    }
1409
1410    // ─── Tagging ────────────────────────────────────────────────────────
1411
1412    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1413        let body = req.json_body();
1414        validate_required("ResourceARN", &body["ResourceARN"])?;
1415        let arn = body["ResourceARN"]
1416            .as_str()
1417            .ok_or_else(|| missing("ResourceARN"))?;
1418        validate_string_length("resourceARN", arn, 1, 1600)?;
1419        validate_required("Tags", &body["Tags"])?;
1420
1421        let mut accounts = self.state.write();
1422        let state = accounts.get_or_create(&req.account_id);
1423        let tag_map = find_tags_mut(state, arn)?;
1424
1425        fakecloud_core::tags::apply_tags(tag_map, &body, "Tags", "Key", "Value").map_err(|f| {
1426            AwsServiceError::aws_error(
1427                StatusCode::BAD_REQUEST,
1428                "ValidationException",
1429                format!("{f} must be a list"),
1430            )
1431        })?;
1432
1433        Ok(AwsResponse::ok_json(json!({})))
1434    }
1435
1436    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1437        let body = req.json_body();
1438        validate_required("ResourceARN", &body["ResourceARN"])?;
1439        let arn = body["ResourceARN"]
1440            .as_str()
1441            .ok_or_else(|| missing("ResourceARN"))?;
1442        validate_string_length("resourceARN", arn, 1, 1600)?;
1443        validate_required("TagKeys", &body["TagKeys"])?;
1444
1445        let mut accounts = self.state.write();
1446        let state = accounts.get_or_create(&req.account_id);
1447        let tag_map = find_tags_mut(state, arn)?;
1448
1449        fakecloud_core::tags::remove_tags(tag_map, &body, "TagKeys").map_err(|f| {
1450            AwsServiceError::aws_error(
1451                StatusCode::BAD_REQUEST,
1452                "ValidationException",
1453                format!("{f} must be a list"),
1454            )
1455        })?;
1456
1457        Ok(AwsResponse::ok_json(json!({})))
1458    }
1459
1460    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1461        let body = req.json_body();
1462        validate_required("ResourceARN", &body["ResourceARN"])?;
1463        let arn = body["ResourceARN"]
1464            .as_str()
1465            .ok_or_else(|| missing("ResourceARN"))?;
1466        validate_string_length("resourceARN", arn, 1, 1600)?;
1467
1468        let accounts = self.state.read();
1469        let empty = EventBridgeState::new(&req.account_id, &req.region);
1470        let state = accounts.get(&req.account_id).unwrap_or(&empty);
1471        let tag_map = find_tags(state, arn)?;
1472
1473        let tags = fakecloud_core::tags::tags_to_json(tag_map, "Key", "Value");
1474
1475        Ok(AwsResponse::ok_json(json!({ "Tags": tags })))
1476    }
1477
1478    // ─── Archive Operations ─────────────────────────────────────────────
1479}
1480
1481// ─── Tag Lookup Helpers ─────────────────────────────────────────────────
1482
1483// ─── Event Pattern Validation ────────────────────────────────────────
1484
1485// ─── Connection Auth Params Response Builder ────────────────────────
1486
1487// ─── Event Pattern Matching ─────────────────────────────────────────
1488
1489/// Parsed + validated inputs for `StartReplay`.
1490struct StartReplayInput {
1491    name: String,
1492    description: Option<String>,
1493    event_source_arn: String,
1494    destination: Value,
1495    destination_arn: String,
1496    event_start_time: DateTime<Utc>,
1497    event_end_time: DateTime<Utc>,
1498}
1499
1500impl StartReplayInput {
1501    fn from_body(body: &Value) -> Result<Self, AwsServiceError> {
1502        validate_required("ReplayName", &body["ReplayName"])?;
1503        let name = body["ReplayName"]
1504            .as_str()
1505            .ok_or_else(|| missing("ReplayName"))?
1506            .to_string();
1507        validate_string_length("replayName", &name, 1, 64)?;
1508        validate_optional_string_length("description", body["Description"].as_str(), 0, 512)?;
1509        validate_required("EventSourceArn", &body["EventSourceArn"])?;
1510        let description = body["Description"].as_str().map(|s| s.to_string());
1511        let event_source_arn = body["EventSourceArn"]
1512            .as_str()
1513            .ok_or_else(|| missing("EventSourceArn"))?
1514            .to_string();
1515        validate_string_length("eventSourceArn", &event_source_arn, 1, 1600)?;
1516        validate_required("EventStartTime", &body["EventStartTime"])?;
1517        validate_required("EventEndTime", &body["EventEndTime"])?;
1518        validate_required("Destination", &body["Destination"])?;
1519        let destination = body["Destination"].clone();
1520
1521        let event_start_time = body["EventStartTime"]
1522            .as_f64()
1523            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1524            .unwrap_or_else(Utc::now);
1525        let event_end_time = body["EventEndTime"]
1526            .as_f64()
1527            .and_then(|f| DateTime::from_timestamp(f as i64, 0))
1528            .unwrap_or_else(Utc::now);
1529
1530        let destination_arn = destination["Arn"].as_str().unwrap_or("").to_string();
1531        if !destination_arn.contains(":event-bus/") {
1532            return Err(AwsServiceError::aws_error(
1533                StatusCode::BAD_REQUEST,
1534                "ValidationException",
1535                "Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN.",
1536            ));
1537        }
1538
1539        Ok(Self {
1540            name,
1541            description,
1542            event_source_arn,
1543            destination,
1544            destination_arn,
1545            event_start_time,
1546            event_end_time,
1547        })
1548    }
1549}
1550
1551#[path = "service_archives_replays.rs"]
1552mod service_archives_replays;
1553#[path = "service_connections_apidests.rs"]
1554mod service_connections_apidests;
1555#[path = "service_endpoints.rs"]
1556mod service_endpoints;
1557#[path = "service_partner_sources.rs"]
1558mod service_partner_sources;
1559
1560#[path = "helpers.rs"]
1561mod helpers;
1562pub(crate) use helpers::*;
1563
1564#[cfg(test)]
1565#[path = "service_tests.rs"]
1566mod tests;