Skip to main content

fakecloud_eventbridge/
delivery.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
7
8use crate::service::matches_pattern;
9use crate::state::{PutEvent, SharedEventBridgeState};
10
11/// Implements EventBridgeDelivery so other services (SES) can put events
12/// on an EventBridge bus with full rule matching and target delivery.
13pub struct EventBridgeDeliveryImpl {
14    state: SharedEventBridgeState,
15    delivery: Arc<DeliveryBus>,
16}
17
18impl EventBridgeDeliveryImpl {
19    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
20        Self { state, delivery }
21    }
22}
23
24impl EventBridgeDeliveryImpl {
25    fn put_event_in_account(
26        &self,
27        source: &str,
28        detail_type: &str,
29        detail: &str,
30        event_bus_name: &str,
31        target_account_id: Option<&str>,
32    ) {
33        let event_id = uuid::Uuid::new_v4().to_string();
34        let now = Utc::now();
35
36        let event = PutEvent {
37            event_id: event_id.clone(),
38            source: source.to_string(),
39            detail_type: detail_type.to_string(),
40            detail: detail.to_string(),
41            event_bus_name: event_bus_name.to_string(),
42            time: now,
43            resources: Vec::new(),
44        };
45
46        let mut accounts = self.state.write();
47        let state = match target_account_id {
48            Some(account_id) if !account_id.is_empty() => accounts.get_or_create(account_id),
49            _ => accounts.default_mut(),
50        };
51        state.events.push(event);
52
53        // Find matching rules and their targets
54        let account_id = state.account_id.clone();
55        let region = state.region.clone();
56        let matching_targets: Vec<_> = state
57            .rules
58            .values()
59            .filter(|r| {
60                r.event_bus_name == event_bus_name
61                    && r.state == "ENABLED"
62                    && matches_pattern(
63                        r.event_pattern.as_deref(),
64                        source,
65                        detail_type,
66                        detail,
67                        &account_id,
68                        &region,
69                        &[],
70                    )
71            })
72            .flat_map(|r| r.targets.clone())
73            .collect();
74
75        // Drop the lock before delivering
76        drop(accounts);
77
78        if matching_targets.is_empty() {
79            return;
80        }
81
82        // Build the EventBridge event envelope
83        let detail_value: serde_json::Value =
84            serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
85        let event_json = serde_json::json!({
86            "version": "0",
87            "id": event_id,
88            "source": source,
89            "account": account_id,
90            "detail-type": detail_type,
91            "detail": detail_value,
92            "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
93            "region": region,
94            "resources": [],
95        });
96        let event_str = event_json.to_string();
97
98        for target in matching_targets {
99            let arn = &target.arn;
100            if arn.contains(":sqs:") {
101                self.delivery.send_to_sqs(arn, &event_str, &HashMap::new());
102            } else if arn.contains(":sns:") {
103                self.delivery
104                    .publish_to_sns(arn, &event_str, Some(detail_type));
105            }
106            // Lambda and other targets could be added here
107        }
108    }
109}
110
111impl EventBridgeDelivery for EventBridgeDeliveryImpl {
112    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
113        self.put_event_in_account(source, detail_type, detail, event_bus_name, None);
114    }
115
116    fn put_event_to_account(
117        &self,
118        source: &str,
119        detail_type: &str,
120        detail: &str,
121        event_bus_name: &str,
122        target_account_id: &str,
123    ) {
124        self.put_event_in_account(
125            source,
126            detail_type,
127            detail,
128            event_bus_name,
129            Some(target_account_id),
130        );
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
    use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
    use parking_lot::RwLock;
    use std::sync::Mutex;

    /// Queue ARN used by the single-account SQS tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:q";
    /// Account id used by the cross-account tests.
    const OTHER_ACCOUNT: &str = "999988887777";

    /// Test double that records every SQS and SNS delivery it receives.
    #[derive(Default)]
    struct Recorder {
        /// `(queue arn, body)` per SQS delivery.
        sqs: Mutex<Vec<(String, String)>>,
        /// `(topic arn, message, subject)` per SNS publish.
        sns: Mutex<Vec<(String, String, Option<String>)>>,
    }

    impl Recorder {
        /// Appends one SQS delivery to the log.
        fn record_sqs(&self, arn: &str, body: &str) {
            let mut calls = self.sqs.lock().unwrap();
            calls.push((arn.to_string(), body.to_string()));
        }
    }

    impl SqsDelivery for Recorder {
        fn deliver_to_queue(&self, arn: &str, body: &str, _attrs: &HashMap<String, String>) {
            self.record_sqs(arn, body);
        }

        fn deliver_to_queue_with_attrs(
            &self,
            arn: &str,
            body: &str,
            _attrs: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
            _: Option<&str>,
            _: Option<&str>,
        ) {
            self.record_sqs(arn, body);
        }
    }

    impl SnsDelivery for Recorder {
        fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
            let entry = (arn.to_string(), msg.to_string(), subject.map(String::from));
            self.sns.lock().unwrap().push(entry);
        }
    }

    /// Fresh shared state whose default account is `123456789012` in `us-east-1`.
    fn make_shared() -> SharedEventBridgeState {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        Arc::new(RwLock::new(accounts))
    }

    /// Enabled rule on the `default` bus with a single target `t1` at `target_arn`.
    fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
        let target = EbTarget {
            id: "t1".to_string(),
            arn: target_arn.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        };
        EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{name}"),
            event_bus_name: "default".to_string(),
            event_pattern: pattern.map(|s| s.to_string()),
            schedule_expression: None,
            state: "ENABLED".to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets: vec![target],
            tags: HashMap::new(),
            last_fired: None,
        }
    }

    /// Stores `rule` under `(bus name, rule name)` in the given account, or in
    /// the default account when `account` is `None`. The write lock is released
    /// before returning.
    fn install_rule(state: &SharedEventBridgeState, account: Option<&str>, rule: EventRule) {
        let mut accounts = state.write();
        let account_state = match account {
            Some(id) => accounts.get_or_create(id),
            None => accounts.default_mut(),
        };
        let key = (rule.event_bus_name.clone(), rule.name.clone());
        account_state.rules.insert(key, rule);
    }

    /// Delivery implementation wired to a recorder that captures SQS sends.
    fn sqs_fixture(state: SharedEventBridgeState) -> (Arc<Recorder>, EventBridgeDeliveryImpl) {
        let recorder = Arc::new(Recorder::default());
        let bus = DeliveryBus::new().with_sqs(recorder.clone());
        let delivery = EventBridgeDeliveryImpl::new(state, Arc::new(bus));
        (recorder, delivery)
    }

    #[test]
    fn put_event_appends_to_events_log() {
        let state = make_shared();
        let delivery = EventBridgeDeliveryImpl::new(state.clone(), Arc::new(DeliveryBus::new()));

        delivery.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");

        let accounts = state.read();
        let events = &accounts.default_ref().events;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].source, "my.source");
        assert_eq!(events[0].detail_type, "MyType");
    }

    #[test]
    fn put_event_dispatches_matching_sqs_target() {
        let state = make_shared();
        install_rule(&state, None, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_fixture(state);

        delivery.put_event("app", "Changed", r#"{"x":1}"#, "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        let (arn, body) = &calls[0];
        assert_eq!(arn.as_str(), QUEUE_ARN);
        let env: serde_json::Value = serde_json::from_str(body).unwrap();
        assert_eq!(env["detail-type"], "Changed");
        assert_eq!(env["source"], "app");
    }

    #[test]
    fn put_event_dispatches_to_sns_target() {
        let topic_arn = "arn:aws:sns:us-east-1:123456789012:t";
        let state = make_shared();
        install_rule(&state, None, make_rule("r", None, topic_arn));
        let recorder = Arc::new(Recorder::default());
        let bus = DeliveryBus::new().with_sns(recorder.clone());
        let delivery = EventBridgeDeliveryImpl::new(state, Arc::new(bus));

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        let calls = recorder.sns.lock().unwrap();
        assert_eq!(calls.len(), 1);
        let (arn, _message, subject) = &calls[0];
        assert_eq!(arn.as_str(), topic_arn);
        assert_eq!(subject.as_deref(), Some("Changed"));
    }

    #[test]
    fn put_event_skips_disabled_rule() {
        let state = make_shared();
        let mut rule = make_rule("r", None, QUEUE_ARN);
        rule.state = "DISABLED".to_string();
        install_rule(&state, None, rule);
        let (recorder, delivery) = sqs_fixture(state);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_skips_other_bus_rule() {
        let state = make_shared();
        let mut rule = make_rule("r", None, QUEUE_ARN);
        rule.event_bus_name = "custom-bus".to_string();
        install_rule(&state, None, rule);
        let (recorder, delivery) = sqs_fixture(state);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_handles_invalid_detail_json_gracefully() {
        let state = make_shared();
        install_rule(&state, None, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_fixture(state);

        delivery.put_event("app", "Type", "not-json", "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
        assert_eq!(env["detail"], serde_json::json!({}));
    }

    #[test]
    fn put_event_to_account_writes_to_target_account_bus() {
        let state = make_shared();
        let delivery = EventBridgeDeliveryImpl::new(state.clone(), Arc::new(DeliveryBus::new()));

        delivery.put_event_to_account("scheduler", "Fired", r#"{}"#, "default", OTHER_ACCOUNT);

        let accounts = state.read();
        let target = accounts
            .get(OTHER_ACCOUNT)
            .expect("target account should be created on demand");
        assert_eq!(target.events.len(), 1);
        assert_eq!(target.events[0].source, "scheduler");
        // Nothing may have been logged against the default account.
        assert!(accounts.default_ref().events.is_empty());
    }

    #[test]
    fn put_event_to_account_dispatches_to_rules_in_target_account() {
        let cross_queue_arn = "arn:aws:sqs:us-east-1:999988887777:cross-q";
        let state = make_shared();
        install_rule(
            &state,
            Some(OTHER_ACCOUNT),
            make_rule("xacct-rule", None, cross_queue_arn),
        );
        let (recorder, delivery) = sqs_fixture(state);

        delivery.put_event_to_account(
            "scheduler",
            "Cross",
            r#"{"hi":1}"#,
            "default",
            OTHER_ACCOUNT,
        );

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0.as_str(), cross_queue_arn);
    }
}