1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
13#[derive(Debug)]
15pub struct FiredTarget {
16 pub target_type: String,
18 pub arn: String,
20}
21
22pub struct FireRuleContext<'a> {
26 pub state: &'a SharedEventBridgeState,
27 pub delivery: &'a Arc<DeliveryBus>,
28 pub lambda_state: &'a Option<SharedLambdaState>,
29 pub logs_state: &'a Option<SharedLogsState>,
30 pub container_runtime: &'a Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
31}
32
33pub fn fire_rule(
39 ctx: &FireRuleContext<'_>,
40 bus_name: &str,
41 rule_name: &str,
42) -> Result<Vec<FiredTarget>, String> {
43 let state = ctx.state;
44 let delivery = ctx.delivery;
45 let lambda_state = ctx.lambda_state;
46 let logs_state = ctx.logs_state;
47 let container_runtime = ctx.container_runtime;
48
49 let (targets, account_id, region) = {
50 let eb_accounts = state.read();
51 let eb_state = eb_accounts.default_ref();
52
53 if !eb_state.buses.contains_key(bus_name) {
55 return Err(format!("Event bus '{bus_name}' not found"));
56 }
57
58 let key = (bus_name.to_string(), rule_name.to_string());
59 let rule = match eb_state.rules.get(&key) {
60 Some(r) => r,
61 None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
62 };
63
64 (
65 rule.targets.clone(),
66 eb_state.account_id.clone(),
67 eb_state.region.clone(),
68 )
69 };
70
71 if targets.is_empty() {
72 return Ok(Vec::new());
73 }
74
75 let now = Utc::now();
76 let event_id = uuid::Uuid::new_v4().to_string();
77
78 let event_json = json!({
80 "version": "0",
81 "id": event_id,
82 "source": "aws.events",
83 "account": account_id,
84 "detail-type": "Scheduled Event",
85 "detail": {},
86 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
87 "region": region,
88 "resources": [],
89 });
90 let event_str = event_json.to_string();
91
92 {
94 let mut s_accounts = state.write();
95 let s = s_accounts.default_mut();
96 s.events.push(crate::state::PutEvent {
97 event_id: event_id.clone(),
98 source: "aws.events".to_string(),
99 detail_type: "Scheduled Event".to_string(),
100 detail: "{}".to_string(),
101 event_bus_name: bus_name.to_string(),
102 time: now,
103 resources: Vec::new(),
104 });
105 }
106
107 let mut fired = Vec::new();
108
109 for target in &targets {
110 let arn = &target.arn;
111 let body_str = resolve_target_body(target, &event_json, &event_str);
112
113 if arn.contains(":sqs:") {
114 let message_group_id = target
116 .sqs_parameters
117 .as_ref()
118 .and_then(|sp| sp["MessageGroupId"].as_str())
119 .map(|s| s.to_string());
120
121 if message_group_id.is_some() {
122 delivery.send_to_sqs_with_attrs(
123 arn,
124 &body_str,
125 &HashMap::new(),
126 message_group_id.as_deref(),
127 None,
128 );
129 } else {
130 delivery.send_to_sqs(arn, &body_str, &HashMap::new());
131 }
132 fired.push(FiredTarget {
133 target_type: "sqs".to_string(),
134 arn: arn.clone(),
135 });
136 } else if arn.contains(":sns:") {
137 delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
138 fired.push(FiredTarget {
139 target_type: "sns".to_string(),
140 arn: arn.clone(),
141 });
142 } else if arn.contains(":lambda:") {
143 let mut s_accounts = state.write();
144 let s = s_accounts.default_mut();
145 s.lambda_invocations.push(crate::state::LambdaInvocation {
146 function_arn: arn.clone(),
147 payload: body_str.clone(),
148 timestamp: now,
149 });
150 drop(s_accounts);
151 if let Some(ref ls) = lambda_state {
152 ls.write().default_mut().invocations.push(LambdaInvocation {
153 function_arn: arn.clone(),
154 payload: body_str.clone(),
155 timestamp: now,
156 source: "aws:events".to_string(),
157 });
158 }
159 crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
160 fired.push(FiredTarget {
161 target_type: "lambda".to_string(),
162 arn: arn.clone(),
163 });
164 } else if arn.contains(":logs:") {
165 let mut s_accounts = state.write();
166 let s = s_accounts.default_mut();
167 s.log_deliveries.push(crate::state::LogDelivery {
168 log_group_arn: arn.clone(),
169 payload: body_str.clone(),
170 timestamp: now,
171 });
172 drop(s_accounts);
173 if let Some(ref log_state) = logs_state {
174 crate::service::deliver_to_logs(log_state, arn, &body_str, now);
175 }
176 fired.push(FiredTarget {
177 target_type: "logs".to_string(),
178 arn: arn.clone(),
179 });
180 }
181 }
182
183 Ok(fired)
184}
185
186fn resolve_target_body(
196 target: &EventTarget,
197 event_json: &serde_json::Value,
198 event_str: &str,
199) -> String {
200 if let Some(ref input) = target.input {
201 return input.clone();
202 }
203
204 if let Some(ref input_path) = target.input_path {
205 if let Some(key) = input_path.strip_prefix("$.") {
207 if !key.contains('.') && !key.contains('[') {
208 if let Some(val) = event_json.get(key) {
209 return val.to_string();
210 }
211 }
212 }
213 }
214
215 event_str.to_string()
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223 use crate::state::EventRule;
224 use fakecloud_aws::arn::Arn;
225 use parking_lot::RwLock;
226 use std::collections::BTreeMap;
227
228 fn make_state() -> SharedEventBridgeState {
229 Arc::new(RwLock::new(
230 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
231 ))
232 }
233
234 fn add_rule(
235 state: &SharedEventBridgeState,
236 bus: &str,
237 name: &str,
238 enabled: bool,
239 targets: Vec<EventTarget>,
240 ) {
241 let mut s_accounts = state.write();
242 let s = s_accounts.default_mut();
243 let key = (bus.to_string(), name.to_string());
244 s.rules.insert(
245 key,
246 EventRule {
247 name: name.to_string(),
248 arn: Arn::new(
249 "events",
250 "us-east-1",
251 "123456789012",
252 &format!("rule/{bus}/{name}"),
253 )
254 .to_string(),
255 event_bus_name: bus.to_string(),
256 event_pattern: None,
257 schedule_expression: Some("rate(1 minute)".to_string()),
258 state: if enabled { "ENABLED" } else { "DISABLED" }.to_string(),
259 description: None,
260 role_arn: None,
261 managed_by: None,
262 created_by: None,
263 targets,
264 tags: BTreeMap::new(),
265 last_fired: None,
266 },
267 );
268 }
269
270 #[test]
271 fn fire_rule_with_valid_rule() {
272 let state = make_state();
273 let delivery = Arc::new(DeliveryBus::new());
274
275 add_rule(
276 &state,
277 "default",
278 "my-rule",
279 true,
280 vec![EventTarget {
281 id: "t1".to_string(),
282 arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
283 input: None,
284 input_path: None,
285 input_transformer: None,
286 sqs_parameters: None,
287 ..Default::default()
288 }],
289 );
290
291 let ctx = FireRuleContext {
292 state: &state,
293 delivery: &delivery,
294 lambda_state: &None,
295 logs_state: &None,
296 container_runtime: &None,
297 };
298 let result = fire_rule(&ctx, "default", "my-rule");
299 let targets = result.unwrap();
300 assert_eq!(targets.len(), 1);
301 assert_eq!(targets[0].target_type, "sqs");
302 assert_eq!(
303 targets[0].arn,
304 "arn:aws:sqs:us-east-1:123456789012:target-queue"
305 );
306
307 let s_accounts = state.read();
309 let s = s_accounts.default_ref();
310 assert!(s.events.iter().any(|e| e.source == "aws.events"));
311 }
312
313 #[test]
314 fn fire_rule_nonexistent_rule() {
315 let state = make_state();
316 let delivery = Arc::new(DeliveryBus::new());
317
318 let ctx = FireRuleContext {
319 state: &state,
320 delivery: &delivery,
321 lambda_state: &None,
322 logs_state: &None,
323 container_runtime: &None,
324 };
325 let result = fire_rule(&ctx, "default", "no-such-rule");
326 assert!(result.is_err());
327 assert!(result.unwrap_err().contains("not found"));
328 }
329
330 #[test]
331 fn fire_rule_disabled_still_fires() {
332 let state = make_state();
333 let delivery = Arc::new(DeliveryBus::new());
334
335 add_rule(
336 &state,
337 "default",
338 "disabled-rule",
339 false, vec![EventTarget {
341 id: "t1".to_string(),
342 arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
343 input: None,
344 input_path: None,
345 input_transformer: None,
346 sqs_parameters: None,
347 ..Default::default()
348 }],
349 );
350
351 let ctx = FireRuleContext {
352 state: &state,
353 delivery: &delivery,
354 lambda_state: &None,
355 logs_state: &None,
356 container_runtime: &None,
357 };
358 let result = fire_rule(&ctx, "default", "disabled-rule");
359 let targets = result.unwrap();
361 assert_eq!(targets.len(), 1);
362 }
363
364 #[test]
365 fn fire_rule_unknown_bus_errors() {
366 let state = make_state();
367 let delivery = Arc::new(DeliveryBus::new());
368 let ctx = FireRuleContext {
369 state: &state,
370 delivery: &delivery,
371 lambda_state: &None,
372 logs_state: &None,
373 container_runtime: &None,
374 };
375 let err = fire_rule(&ctx, "missing-bus", "rule").unwrap_err();
376 assert!(err.contains("missing-bus"));
377 }
378
379 #[test]
380 fn fire_rule_no_targets_returns_empty() {
381 let state = make_state();
382 let delivery = Arc::new(DeliveryBus::new());
383 add_rule(&state, "default", "no-targets", true, Vec::new());
384 let ctx = FireRuleContext {
385 state: &state,
386 delivery: &delivery,
387 lambda_state: &None,
388 logs_state: &None,
389 container_runtime: &None,
390 };
391 let targets = fire_rule(&ctx, "default", "no-targets").unwrap();
392 assert!(targets.is_empty());
393 }
394
395 #[test]
396 fn fire_rule_with_sns_and_lambda_and_logs_targets() {
397 let state = make_state();
398 let delivery = Arc::new(DeliveryBus::new());
399 add_rule(
400 &state,
401 "default",
402 "multi",
403 true,
404 vec![
405 EventTarget {
406 id: "t-sns".to_string(),
407 arn: "arn:aws:sns:us-east-1:123456789012:topic".to_string(),
408 input: None,
409 input_path: None,
410 input_transformer: None,
411 sqs_parameters: None,
412 ..Default::default()
413 },
414 EventTarget {
415 id: "t-lambda".to_string(),
416 arn: "arn:aws:lambda:us-east-1:123456789012:function:F".to_string(),
417 input: None,
418 input_path: None,
419 input_transformer: None,
420 sqs_parameters: None,
421 ..Default::default()
422 },
423 EventTarget {
424 id: "t-logs".to_string(),
425 arn: "arn:aws:logs:us-east-1:123456789012:log-group:lg".to_string(),
426 input: None,
427 input_path: None,
428 input_transformer: None,
429 sqs_parameters: None,
430 ..Default::default()
431 },
432 ],
433 );
434 let ctx = FireRuleContext {
435 state: &state,
436 delivery: &delivery,
437 lambda_state: &None,
438 logs_state: &None,
439 container_runtime: &None,
440 };
441 let fired = fire_rule(&ctx, "default", "multi").unwrap();
442 let types: Vec<&str> = fired.iter().map(|t| t.target_type.as_str()).collect();
443 assert!(types.contains(&"sns"));
444 assert!(types.contains(&"lambda"));
445 assert!(types.contains(&"logs"));
446 }
447
448 #[test]
449 fn fire_rule_with_sqs_fifo_message_group() {
450 let state = make_state();
451 let delivery = Arc::new(DeliveryBus::new());
452 add_rule(
453 &state,
454 "default",
455 "fifo",
456 true,
457 vec![EventTarget {
458 id: "t1".to_string(),
459 arn: "arn:aws:sqs:us-east-1:123456789012:queue.fifo".to_string(),
460 input: None,
461 input_path: None,
462 input_transformer: None,
463 sqs_parameters: Some(json!({"MessageGroupId": "g1"})),
464 ..Default::default()
465 }],
466 );
467 let ctx = FireRuleContext {
468 state: &state,
469 delivery: &delivery,
470 lambda_state: &None,
471 logs_state: &None,
472 container_runtime: &None,
473 };
474 let fired = fire_rule(&ctx, "default", "fifo").unwrap();
475 assert_eq!(fired.len(), 1);
476 assert_eq!(fired[0].target_type, "sqs");
477 }
478
479 #[test]
480 fn resolve_target_body_uses_literal_input() {
481 let target = EventTarget {
482 id: "t".to_string(),
483 arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
484 input: Some("{\"literal\":true}".to_string()),
485 input_path: None,
486 input_transformer: None,
487 sqs_parameters: None,
488 ..Default::default()
489 };
490 let body = resolve_target_body(&target, &json!({"ignored": 1}), "ignored");
491 assert_eq!(body, "{\"literal\":true}");
492 }
493
494 #[test]
495 fn resolve_target_body_uses_input_path_for_top_level() {
496 let target = EventTarget {
497 id: "t".to_string(),
498 arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
499 input: None,
500 input_path: Some("$.detail".to_string()),
501 input_transformer: None,
502 sqs_parameters: None,
503 ..Default::default()
504 };
505 let event = json!({"detail": {"k": 1}, "other": 2});
506 let body = resolve_target_body(&target, &event, "fallback");
507 assert!(body.contains("\"k\""));
508 }
509
510 #[test]
511 fn resolve_target_body_falls_back_for_nested_input_path() {
512 let target = EventTarget {
513 id: "t".to_string(),
514 arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
515 input: None,
516 input_path: Some("$.detail.nested".to_string()),
517 input_transformer: None,
518 sqs_parameters: None,
519 ..Default::default()
520 };
521 let body = resolve_target_body(&target, &json!({}), "full-event");
522 assert_eq!(body, "full-event");
523 }
524
525 #[test]
526 fn resolve_target_body_no_transform_returns_full_event() {
527 let target = EventTarget {
528 id: "t".to_string(),
529 arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
530 input: None,
531 input_path: None,
532 input_transformer: None,
533 sqs_parameters: None,
534 ..Default::default()
535 };
536 let body = resolve_target_body(&target, &json!({}), "full-event");
537 assert_eq!(body, "full-event");
538 }
539}