1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
7
8use crate::service::matches_pattern;
9use crate::state::{PutEvent, SharedEventBridgeState};
10
/// Adapter that records events in the shared EventBridge state and forwards
/// them to the targets of matching rules through a [`DeliveryBus`].
pub struct EventBridgeDeliveryImpl {
    /// Lock-guarded, per-account EventBridge state (event log and rules).
    state: SharedEventBridgeState,
    /// Bus used to hand matched events to SQS queues and SNS topics.
    delivery: Arc<DeliveryBus>,
}
17
18impl EventBridgeDeliveryImpl {
19 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
20 Self { state, delivery }
21 }
22}
23
24impl EventBridgeDeliveryImpl {
25 fn put_event_in_account(
26 &self,
27 source: &str,
28 detail_type: &str,
29 detail: &str,
30 event_bus_name: &str,
31 target_account_id: Option<&str>,
32 ) {
33 let event_id = uuid::Uuid::new_v4().to_string();
34 let now = Utc::now();
35
36 let event = PutEvent {
37 event_id: event_id.clone(),
38 source: source.to_string(),
39 detail_type: detail_type.to_string(),
40 detail: detail.to_string(),
41 event_bus_name: event_bus_name.to_string(),
42 time: now,
43 resources: Vec::new(),
44 };
45
46 let mut accounts = self.state.write();
47 let state = match target_account_id {
48 Some(account_id) if !account_id.is_empty() => accounts.get_or_create(account_id),
49 _ => accounts.default_mut(),
50 };
51 state.events.push(event);
52
53 let account_id = state.account_id.clone();
55 let region = state.region.clone();
56 let matching_targets: Vec<_> = state
57 .rules
58 .values()
59 .filter(|r| {
60 r.event_bus_name == event_bus_name
61 && r.state == "ENABLED"
62 && matches_pattern(
63 r.event_pattern.as_deref(),
64 source,
65 detail_type,
66 detail,
67 &account_id,
68 ®ion,
69 &[],
70 )
71 })
72 .flat_map(|r| r.targets.clone())
73 .collect();
74
75 drop(accounts);
77
78 if matching_targets.is_empty() {
79 return;
80 }
81
82 let detail_value: serde_json::Value =
84 serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
85 let event_json = serde_json::json!({
86 "version": "0",
87 "id": event_id,
88 "source": source,
89 "account": account_id,
90 "detail-type": detail_type,
91 "detail": detail_value,
92 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
93 "region": region,
94 "resources": [],
95 });
96 let event_str = event_json.to_string();
97
98 for target in matching_targets {
99 let arn = &target.arn;
100 if arn.contains(":sqs:") {
101 self.delivery.send_to_sqs(arn, &event_str, &HashMap::new());
102 } else if arn.contains(":sns:") {
103 self.delivery
104 .publish_to_sns(arn, &event_str, Some(detail_type));
105 }
106 }
108 }
109}
110
111impl EventBridgeDelivery for EventBridgeDeliveryImpl {
112 fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
113 self.put_event_in_account(source, detail_type, detail, event_bus_name, None);
114 }
115
116 fn put_event_to_account(
117 &self,
118 source: &str,
119 detail_type: &str,
120 detail: &str,
121 event_bus_name: &str,
122 target_account_id: &str,
123 ) {
124 self.put_event_in_account(
125 source,
126 detail_type,
127 detail,
128 event_bus_name,
129 Some(target_account_id),
130 );
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
    use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
    use parking_lot::RwLock;
    use std::sync::Mutex;

    /// Queue ARN in the default account used by the SQS-target tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:q";
    /// Account id used by the cross-account tests.
    const OTHER_ACCOUNT: &str = "999988887777";

    /// Test double that records every SQS and SNS delivery it receives.
    #[derive(Default)]
    struct Recorder {
        sqs: Mutex<Vec<(String, String)>>,
        sns: Mutex<Vec<(String, String, Option<String>)>>,
    }

    impl Recorder {
        /// Appends one `(arn, body)` pair to the SQS log.
        fn record_sqs(&self, arn: &str, body: &str) {
            let mut log = self.sqs.lock().unwrap();
            log.push((arn.to_owned(), body.to_owned()));
        }
    }

    impl SqsDelivery for Recorder {
        fn deliver_to_queue(&self, arn: &str, body: &str, _: &HashMap<String, String>) {
            self.record_sqs(arn, body);
        }

        fn deliver_to_queue_with_attrs(
            &self,
            arn: &str,
            body: &str,
            _: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
            _: Option<&str>,
            _: Option<&str>,
        ) {
            self.record_sqs(arn, body);
        }
    }

    impl SnsDelivery for Recorder {
        fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
            let entry = (arn.to_owned(), msg.to_owned(), subject.map(str::to_owned));
            self.sns.lock().unwrap().push(entry);
        }
    }

    /// Fresh shared state whose default account is `123456789012` in `us-east-1`.
    fn make_shared() -> SharedEventBridgeState {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        Arc::new(RwLock::new(accounts))
    }

    /// Enabled rule on the `default` bus with a single target pointing at `target_arn`.
    fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
        let target = EbTarget {
            id: String::from("t1"),
            arn: String::from(target_arn),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        };
        EventRule {
            name: String::from(name),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{name}"),
            event_bus_name: String::from("default"),
            event_pattern: pattern.map(String::from),
            schedule_expression: None,
            state: String::from("ENABLED"),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets: vec![target],
            tags: HashMap::new(),
            last_fired: None,
        }
    }

    /// Stores `rule` under its `(bus name, rule name)` key in the chosen
    /// account; `None` selects the default account.
    fn add_rule(shared: &SharedEventBridgeState, account_id: Option<&str>, rule: EventRule) {
        let mut accounts = shared.write();
        let account = match account_id {
            Some(id) => accounts.get_or_create(id),
            None => accounts.default_mut(),
        };
        let key = (rule.event_bus_name.clone(), rule.name.clone());
        account.rules.insert(key, rule);
    }

    /// Builds a delivery adapter whose bus records SQS deliveries.
    fn sqs_delivery(shared: SharedEventBridgeState) -> (Arc<Recorder>, EventBridgeDeliveryImpl) {
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
        (recorder, EventBridgeDeliveryImpl::new(shared, bus))
    }

    #[test]
    fn put_event_appends_to_events_log() {
        let shared = make_shared();
        let delivery = EventBridgeDeliveryImpl::new(shared.clone(), Arc::new(DeliveryBus::new()));

        delivery.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");

        let accounts = shared.read();
        let account = accounts.default_ref();
        assert_eq!(account.events.len(), 1);
        assert_eq!(account.events[0].source, "my.source");
        assert_eq!(account.events[0].detail_type, "MyType");
    }

    #[test]
    fn put_event_dispatches_matching_sqs_target() {
        let shared = make_shared();
        add_rule(&shared, None, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_delivery(shared);

        delivery.put_event("app", "Changed", r#"{"x":1}"#, "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, QUEUE_ARN);
        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
        assert_eq!(env["detail-type"], "Changed");
        assert_eq!(env["source"], "app");
    }

    #[test]
    fn put_event_dispatches_to_sns_target() {
        let topic_arn = "arn:aws:sns:us-east-1:123456789012:t";
        let shared = make_shared();
        add_rule(&shared, None, make_rule("r", None, topic_arn));
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sns(recorder.clone()));
        let delivery = EventBridgeDeliveryImpl::new(shared, bus);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        let calls = recorder.sns.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, topic_arn);
        assert_eq!(calls[0].2.as_deref(), Some("Changed"));
    }

    #[test]
    fn put_event_skips_disabled_rule() {
        let shared = make_shared();
        let mut rule = make_rule("r", None, QUEUE_ARN);
        rule.state = String::from("DISABLED");
        add_rule(&shared, None, rule);
        let (recorder, delivery) = sqs_delivery(shared);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_skips_other_bus_rule() {
        let shared = make_shared();
        let mut rule = make_rule("r", None, QUEUE_ARN);
        rule.event_bus_name = String::from("custom-bus");
        add_rule(&shared, None, rule);
        let (recorder, delivery) = sqs_delivery(shared);

        delivery.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.sqs.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_handles_invalid_detail_json_gracefully() {
        let shared = make_shared();
        add_rule(&shared, None, make_rule("r", None, QUEUE_ARN));
        let (recorder, delivery) = sqs_delivery(shared);

        delivery.put_event("app", "Type", "not-json", "default");

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
        assert_eq!(env["detail"], serde_json::json!({}));
    }

    #[test]
    fn put_event_to_account_writes_to_target_account_bus() {
        let shared = make_shared();
        let delivery = EventBridgeDeliveryImpl::new(shared.clone(), Arc::new(DeliveryBus::new()));

        delivery.put_event_to_account("scheduler", "Fired", r#"{}"#, "default", OTHER_ACCOUNT);

        let accounts = shared.read();
        let target = accounts
            .get(OTHER_ACCOUNT)
            .expect("target account should be created on demand");
        assert_eq!(target.events.len(), 1);
        assert_eq!(target.events[0].source, "scheduler");
        assert!(accounts.default_ref().events.is_empty());
    }

    #[test]
    fn put_event_to_account_dispatches_to_rules_in_target_account() {
        let cross_queue_arn = "arn:aws:sqs:us-east-1:999988887777:cross-q";
        let shared = make_shared();
        add_rule(
            &shared,
            Some(OTHER_ACCOUNT),
            make_rule("xacct-rule", None, cross_queue_arn),
        );
        let (recorder, delivery) = sqs_delivery(shared);

        delivery.put_event_to_account(
            "scheduler",
            "Cross",
            r#"{"hi":1}"#,
            "default",
            OTHER_ACCOUNT,
        );

        let calls = recorder.sqs.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, cross_queue_arn);
    }
}