Skip to main content

fakecloud_eventbridge/
delivery.rs

1use std::sync::Arc;
2
3use chrono::Utc;
4
5use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
6use fakecloud_lambda::runtime::ContainerRuntime;
7use fakecloud_lambda::SharedLambdaState;
8use fakecloud_logs::SharedLogsState;
9
10use crate::service::{dispatch_event_target, matches_pattern, EventDispatchContext};
11use crate::state::{PutEvent, SharedEventBridgeState};
12
13/// Implements EventBridgeDelivery so other services (SES) can put events
14/// on an EventBridge bus with full rule matching and target delivery.
15pub struct EventBridgeDeliveryImpl {
16    state: SharedEventBridgeState,
17    delivery: Arc<DeliveryBus>,
18    lambda_state: Option<SharedLambdaState>,
19    logs_state: Option<SharedLogsState>,
20    container_runtime: Option<Arc<ContainerRuntime>>,
21}
22
23impl EventBridgeDeliveryImpl {
24    pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
25        Self {
26            state,
27            delivery,
28            lambda_state: None,
29            logs_state: None,
30            container_runtime: None,
31        }
32    }
33
34    pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
35        self.lambda_state = Some(lambda_state);
36        self
37    }
38
39    pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
40        self.logs_state = Some(logs_state);
41        self
42    }
43
44    pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
45        self.container_runtime = Some(runtime);
46        self
47    }
48}
49
50impl EventBridgeDeliveryImpl {
51    fn put_event_in_account(
52        &self,
53        source: &str,
54        detail_type: &str,
55        detail: &str,
56        event_bus_name: &str,
57        target_account_id: Option<&str>,
58    ) {
59        let event_id = uuid::Uuid::new_v4().to_string();
60        let now = Utc::now();
61
62        let event = PutEvent {
63            event_id: event_id.clone(),
64            source: source.to_string(),
65            detail_type: detail_type.to_string(),
66            detail: detail.to_string(),
67            event_bus_name: event_bus_name.to_string(),
68            time: now,
69            resources: Vec::new(),
70        };
71
72        let mut accounts = self.state.write();
73        let state = match target_account_id {
74            Some(account_id) if !account_id.is_empty() => accounts.get_or_create(account_id),
75            _ => accounts.default_mut(),
76        };
77        state.events.push(event);
78
79        // Find matching rules and their targets
80        let account_id = state.account_id.clone();
81        let region = state.region.clone();
82        let matching_targets: Vec<_> = state
83            .rules
84            .values()
85            .filter(|r| {
86                r.event_bus_name == event_bus_name
87                    && r.state == "ENABLED"
88                    && matches_pattern(
89                        r.event_pattern.as_deref(),
90                        source,
91                        detail_type,
92                        detail,
93                        &account_id,
94                        &region,
95                        &[],
96                    )
97            })
98            .flat_map(|r| r.targets.clone())
99            .collect();
100
101        // Drop the lock before delivering
102        drop(accounts);
103
104        if matching_targets.is_empty() {
105            return;
106        }
107
108        // Build the EventBridge event envelope
109        let detail_value: serde_json::Value =
110            serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
111        let event_json = serde_json::json!({
112            "version": "0",
113            "id": event_id,
114            "source": source,
115            "account": account_id,
116            "detail-type": detail_type,
117            "detail": detail_value,
118            "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
119            "region": region,
120            "resources": [],
121        });
122        let event_str = event_json.to_string();
123
124        let _ = event_str;
125        let resolved_account = if let Some(acct) = target_account_id {
126            acct.to_string()
127        } else {
128            account_id.clone()
129        };
130        let ctx = EventDispatchContext {
131            state: &self.state,
132            delivery: &self.delivery,
133            lambda_state: self.lambda_state.as_ref(),
134            logs_state: self.logs_state.as_ref(),
135            container_runtime: &self.container_runtime,
136            account_id: &resolved_account,
137            region: &region,
138        };
139        for target in matching_targets {
140            dispatch_event_target(&ctx, &target, &event_json, &event_id, detail_type);
141        }
142    }
143}
144
145impl EventBridgeDelivery for EventBridgeDeliveryImpl {
146    fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
147        self.put_event_in_account(source, detail_type, detail, event_bus_name, None);
148    }
149
150    fn put_event_to_account(
151        &self,
152        source: &str,
153        detail_type: &str,
154        detail: &str,
155        event_bus_name: &str,
156        target_account_id: &str,
157    ) {
158        self.put_event_in_account(
159            source,
160            detail_type,
161            detail,
162            event_bus_name,
163            Some(target_account_id),
164        );
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171    use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
172    use fakecloud_aws::arn::Arn;
173    use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
174    use parking_lot::RwLock;
175    use std::collections::{BTreeMap, HashMap};
176    use std::sync::Mutex;
177
178    #[derive(Default)]
179    struct Recorder {
180        sqs: Mutex<Vec<(String, String)>>,
181        sns: Mutex<Vec<(String, String, Option<String>)>>,
182    }
183
184    impl SqsDelivery for Recorder {
185        fn deliver_to_queue(&self, arn: &str, body: &str, _: &HashMap<String, String>) {
186            self.sqs
187                .lock()
188                .unwrap()
189                .push((arn.to_string(), body.to_string()));
190        }
191        fn deliver_to_queue_with_attrs(
192            &self,
193            arn: &str,
194            body: &str,
195            _: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
196            _: Option<&str>,
197            _: Option<&str>,
198        ) {
199            self.sqs
200                .lock()
201                .unwrap()
202                .push((arn.to_string(), body.to_string()));
203        }
204    }
205
206    impl SnsDelivery for Recorder {
207        fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
208            self.sns.lock().unwrap().push((
209                arn.to_string(),
210                msg.to_string(),
211                subject.map(|s| s.to_string()),
212            ));
213        }
214    }
215
216    fn make_shared() -> SharedEventBridgeState {
217        Arc::new(RwLock::new(
218            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
219        ))
220    }
221
222    fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
223        EventRule {
224            name: name.to_string(),
225            arn: Arn::new(
226                "events",
227                "us-east-1",
228                "123456789012",
229                &format!("rule/{name}"),
230            )
231            .to_string(),
232            event_bus_name: "default".to_string(),
233            event_pattern: pattern.map(|s| s.to_string()),
234            schedule_expression: None,
235            state: "ENABLED".to_string(),
236            description: None,
237            role_arn: None,
238            managed_by: None,
239            created_by: None,
240            targets: vec![EbTarget {
241                id: "t1".to_string(),
242                arn: target_arn.to_string(),
243                input: None,
244                input_path: None,
245                input_transformer: None,
246                sqs_parameters: None,
247                ..Default::default()
248            }],
249            tags: BTreeMap::new(),
250            last_fired: None,
251        }
252    }
253
254    #[test]
255    fn put_event_appends_to_events_log() {
256        let state = make_shared();
257        let bus = Arc::new(DeliveryBus::new());
258        let delivery = EventBridgeDeliveryImpl::new(state.clone(), bus);
259        delivery.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");
260        let guard = state.read();
261        let default = guard.default_ref();
262        assert_eq!(default.events.len(), 1);
263        assert_eq!(default.events[0].source, "my.source");
264        assert_eq!(default.events[0].detail_type, "MyType");
265    }
266
267    #[test]
268    fn put_event_dispatches_matching_sqs_target() {
269        let state = make_shared();
270        let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
271        {
272            let mut s_accounts = state.write();
273            let s = s_accounts.default_mut();
274            let rule = make_rule("r", None, &q_arn);
275            s.rules
276                .insert(("default".to_string(), "r".to_string()), rule);
277        }
278        let recorder = Arc::new(Recorder::default());
279        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
280        let delivery = EventBridgeDeliveryImpl::new(state, bus);
281        delivery.put_event("app", "Changed", r#"{"x":1}"#, "default");
282        let calls = recorder.sqs.lock().unwrap();
283        assert_eq!(calls.len(), 1);
284        assert_eq!(calls[0].0, q_arn);
285        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
286        assert_eq!(env["detail-type"], "Changed");
287        assert_eq!(env["source"], "app");
288    }
289
290    #[test]
291    fn put_event_dispatches_to_sns_target() {
292        let state = make_shared();
293        let topic_arn = "arn:aws:sns:us-east-1:123456789012:t".to_string();
294        {
295            let mut s_accounts = state.write();
296            let s = s_accounts.default_mut();
297            let rule = make_rule("r", None, &topic_arn);
298            s.rules
299                .insert(("default".to_string(), "r".to_string()), rule);
300        }
301        let recorder = Arc::new(Recorder::default());
302        let bus = Arc::new(DeliveryBus::new().with_sns(recorder.clone()));
303        let delivery = EventBridgeDeliveryImpl::new(state, bus);
304        delivery.put_event("app", "Changed", r#"{}"#, "default");
305        let calls = recorder.sns.lock().unwrap();
306        assert_eq!(calls.len(), 1);
307        assert_eq!(calls[0].0, topic_arn);
308        assert_eq!(calls[0].2.as_deref(), Some("Changed"));
309    }
310
311    #[test]
312    fn put_event_skips_disabled_rule() {
313        let state = make_shared();
314        let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
315        {
316            let mut s_accounts = state.write();
317            let s = s_accounts.default_mut();
318            let mut rule = make_rule("r", None, &q_arn);
319            rule.state = "DISABLED".to_string();
320            s.rules
321                .insert(("default".to_string(), "r".to_string()), rule);
322        }
323        let recorder = Arc::new(Recorder::default());
324        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
325        let delivery = EventBridgeDeliveryImpl::new(state, bus);
326        delivery.put_event("app", "Changed", r#"{}"#, "default");
327        assert!(recorder.sqs.lock().unwrap().is_empty());
328    }
329
330    #[test]
331    fn put_event_skips_other_bus_rule() {
332        let state = make_shared();
333        let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
334        {
335            let mut s_accounts = state.write();
336            let s = s_accounts.default_mut();
337            let mut rule = make_rule("r", None, &q_arn);
338            rule.event_bus_name = "custom-bus".to_string();
339            s.rules
340                .insert(("custom-bus".to_string(), "r".to_string()), rule);
341        }
342        let recorder = Arc::new(Recorder::default());
343        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
344        let delivery = EventBridgeDeliveryImpl::new(state, bus);
345        delivery.put_event("app", "Changed", r#"{}"#, "default");
346        assert!(recorder.sqs.lock().unwrap().is_empty());
347    }
348
349    #[test]
350    fn put_event_handles_invalid_detail_json_gracefully() {
351        let state = make_shared();
352        let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
353        {
354            let mut s_accounts = state.write();
355            let s = s_accounts.default_mut();
356            let rule = make_rule("r", None, &q_arn);
357            s.rules
358                .insert(("default".to_string(), "r".to_string()), rule);
359        }
360        let recorder = Arc::new(Recorder::default());
361        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
362        let delivery = EventBridgeDeliveryImpl::new(state, bus);
363        delivery.put_event("app", "Type", "not-json", "default");
364        let calls = recorder.sqs.lock().unwrap();
365        assert_eq!(calls.len(), 1);
366        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
367        assert_eq!(env["detail"], serde_json::json!({}));
368    }
369
370    #[test]
371    fn put_event_to_account_writes_to_target_account_bus() {
372        let state = make_shared();
373        let bus = Arc::new(DeliveryBus::new());
374        let delivery = EventBridgeDeliveryImpl::new(state.clone(), bus);
375        delivery.put_event_to_account("scheduler", "Fired", r#"{}"#, "default", "999988887777");
376
377        let guard = state.read();
378        let target = guard
379            .get("999988887777")
380            .expect("target account should be created on demand");
381        assert_eq!(target.events.len(), 1);
382        assert_eq!(target.events[0].source, "scheduler");
383        // The default account's bus should be untouched.
384        assert!(guard.default_ref().events.is_empty());
385    }
386
387    #[test]
388    fn put_event_to_account_dispatches_to_rules_in_target_account() {
389        let state = make_shared();
390        let q_arn = "arn:aws:sqs:us-east-1:999988887777:cross-q".to_string();
391        {
392            let mut s_accounts = state.write();
393            let s = s_accounts.get_or_create("999988887777");
394            let rule = make_rule("xacct-rule", None, &q_arn);
395            s.rules
396                .insert(("default".to_string(), "xacct-rule".to_string()), rule);
397        }
398        let recorder = Arc::new(Recorder::default());
399        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
400        let delivery = EventBridgeDeliveryImpl::new(state, bus);
401        delivery.put_event_to_account(
402            "scheduler",
403            "Cross",
404            r#"{"hi":1}"#,
405            "default",
406            "999988887777",
407        );
408        let calls = recorder.sqs.lock().unwrap();
409        assert_eq!(calls.len(), 1);
410        assert_eq!(calls[0].0, q_arn);
411    }
412}