Skip to main content

fakecloud_eventbridge/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
7
8use crate::service::matches_pattern;
9use crate::state::{PutEvent, SharedEventBridgeState};
10
11/// Implements EventBridgeDelivery so other services (SES) can put events
12/// on an EventBridge bus with full rule matching and target delivery.
13pub struct EventBridgeDeliveryImpl {
14    state: SharedEventBridgeState,
15    delivery: Arc<DeliveryBus>,
16}
17
18impl EventBridgeDeliveryImpl {
19    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
20        Self { state, delivery }
21    }
22}
23
24impl EventBridgeDelivery for EventBridgeDeliveryImpl {
25    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
26        let event_id = uuid::Uuid::new_v4().to_string();
27        let now = Utc::now();
28
29        let event = PutEvent {
30            event_id: event_id.clone(),
31            source: source.to_string(),
32            detail_type: detail_type.to_string(),
33            detail: detail.to_string(),
34            event_bus_name: event_bus_name.to_string(),
35            time: now,
36            resources: Vec::new(),
37        };
38
39        let mut state = self.state.write();
40        state.events.push(event);
41
42        // Find matching rules and their targets
43        let account_id = state.account_id.clone();
44        let region = state.region.clone();
45        let matching_targets: Vec<_> = state
46            .rules
47            .values()
48            .filter(|r| {
49                r.event_bus_name == event_bus_name
50                    && r.state == "ENABLED"
51                    && matches_pattern(
52                        r.event_pattern.as_deref(),
53                        source,
54                        detail_type,
55                        detail,
56                        &account_id,
57                        &region,
58                        &[],
59                    )
60            })
61            .flat_map(|r| r.targets.clone())
62            .collect();
63
64        // Drop the lock before delivering
65        drop(state);
66
67        if matching_targets.is_empty() {
68            return;
69        }
70
71        // Build the EventBridge event envelope
72        let detail_value: serde_json::Value =
73            serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
74        let event_json = serde_json::json!({
75            "version": "0",
76            "id": event_id,
77            "source": source,
78            "account": account_id,
79            "detail-type": detail_type,
80            "detail": detail_value,
81            "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
82            "region": region,
83            "resources": [],
84        });
85        let event_str = event_json.to_string();
86
87        for target in matching_targets {
88            let arn = &target.arn;
89            if arn.contains(":sqs:") {
90                self.delivery.send_to_sqs(arn, &event_str, &HashMap::new());
91            } else if arn.contains(":sns:") {
92                self.delivery
93                    .publish_to_sns(arn, &event_str, Some(detail_type));
94            }
95            // Lambda and other targets could be added here
96        }
97    }
98}