1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5
6use fakecloud_core::delivery::{DeliveryBus, EventBridgeDelivery};
7
8use crate::service::matches_pattern;
9use crate::state::{PutEvent, SharedEventBridgeState};
10
11pub struct EventBridgeDeliveryImpl {
14 state: SharedEventBridgeState,
15 delivery: Arc<DeliveryBus>,
16}
17
18impl EventBridgeDeliveryImpl {
19 pub fn new(state: SharedEventBridgeState, delivery: Arc<DeliveryBus>) -> Self {
20 Self { state, delivery }
21 }
22}
23
24impl EventBridgeDelivery for EventBridgeDeliveryImpl {
25 fn put_event(&self, source: &str, detail_type: &str, detail: &str, event_bus_name: &str) {
26 let event_id = uuid::Uuid::new_v4().to_string();
27 let now = Utc::now();
28
29 let event = PutEvent {
30 event_id: event_id.clone(),
31 source: source.to_string(),
32 detail_type: detail_type.to_string(),
33 detail: detail.to_string(),
34 event_bus_name: event_bus_name.to_string(),
35 time: now,
36 resources: Vec::new(),
37 };
38
39 let mut accounts = self.state.write();
40 let state = accounts.default_mut();
41 state.events.push(event);
42
43 let account_id = state.account_id.clone();
45 let region = state.region.clone();
46 let matching_targets: Vec<_> = state
47 .rules
48 .values()
49 .filter(|r| {
50 r.event_bus_name == event_bus_name
51 && r.state == "ENABLED"
52 && matches_pattern(
53 r.event_pattern.as_deref(),
54 source,
55 detail_type,
56 detail,
57 &account_id,
58 ®ion,
59 &[],
60 )
61 })
62 .flat_map(|r| r.targets.clone())
63 .collect();
64
65 drop(accounts);
67
68 if matching_targets.is_empty() {
69 return;
70 }
71
72 let detail_value: serde_json::Value =
74 serde_json::from_str(detail).unwrap_or(serde_json::json!({}));
75 let event_json = serde_json::json!({
76 "version": "0",
77 "id": event_id,
78 "source": source,
79 "account": account_id,
80 "detail-type": detail_type,
81 "detail": detail_value,
82 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
83 "region": region,
84 "resources": [],
85 });
86 let event_str = event_json.to_string();
87
88 for target in matching_targets {
89 let arn = &target.arn;
90 if arn.contains(":sqs:") {
91 self.delivery.send_to_sqs(arn, &event_str, &HashMap::new());
92 } else if arn.contains(":sns:") {
93 self.delivery
94 .publish_to_sns(arn, &event_str, Some(detail_type));
95 }
96 }
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventRule, EventTarget as EbTarget, SharedEventBridgeState};
    use fakecloud_core::delivery::{SnsDelivery, SqsDelivery};
    use parking_lot::RwLock;
    use std::sync::Mutex;

    /// ARN of the SQS queue used as a rule target throughout these tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:q";

    /// Test double that remembers every SQS and SNS delivery it receives.
    #[derive(Default)]
    struct Recorder {
        /// `(queue arn, body)` for each SQS delivery.
        queue_sends: Mutex<Vec<(String, String)>>,
        /// `(topic arn, message, subject)` for each SNS publish.
        topic_publishes: Mutex<Vec<(String, String, Option<String>)>>,
    }

    impl Recorder {
        /// Stores one SQS delivery; shared by both `SqsDelivery` methods.
        fn note_queue_send(&self, arn: &str, body: &str) {
            let mut sends = self.queue_sends.lock().unwrap();
            sends.push((arn.to_string(), body.to_string()));
        }
    }

    impl SqsDelivery for Recorder {
        fn deliver_to_queue(&self, arn: &str, body: &str, _attrs: &HashMap<String, String>) {
            self.note_queue_send(arn, body);
        }

        fn deliver_to_queue_with_attrs(
            &self,
            arn: &str,
            body: &str,
            _attrs: &HashMap<String, fakecloud_core::delivery::SqsMessageAttribute>,
            _: Option<&str>,
            _: Option<&str>,
        ) {
            self.note_queue_send(arn, body);
        }
    }

    impl SnsDelivery for Recorder {
        fn publish_to_topic(&self, arn: &str, msg: &str, subject: Option<&str>) {
            let entry = (
                arn.to_string(),
                msg.to_string(),
                subject.map(|s| s.to_string()),
            );
            self.topic_publishes.lock().unwrap().push(entry);
        }
    }

    /// Fresh shared state for account `123456789012` in `us-east-1`.
    fn make_shared() -> SharedEventBridgeState {
        let accounts =
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
        Arc::new(RwLock::new(accounts))
    }

    /// Enabled rule on the `default` bus with a single target `t1` at `target_arn`.
    fn make_rule(name: &str, pattern: Option<&str>, target_arn: &str) -> EventRule {
        let target = EbTarget {
            id: "t1".to_string(),
            arn: target_arn.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        };
        EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{name}"),
            event_bus_name: "default".to_string(),
            event_pattern: pattern.map(|s| s.to_string()),
            schedule_expression: None,
            state: "ENABLED".to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets: vec![target],
            tags: HashMap::new(),
            last_fired: None,
        }
    }

    /// Inserts `rule` into the default account under its `(bus name, rule name)` key.
    fn register(shared: &SharedEventBridgeState, rule: EventRule) {
        let key = (rule.event_bus_name.clone(), rule.name.clone());
        let mut accounts = shared.write();
        accounts.default_mut().rules.insert(key, rule);
    }

    /// Builds a delivery wired to a recording SQS backend, with `rule` installed.
    fn sqs_harness(rule: EventRule) -> (EventBridgeDeliveryImpl, Arc<Recorder>) {
        let shared = make_shared();
        register(&shared, rule);
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sqs(recorder.clone()));
        (EventBridgeDeliveryImpl::new(shared, bus), recorder)
    }

    #[test]
    fn put_event_appends_to_events_log() {
        let shared = make_shared();
        let sut = EventBridgeDeliveryImpl::new(shared.clone(), Arc::new(DeliveryBus::new()));

        sut.put_event("my.source", "MyType", r#"{"k":"v"}"#, "default");

        let accounts = shared.read();
        let events = &accounts.default_ref().events;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].source, "my.source");
        assert_eq!(events[0].detail_type, "MyType");
    }

    #[test]
    fn put_event_dispatches_matching_sqs_target() {
        let (sut, recorder) = sqs_harness(make_rule("r", None, QUEUE_ARN));

        sut.put_event("app", "Changed", r#"{"x":1}"#, "default");

        let sends = recorder.queue_sends.lock().unwrap();
        assert_eq!(sends.len(), 1);
        let (arn, body) = &sends[0];
        assert_eq!(arn, QUEUE_ARN);
        let envelope: serde_json::Value = serde_json::from_str(body).unwrap();
        assert_eq!(envelope["detail-type"], "Changed");
        assert_eq!(envelope["source"], "app");
    }

    #[test]
    fn put_event_dispatches_to_sns_target() {
        let topic_arn = "arn:aws:sns:us-east-1:123456789012:t";
        let shared = make_shared();
        register(&shared, make_rule("r", None, topic_arn));
        let recorder = Arc::new(Recorder::default());
        let bus = Arc::new(DeliveryBus::new().with_sns(recorder.clone()));
        let sut = EventBridgeDeliveryImpl::new(shared, bus);

        sut.put_event("app", "Changed", r#"{}"#, "default");

        let publishes = recorder.topic_publishes.lock().unwrap();
        assert_eq!(publishes.len(), 1);
        let (arn, _message, subject) = &publishes[0];
        assert_eq!(arn, topic_arn);
        assert_eq!(subject.as_deref(), Some("Changed"));
    }

    #[test]
    fn put_event_skips_disabled_rule() {
        let disabled = EventRule {
            state: "DISABLED".to_string(),
            ..make_rule("r", None, QUEUE_ARN)
        };
        let (sut, recorder) = sqs_harness(disabled);

        sut.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.queue_sends.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_skips_other_bus_rule() {
        let elsewhere = EventRule {
            event_bus_name: "custom-bus".to_string(),
            ..make_rule("r", None, QUEUE_ARN)
        };
        let (sut, recorder) = sqs_harness(elsewhere);

        sut.put_event("app", "Changed", r#"{}"#, "default");

        assert!(recorder.queue_sends.lock().unwrap().is_empty());
    }

    #[test]
    fn put_event_handles_invalid_detail_json_gracefully() {
        let (sut, recorder) = sqs_harness(make_rule("r", None, QUEUE_ARN));

        sut.put_event("app", "Type", "not-json", "default");

        let sends = recorder.queue_sends.lock().unwrap();
        assert_eq!(sends.len(), 1);
        let envelope: serde_json::Value = serde_json::from_str(&sends[0].1).unwrap();
        assert_eq!(envelope["detail"], serde_json::json!({}));
    }
}