1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::state::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
/// Record of a single target that a fired rule was delivered to.
///
/// [`fire_rule`] returns one of these for every target whose ARN it
/// recognised and dispatched to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FiredTarget {
    /// Short label of the destination service: `"sqs"`, `"sns"`, `"lambda"`
    /// or `"logs"`.
    pub target_type: String,
    /// ARN of the target, copied from the rule's target definition.
    pub arn: String,
}
21
22pub fn fire_rule(
28 state: &SharedEventBridgeState,
29 delivery: &Arc<DeliveryBus>,
30 lambda_state: &Option<SharedLambdaState>,
31 logs_state: &Option<SharedLogsState>,
32 container_runtime: &Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
33 bus_name: &str,
34 rule_name: &str,
35) -> Result<Vec<FiredTarget>, String> {
36 let (targets, account_id, region) = {
37 let state = state.read();
38
39 if !state.buses.contains_key(bus_name) {
41 return Err(format!("Event bus '{bus_name}' not found"));
42 }
43
44 let key = (bus_name.to_string(), rule_name.to_string());
45 let rule = match state.rules.get(&key) {
46 Some(r) => r,
47 None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
48 };
49
50 (
51 rule.targets.clone(),
52 state.account_id.clone(),
53 state.region.clone(),
54 )
55 };
56
57 if targets.is_empty() {
58 return Ok(Vec::new());
59 }
60
61 let now = Utc::now();
62 let event_id = uuid::Uuid::new_v4().to_string();
63
64 let event_json = json!({
66 "version": "0",
67 "id": event_id,
68 "source": "aws.events",
69 "account": account_id,
70 "detail-type": "Scheduled Event",
71 "detail": {},
72 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
73 "region": region,
74 "resources": [],
75 });
76 let event_str = event_json.to_string();
77
78 {
80 let mut s = state.write();
81 s.events.push(crate::state::PutEvent {
82 event_id: event_id.clone(),
83 source: "aws.events".to_string(),
84 detail_type: "Scheduled Event".to_string(),
85 detail: "{}".to_string(),
86 event_bus_name: bus_name.to_string(),
87 time: now,
88 resources: Vec::new(),
89 });
90 }
91
92 let mut fired = Vec::new();
93
94 for target in &targets {
95 let arn = &target.arn;
96 let body_str = resolve_target_body(target, &event_json, &event_str);
97
98 if arn.contains(":sqs:") {
99 let message_group_id = target
101 .sqs_parameters
102 .as_ref()
103 .and_then(|sp| sp["MessageGroupId"].as_str())
104 .map(|s| s.to_string());
105
106 if message_group_id.is_some() {
107 delivery.send_to_sqs_with_attrs(
108 arn,
109 &body_str,
110 &HashMap::new(),
111 message_group_id.as_deref(),
112 None,
113 );
114 } else {
115 delivery.send_to_sqs(arn, &body_str, &HashMap::new());
116 }
117 fired.push(FiredTarget {
118 target_type: "sqs".to_string(),
119 arn: arn.clone(),
120 });
121 } else if arn.contains(":sns:") {
122 delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
123 fired.push(FiredTarget {
124 target_type: "sns".to_string(),
125 arn: arn.clone(),
126 });
127 } else if arn.contains(":lambda:") {
128 let mut s = state.write();
129 s.lambda_invocations.push(crate::state::LambdaInvocation {
130 function_arn: arn.clone(),
131 payload: body_str.clone(),
132 timestamp: now,
133 });
134 drop(s);
135 if let Some(ref ls) = lambda_state {
136 ls.write().invocations.push(LambdaInvocation {
137 function_arn: arn.clone(),
138 payload: body_str.clone(),
139 timestamp: now,
140 source: "aws:events".to_string(),
141 });
142 }
143 crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
144 fired.push(FiredTarget {
145 target_type: "lambda".to_string(),
146 arn: arn.clone(),
147 });
148 } else if arn.contains(":logs:") {
149 let mut s = state.write();
150 s.log_deliveries.push(crate::state::LogDelivery {
151 log_group_arn: arn.clone(),
152 payload: body_str.clone(),
153 timestamp: now,
154 });
155 drop(s);
156 if let Some(ref log_state) = logs_state {
157 crate::service::deliver_to_logs(log_state, arn, &body_str, now);
158 }
159 fired.push(FiredTarget {
160 target_type: "logs".to_string(),
161 arn: arn.clone(),
162 });
163 }
164 }
165
166 Ok(fired)
167}
168
169fn resolve_target_body(
179 target: &EventTarget,
180 event_json: &serde_json::Value,
181 event_str: &str,
182) -> String {
183 if let Some(ref input) = target.input {
184 return input.clone();
185 }
186
187 if let Some(ref input_path) = target.input_path {
188 if let Some(key) = input_path.strip_prefix("$.") {
190 if !key.contains('.') && !key.contains('[') {
191 if let Some(val) = event_json.get(key) {
192 return val.to_string();
193 }
194 }
195 }
196 }
197
198 event_str.to_string()
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventBridgeState, EventRule};
    use parking_lot::RwLock;

    /// ARN of the SQS queue used as the target in these tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:target-queue";

    /// Builds a fresh shared EventBridge state for the fixed test account/region.
    fn make_state() -> SharedEventBridgeState {
        let inner = EventBridgeState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// A plain SQS target with id `t1` and no input overrides.
    fn sqs_target() -> EventTarget {
        EventTarget {
            id: "t1".to_string(),
            arn: QUEUE_ARN.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        }
    }

    /// Inserts a scheduled rule called `name` on `bus` with the given targets.
    fn add_rule(
        state: &SharedEventBridgeState,
        bus: &str,
        name: &str,
        enabled: bool,
        targets: Vec<EventTarget>,
    ) {
        let status = if enabled { "ENABLED" } else { "DISABLED" };
        let rule = EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{bus}/{name}"),
            event_bus_name: bus.to_string(),
            event_pattern: None,
            schedule_expression: Some("rate(1 minute)".to_string()),
            state: status.to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets,
            tags: HashMap::new(),
            last_fired: None,
        };

        state
            .write()
            .rules
            .insert((bus.to_string(), name.to_string()), rule);
    }

    #[test]
    fn fire_rule_with_valid_rule() {
        let state = make_state();
        let delivery = Arc::new(DeliveryBus::new());
        add_rule(&state, "default", "my-rule", true, vec![sqs_target()]);

        let fired =
            fire_rule(&state, &delivery, &None, &None, &None, "default", "my-rule").unwrap();

        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].target_type, "sqs");
        assert_eq!(fired[0].arn, QUEUE_ARN);

        // Firing must also have recorded the synthetic event.
        let guard = state.read();
        assert!(guard.events.iter().any(|ev| ev.source == "aws.events"));
    }

    #[test]
    fn fire_rule_nonexistent_rule() {
        let state = make_state();
        let delivery = Arc::new(DeliveryBus::new());

        let outcome = fire_rule(
            &state,
            &delivery,
            &None,
            &None,
            &None,
            "default",
            "no-such-rule",
        );

        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("not found"));
    }

    #[test]
    fn fire_rule_disabled_still_fires() {
        let state = make_state();
        let delivery = Arc::new(DeliveryBus::new());
        add_rule(
            &state,
            "default",
            "disabled-rule",
            false,
            vec![sqs_target()],
        );

        let fired = fire_rule(
            &state,
            &delivery,
            &None,
            &None,
            &None,
            "default",
            "disabled-rule",
        )
        .unwrap();

        // The rule's enabled/disabled state does not gate a manual fire.
        assert_eq!(fired.len(), 1);
    }
}