1use std::sync::Arc;
2
3use chrono::Utc;
4
5use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
6use fakecloud_lambda::runtime::ContainerRuntime;
7use fakecloud_lambda::SharedLambdaState;
8use fakecloud_logs::SharedLogsState;
9
10use crate::service::{dispatch_event_target, matches_pattern, EventDispatchContext};
11use crate::state::{PutEvent, SharedEventBridgeState};
12
13pub struct EventBridgeDeliveryImpl {
16 state: SharedEventBridgeState,
17 delivery: Arc<DeliveryBus>,
18 lambda_state: Option<SharedLambdaState>,
19 logs_state: Option<SharedLogsState>,
20 container_runtime: Option<Arc<ContainerRuntime>>,
21}
22
23impl EventBridgeDeliveryImpl {
24 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
25 Self {
26 state,
27 delivery,
28 lambda_state: None,
29 logs_state: None,
30 container_runtime: None,
31 }
32 }
33
34 pub fn with_lambda(mut self, lambda_state: SharedLambdaState) -> Self {
35 self.lambda_state = Some(lambda_state);
36 self
37 }
38
39 pub fn with_logs(mut self, logs_state: SharedLogsState) -> Self {
40 self.logs_state = Some(logs_state);
41 self
42 }
43
44 pub fn with_runtime(mut self, runtime: Arc<ContainerRuntime>) -> Self {
45 self.container_runtime = Some(runtime);
46 self
47 }
48}
49
50impl EventBridgeDeliveryImpl {
51 fn put_event_in_account(
52 &self,
53 source: &str,
54 detail_type: &str,
55 detail: &str,
56 event_bus_name: &str,
57 target_account_id: Option<&str>,
58 ) {
59 let event_id = uuid::Uuid::new_v4().to_string();
60 let now = Utc::now();
61
62 let event = PutEvent {
63 event_id: event_id.clone(),
64 source: source.to_string(),
65 detail_type: detail_type.to_string(),
66 detail: detail.to_string(),
67 event_bus_name: event_bus_name.to_string(),
68 time: now,
69 resources: Vec::new(),
70 };
71
72 let mut accounts = self.state.write();
73 let state = match target_account_id {
74 Some(account_id) if !account_id.is_empty() => accounts.get_or_create(account_id),
75 _ => accounts.default_mut(),
76 };
77 state.events.push(event);
78
79 let account_id = state.account_id.clone();
81 let region = state.region.clone();
82 let matching_targets: Vec<_> = state
83 .rules
84 .values()
85 .filter(|r| {
86 r.event_bus_name == event_bus_name
87 && r.state == "ENABLED"
88 && matches_pattern(
89 r.event_pattern.as_deref(),
90 source,
91 detail_type,
92 detail,
93 &account_id,
94 ®ion,
95 &[],
96 )
97 })
98 .flat_map(|r| r.targets.clone())
99 .collect();
100
101 drop(accounts);
103
104 if matching_targets.is_empty() {
105 return;
106 }
107
108 let detail_value: serde_json::Value =
110 serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
111 let event_json = serde_json::json!({
112 "version": "0",
113 "id": event_id,
114 "source": source,
115 "account": account_id,
116 "detail-type": detail_type,
117 "detail": detail_value,
118 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
119 "region": region,
120 "resources": [],
121 });
122 let event_str = event_json.to_string();
123
124 let _ = event_str;
125 let resolved_account = if let Some(acct) = target_account_id {
126 acct.to_string()
127 } else {
128 account_id.clone()
129 };
130 let ctx = EventDispatchContext {
131 state: &self.state,
132 delivery: &self.delivery,
133 lambda_state: self.lambda_state.as_ref(),
134 logs_state: self.logs_state.as_ref(),
135 container_runtime: &self.container_runtime,
136 account_id: &resolved_account,
137 region: ®ion,
138 };
139 for target in matching_targets {
140 dispatch_event_target(&ctx, &target, &event_json, &event_id, detail_type);
141 }
142 }
143}
144
145impl EventBridgeDelivery for EventBridgeDeliveryImpl {
146 fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
147 self.put_event_in_account(source, detail_type, detail, event_bus_name, None);
148 }
149
150 fn put_event_to_account(
151 &self,
152 source: &str,
153 detail_type: &str,
154 detail: &str,
155 event_bus_name: &str,
156 target_account_id: &str,
157 ) {
158 self.put_event_in_account(
159 source,
160 detail_type,
161 detail,
162 event_bus_name,
163 Some(target_account_id),
164 );
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
172 use fakecloud_aws::arn::Arn;
173 use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
174 use parking_lot::RwLock;
175 use std::collections::{BTreeMap, HashMap};
176 use std::sync::Mutex;
177
178 #[derive(Default)]
179 struct Recorder {
180 sqs: Mutex<Vec<(String, String)>>,
181 sns: Mutex<Vec<(String, String, Option<String>)>>,
182 }
183
184 impl SqsDelivery for Recorder {
185 fn deliver_to_queue(&self, arn: &str, body: &str, _: &HashMap<String, String>) {
186 self.sqs
187 .lock()
188 .unwrap()
189 .push((arn.to_string(), body.to_string()));
190 }
191 fn deliver_to_queue_with_attrs(
192 &self,
193 arn: &str,
194 body: &str,
195 _: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
196 _: Option<&str>,
197 _: Option<&str>,
198 ) {
199 self.sqs
200 .lock()
201 .unwrap()
202 .push((arn.to_string(), body.to_string()));
203 }
204 }
205
206 impl SnsDelivery for Recorder {
207 fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
208 self.sns.lock().unwrap().push((
209 arn.to_string(),
210 msg.to_string(),
211 subject.map(|s| s.to_string()),
212 ));
213 }
214 }
215
216 fn make_shared() -> SharedEventBridgeState {
217 Arc::new(RwLock::new(
218 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
219 ))
220 }
221
222 fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
223 EventRule {
224 name: name.to_string(),
225 arn: Arn::new(
226 "events",
227 "us-east-1",
228 "123456789012",
229 &format!("rule/{name}"),
230 )
231 .to_string(),
232 event_bus_name: "default".to_string(),
233 event_pattern: pattern.map(|s| s.to_string()),
234 schedule_expression: None,
235 state: "ENABLED".to_string(),
236 description: None,
237 role_arn: None,
238 managed_by: None,
239 created_by: None,
240 targets: vec![EbTarget {
241 id: "t1".to_string(),
242 arn: target_arn.to_string(),
243 input: None,
244 input_path: None,
245 input_transformer: None,
246 sqs_parameters: None,
247 ..Default::default()
248 }],
249 tags: BTreeMap::new(),
250 last_fired: None,
251 }
252 }
253
254 #[test]
255 fn put_event_appends_to_events_log() {
256 let state = make_shared();
257 let bus = Arc::new(DeliveryBus::new());
258 let delivery = EventBridgeDeliveryImpl::new(state.clone(), bus);
259 delivery.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");
260 let guard = state.read();
261 let default = guard.default_ref();
262 assert_eq!(default.events.len(), 1);
263 assert_eq!(default.events[0].source, "my.source");
264 assert_eq!(default.events[0].detail_type, "MyType");
265 }
266
267 #[test]
268 fn put_event_dispatches_matching_sqs_target() {
269 let state = make_shared();
270 let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
271 {
272 let mut s_accounts = state.write();
273 let s = s_accounts.default_mut();
274 let rule = make_rule("r", None, &q_arn);
275 s.rules
276 .insert(("default".to_string(), "r".to_string()), rule);
277 }
278 let recorder = Arc::new(Recorder::default());
279 let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
280 let delivery = EventBridgeDeliveryImpl::new(state, bus);
281 delivery.put_event("app", "Changed", r#"{"x":1}"#, "default");
282 let calls = recorder.sqs.lock().unwrap();
283 assert_eq!(calls.len(), 1);
284 assert_eq!(calls[0].0, q_arn);
285 let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
286 assert_eq!(env["detail-type"], "Changed");
287 assert_eq!(env["source"], "app");
288 }
289
290 #[test]
291 fn put_event_dispatches_to_sns_target() {
292 let state = make_shared();
293 let topic_arn = "arn:aws:sns:us-east-1:123456789012:t".to_string();
294 {
295 let mut s_accounts = state.write();
296 let s = s_accounts.default_mut();
297 let rule = make_rule("r", None, &topic_arn);
298 s.rules
299 .insert(("default".to_string(), "r".to_string()), rule);
300 }
301 let recorder = Arc::new(Recorder::default());
302 let bus = Arc::new(DeliveryBus::new().with_sns(recorder.clone()));
303 let delivery = EventBridgeDeliveryImpl::new(state, bus);
304 delivery.put_event("app", "Changed", r#"{}"#, "default");
305 let calls = recorder.sns.lock().unwrap();
306 assert_eq!(calls.len(), 1);
307 assert_eq!(calls[0].0, topic_arn);
308 assert_eq!(calls[0].2.as_deref(), Some("Changed"));
309 }
310
311 #[test]
312 fn put_event_skips_disabled_rule() {
313 let state = make_shared();
314 let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
315 {
316 let mut s_accounts = state.write();
317 let s = s_accounts.default_mut();
318 let mut rule = make_rule("r", None, &q_arn);
319 rule.state = "DISABLED".to_string();
320 s.rules
321 .insert(("default".to_string(), "r".to_string()), rule);
322 }
323 let recorder = Arc::new(Recorder::default());
324 let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
325 let delivery = EventBridgeDeliveryImpl::new(state, bus);
326 delivery.put_event("app", "Changed", r#"{}"#, "default");
327 assert!(recorder.sqs.lock().unwrap().is_empty());
328 }
329
330 #[test]
331 fn put_event_skips_other_bus_rule() {
332 let state = make_shared();
333 let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
334 {
335 let mut s_accounts = state.write();
336 let s = s_accounts.default_mut();
337 let mut rule = make_rule("r", None, &q_arn);
338 rule.event_bus_name = "custom-bus".to_string();
339 s.rules
340 .insert(("custom-bus".to_string(), "r".to_string()), rule);
341 }
342 let recorder = Arc::new(Recorder::default());
343 let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
344 let delivery = EventBridgeDeliveryImpl::new(state, bus);
345 delivery.put_event("app", "Changed", r#"{}"#, "default");
346 assert!(recorder.sqs.lock().unwrap().is_empty());
347 }
348
349 #[test]
350 fn put_event_handles_invalid_detail_json_gracefully() {
351 let state = make_shared();
352 let q_arn = "arn:aws:sqs:us-east-1:123456789012:q".to_string();
353 {
354 let mut s_accounts = state.write();
355 let s = s_accounts.default_mut();
356 let rule = make_rule("r", None, &q_arn);
357 s.rules
358 .insert(("default".to_string(), "r".to_string()), rule);
359 }
360 let recorder = Arc::new(Recorder::default());
361 let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
362 let delivery = EventBridgeDeliveryImpl::new(state, bus);
363 delivery.put_event("app", "Type", "not-json", "default");
364 let calls = recorder.sqs.lock().unwrap();
365 assert_eq!(calls.len(), 1);
366 let env: serde_json::Value = serde_json::from_str(&calls[0].1).unwrap();
367 assert_eq!(env["detail"], serde_json::json!({}));
368 }
369
370 #[test]
371 fn put_event_to_account_writes_to_target_account_bus() {
372 let state = make_shared();
373 let bus = Arc::new(DeliveryBus::new());
374 let delivery = EventBridgeDeliveryImpl::new(state.clone(), bus);
375 delivery.put_event_to_account("scheduler", "Fired", r#"{}"#, "default", "999988887777");
376
377 let guard = state.read();
378 let target = guard
379 .get("999988887777")
380 .expect("target account should be created on demand");
381 assert_eq!(target.events.len(), 1);
382 assert_eq!(target.events[0].source, "scheduler");
383 assert!(guard.default_ref().events.is_empty());
385 }
386
387 #[test]
388 fn put_event_to_account_dispatches_to_rules_in_target_account() {
389 let state = make_shared();
390 let q_arn = "arn:aws:sqs:us-east-1:999988887777:cross-q".to_string();
391 {
392 let mut s_accounts = state.write();
393 let s = s_accounts.get_or_create("999988887777");
394 let rule = make_rule("xacct-rule", None, &q_arn);
395 s.rules
396 .insert(("default".to_string(), "xacct-rule".to_string()), rule);
397 }
398 let recorder = Arc::new(Recorder::default());
399 let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
400 let delivery = EventBridgeDeliveryImpl::new(state, bus);
401 delivery.put_event_to_account(
402 "scheduler",
403 "Cross",
404 r#"{"hi":1}"#,
405 "default",
406 "999988887777",
407 );
408 let calls = recorder.sqs.lock().unwrap();
409 assert_eq!(calls.len(), 1);
410 assert_eq!(calls[0].0, q_arn);
411 }
412}