Skip to main content

fakecloud_eventbridge/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
7
8use crate::service::matches_pattern;
9use crate::state::{PutEvent, SharedEventBridgeState};
10
11/// Implements EventBridgeDelivery so other services (SES) can put events
12/// on an EventBridge bus with full rule matching and target delivery.
13pub struct EventBridgeDeliveryImpl {
14    state: SharedEventBridgeState,
15    delivery: Arc<DeliveryBus>,
16}
17
18impl EventBridgeDeliveryImpl {
19    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
20        Self { state, delivery }
21    }
22}
23
24impl EventBridgeDelivery for EventBridgeDeliveryImpl {
25    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
26        let event_id = uuid::Uuid::new_v4().to_string();
27        let now = Utc::now();
28
29        let event = PutEvent {
30            event_id: event_id.clone(),
31            source: source.to_string(),
32            detail_type: detail_type.to_string(),
33            detail: detail.to_string(),
34            event_bus_name: event_bus_name.to_string(),
35            time: now,
36            resources: Vec::new(),
37        };
38
39        let mut accounts = self.state.write();
40        let state = accounts.default_mut();
41        state.events.push(event);
42
43        // Find matching rules and their targets
44        let account_id = state.account_id.clone();
45        let region = state.region.clone();
46        let matching_targets: Vec<_> = state
47            .rules
48            .values()
49            .filter(|r| {
50                r.event_bus_name == event_bus_name
51                    && r.state == "ENABLED"
52                    && matches_pattern(
53                        r.event_pattern.as_deref(),
54                        source,
55                        detail_type,
56                        detail,
57                        &account_id,
58                        &region,
59                        &[],
60                    )
61            })
62            .flat_map(|r| r.targets.clone())
63            .collect();
64
65        // Drop the lock before delivering
66        drop(accounts);
67
68        if matching_targets.is_empty() {
69            return;
70        }
71
72        // Build the EventBridge event envelope
73        let detail_value: serde_json::Value =
74            serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
75        let event_json = serde_json::json!({
76            "version": "0",
77            "id": event_id,
78            "source": source,
79            "account": account_id,
80            "detail-type": detail_type,
81            "detail": detail_value,
82            "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
83            "region": region,
84            "resources": [],
85        });
86        let event_str = event_json.to_string();
87
88        for target in matching_targets {
89            let arn = &target.arn;
90            if arn.contains(":sqs:") {
91                self.delivery.send_to_sqs(arn, &event_str, &HashMap::new());
92            } else if arn.contains(":sns:") {
93                self.delivery
94                    .publish_to_sns(arn, &event_str, Some(detail_type));
95            }
96            // Lambda and other targets could be added here
97        }
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
    use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
    use parking_lot::RwLock;
    use std::sync::Mutex;

    /// ARN of the SQS queue used as a rule target in these tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:q";
    /// ARN of the SNS topic used as a rule target in these tests.
    const TOPIC_ARN: &str = "arn:aws:sns:us-east-1:123456789012:t";

    /// Test double that remembers every SQS and SNS delivery it receives.
    #[derive(Default)]
    struct Recorder {
        /// `(queue arn, message body)` for each SQS delivery.
        sqs: Mutex<Vec<(String, String)>>,
        /// `(topic arn, message, subject)` for each SNS publish.
        sns: Mutex<Vec<(String, String, Option<String>)>>,
    }

    impl Recorder {
        /// Stores one SQS delivery; both `SqsDelivery` entry points funnel into this.
        fn record_sqs(&self, arn: &str, body: &str) {
            self.sqs
                .lock()
                .unwrap()
                .push((arn.to_owned(), body.to_owned()));
        }
    }

    impl SqsDelivery for Recorder {
        fn deliver_to_queue(&self, arn: &str, body: &str, _: &HashMap<String, String>) {
            self.record_sqs(arn, body);
        }

        fn deliver_to_queue_with_attrs(
            &self,
            arn: &str,
            body: &str,
            _: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
            _: Option<&str>,
            _: Option<&str>,
        ) {
            self.record_sqs(arn, body);
        }
    }

    impl SnsDelivery for Recorder {
        fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
            let entry = (arn.to_owned(), msg.to_owned(), subject.map(str::to_owned));
            self.sns.lock().unwrap().push(entry);
        }
    }

    /// Builds a fresh shared state for the account and region used throughout the tests.
    fn make_shared() -> SharedEventBridgeState {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        Arc::new(RwLock::new(accounts))
    }

    /// Builds an enabled rule on the `default` bus with a single target at `target_arn`.
    fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
        let target = EbTarget {
            id: "t1".to_string(),
            arn: target_arn.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        };
        EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{name}"),
            event_bus_name: "default".to_string(),
            event_pattern: pattern.map(str::to_owned),
            schedule_expression: None,
            state: "ENABLED".to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets: vec![target],
            tags: HashMap::new(),
            last_fired: None,
        }
    }

    /// Registers `rule` in the default account, keyed by its own bus name and rule name.
    fn install_rule(state: &SharedEventBridgeState, rule: EventRule) {
        let key = (rule.event_bus_name.clone(), rule.name.clone());
        state.write().default_mut().rules.insert(key, rule);
    }

    /// Wires a recorder in as the SQS backend and returns it with the delivery under test.
    fn sqs_harness(state: SharedEventBridgeState) -> (Arc<Recorder>, EventBridgeDeliveryImpl) {
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
        (recorder, EventBridgeDeliveryImpl::new(state, bus))
    }

    #[test]
    fn put_event_appends_to_events_log() {
        let state = make_shared();
        let delivery = EventBridgeDeliveryImpl::new(state.clone(), Arc::new(DeliveryBus::new()));

        delivery.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");

        let accounts = state.read();
        let events = &accounts.default_ref().events;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].source, "my.source");
        assert_eq!(events[0].detail_type, "MyType");
    }

    #[test]
    fn put_event_dispatches_matching_sqs_target() {
        let state = make_shared();
        install_rule(&state, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_harness(state);

        delivery.put_event("app", "Changed", r#"{"x":1}"#, "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, QUEUE_ARN);
        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
        assert_eq!(env["detail-type"], "Changed");
        assert_eq!(env["source"], "app");
    }

    #[test]
    fn put_event_dispatches_to_sns_target() {
        let state = make_shared();
        install_rule(&state, make_rule("r", None, TOPIC_ARN));
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sns(recorder.clone()));
        let delivery = EventBridgeDeliveryImpl::new(state, bus);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        let calls = recorder.sns.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, TOPIC_ARN);
        assert_eq!(calls[0].2.as_deref(), Some("Changed"));
    }

    #[test]
    fn put_event_skips_disabled_rule() {
        let state = make_shared();
        let disabled = EventRule {
            state: "DISABLED".to_string(),
            ..make_rule("r", None, QUEUE_ARN)
        };
        install_rule(&state, disabled);
        let (recorder, delivery) = sqs_harness(state);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_skips_other_bus_rule() {
        let state = make_shared();
        let elsewhere = EventRule {
            event_bus_name: "custom-bus".to_string(),
            ..make_rule("r", None, QUEUE_ARN)
        };
        install_rule(&state, elsewhere);
        let (recorder, delivery) = sqs_harness(state);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_handles_invalid_detail_json_gracefully() {
        let state = make_shared();
        install_rule(&state, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_harness(state);

        delivery.put_event("app", "Type", "not-json", "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
        assert_eq!(env["detail"], serde_json::json!({}));
    }
}
293}