Skip to main content

fakecloud_eventbridge/
simulation.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
/// One target that a simulated rule firing delivered to.
///
/// Derives `Clone`, `PartialEq` and `Eq` (in addition to `Debug`) so callers
/// and tests can copy and compare results directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FiredTarget {
    /// The target type (e.g. "sqs", "sns", "lambda", "logs").
    pub target_type: String,
    /// The target ARN.
    pub arn: String,
}
21
22/// Borrowed context passed to `fire_rule` — all the surrounding state
23/// it needs to deliver to the different target protocols. Bundled so
24/// the callers don't have to thread five positional args through.
25pub struct FireRuleContext<'a> {
26    pub state: &'a SharedEventBridgeState,
27    pub delivery: &'a Arc<DeliveryBus>,
28    pub lambda_state: &'a Option<SharedLambdaState>,
29    pub logs_state: &'a Option<SharedLogsState>,
30    pub container_runtime: &'a Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
31}
32
33/// Fire a specific rule by bus name and rule name, delivering to all its
34/// targets regardless of the rule's enabled/disabled state.
35///
36/// Returns `Ok(targets)` with the list of targets that were delivered to,
37/// or `Err(message)` if the bus or rule doesn't exist.
38pub fn fire_rule(
39    ctx: &FireRuleContext<'_>,
40    bus_name: &str,
41    rule_name: &str,
42) -> Result<Vec<FiredTarget>, String> {
43    let state = ctx.state;
44    let delivery = ctx.delivery;
45    let lambda_state = ctx.lambda_state;
46    let logs_state = ctx.logs_state;
47    let container_runtime = ctx.container_runtime;
48
49    let (targets, account_id, region) = {
50        let eb_accounts = state.read();
51        let eb_state = eb_accounts.default_ref();
52
53        // Verify bus exists
54        if !eb_state.buses.contains_key(bus_name) {
55            return Err(format!("Event bus '{bus_name}' not found"));
56        }
57
58        let key = (bus_name.to_string(), rule_name.to_string());
59        let rule = match eb_state.rules.get(&key) {
60            Some(r) => r,
61            None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
62        };
63
64        (
65            rule.targets.clone(),
66            eb_state.account_id.clone(),
67            eb_state.region.clone(),
68        )
69    };
70
71    if targets.is_empty() {
72        return Ok(Vec::new());
73    }
74
75    let now = Utc::now();
76    let event_id = uuid::Uuid::new_v4().to_string();
77
78    // Build the scheduled-event envelope (same shape as the real scheduler)
79    let event_json = json!({
80        "version": "0",
81        "id": event_id,
82        "source": "aws.events",
83        "account": account_id,
84        "detail-type": "Scheduled Event",
85        "detail": {},
86        "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
87        "region": region,
88        "resources": [],
89    });
90    let event_str = event_json.to_string();
91
92    // Record the event in state
93    {
94        let mut s_accounts = state.write();
95        let s = s_accounts.default_mut();
96        s.events.push(crate::state::PutEvent {
97            event_id: event_id.clone(),
98            source: "aws.events".to_string(),
99            detail_type: "Scheduled Event".to_string(),
100            detail: "{}".to_string(),
101            event_bus_name: bus_name.to_string(),
102            time: now,
103            resources: Vec::new(),
104        });
105    }
106
107    let mut fired = Vec::new();
108
109    for target in &targets {
110        let arn = &target.arn;
111        let body_str = resolve_target_body(target, &event_json, &event_str);
112
113        if arn.contains(":sqs:") {
114            // Extract MessageGroupId from SqsParameters if present (required for FIFO queues)
115            let message_group_id = target
116                .sqs_parameters
117                .as_ref()
118                .and_then(|sp| sp["MessageGroupId"].as_str())
119                .map(|s| s.to_string());
120
121            if message_group_id.is_some() {
122                delivery.send_to_sqs_with_attrs(
123                    arn,
124                    &body_str,
125                    &HashMap::new(),
126                    message_group_id.as_deref(),
127                    None,
128                );
129            } else {
130                delivery.send_to_sqs(arn, &body_str, &HashMap::new());
131            }
132            fired.push(FiredTarget {
133                target_type: "sqs".to_string(),
134                arn: arn.clone(),
135            });
136        } else if arn.contains(":sns:") {
137            delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
138            fired.push(FiredTarget {
139                target_type: "sns".to_string(),
140                arn: arn.clone(),
141            });
142        } else if arn.contains(":lambda:") {
143            let mut s_accounts = state.write();
144            let s = s_accounts.default_mut();
145            s.lambda_invocations.push(crate::state::LambdaInvocation {
146                function_arn: arn.clone(),
147                payload: body_str.clone(),
148                timestamp: now,
149            });
150            drop(s_accounts);
151            if let Some(ref ls) = lambda_state {
152                ls.write().default_mut().invocations.push(LambdaInvocation {
153                    function_arn: arn.clone(),
154                    payload: body_str.clone(),
155                    timestamp: now,
156                    source: "aws:events".to_string(),
157                });
158            }
159            crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
160            fired.push(FiredTarget {
161                target_type: "lambda".to_string(),
162                arn: arn.clone(),
163            });
164        } else if arn.contains(":logs:") {
165            let mut s_accounts = state.write();
166            let s = s_accounts.default_mut();
167            s.log_deliveries.push(crate::state::LogDelivery {
168                log_group_arn: arn.clone(),
169                payload: body_str.clone(),
170                timestamp: now,
171            });
172            drop(s_accounts);
173            if let Some(ref log_state) = logs_state {
174                crate::service::deliver_to_logs(log_state, arn, &body_str, now);
175            }
176            fired.push(FiredTarget {
177                target_type: "logs".to_string(),
178                arn: arn.clone(),
179            });
180        }
181    }
182
183    Ok(fired)
184}
185
186/// Compute the message body for a target, applying Input / InputPath if
187/// present.
188///
189/// **Limitations**: `InputTransformer` is not yet implemented — if a target
190/// has one configured, we fall through to the full event envelope. Real AWS
191/// evaluates the `InputPathsMap` + `InputTemplate` to build the payload;
192/// implementing that requires a JSONPath evaluator. `InputPath` supports
193/// only the simple `$.field` case (single top-level key); deeper paths
194/// fall back to the full event.
195fn resolve_target_body(
196    target: &EventTarget,
197    event_json: &serde_json::Value,
198    event_str: &str,
199) -> String {
200    if let Some(ref input) = target.input {
201        return input.clone();
202    }
203
204    if let Some(ref input_path) = target.input_path {
205        // Support simple top-level JSONPath like "$.detail"
206        if let Some(key) = input_path.strip_prefix("$.") {
207            if !key.contains('.') && !key.contains('[') {
208                if let Some(val) = event_json.get(key) {
209                    return val.to_string();
210                }
211            }
212        }
213    }
214
215    // InputTransformer is not yet supported — fall through to full event.
216
217    event_str.to_string()
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::EventRule;
    use fakecloud_aws::arn::Arn;
    use parking_lot::RwLock;
    use std::collections::BTreeMap;

    const SQS_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:target-queue";
    const SNS_ARN: &str = "arn:aws:sns:us-east-1:123456789012:topic";
    const LAMBDA_ARN: &str = "arn:aws:lambda:us-east-1:123456789012:function:F";
    const LOGS_ARN: &str = "arn:aws:logs:us-east-1:123456789012:log-group:lg";
    /// ARN used by the `resolve_target_body` tests (never delivered to).
    const BODY_TEST_ARN: &str = "arn:aws:sqs:us-east-1:123:q";

    /// Fresh single-account EventBridge state.
    fn make_state() -> SharedEventBridgeState {
        Arc::new(RwLock::new(
            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
        ))
    }

    /// Insert a scheduled rule with the given targets on `bus`.
    fn add_rule(
        state: &SharedEventBridgeState,
        bus: &str,
        name: &str,
        enabled: bool,
        targets: Vec<EventTarget>,
    ) {
        let mut s_accounts = state.write();
        let s = s_accounts.default_mut();
        let key = (bus.to_string(), name.to_string());
        s.rules.insert(
            key,
            EventRule {
                name: name.to_string(),
                arn: Arn::new(
                    "events",
                    "us-east-1",
                    "123456789012",
                    &format!("rule/{bus}/{name}"),
                )
                .to_string(),
                event_bus_name: bus.to_string(),
                event_pattern: None,
                schedule_expression: Some("rate(1 minute)".to_string()),
                state: if enabled { "ENABLED" } else { "DISABLED" }.to_string(),
                description: None,
                role_arn: None,
                managed_by: None,
                created_by: None,
                targets,
                tags: BTreeMap::new(),
                last_fired: None,
            },
        );
    }

    /// A target with no input shaping and no SQS parameters.
    fn target(id: &str, arn: &str) -> EventTarget {
        EventTarget {
            id: id.to_string(),
            arn: arn.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
            ..Default::default()
        }
    }

    /// Run `fire_rule` against `state` with a fresh delivery bus and no
    /// Lambda / Logs / container-runtime backends attached.
    fn fire(
        state: &SharedEventBridgeState,
        bus: &str,
        rule: &str,
    ) -> Result<Vec<FiredTarget>, String> {
        let delivery = Arc::new(DeliveryBus::new());
        let ctx = FireRuleContext {
            state,
            delivery: &delivery,
            lambda_state: &None,
            logs_state: &None,
            container_runtime: &None,
        };
        fire_rule(&ctx, bus, rule)
    }

    #[test]
    fn fire_rule_with_valid_rule() {
        let state = make_state();
        add_rule(&state, "default", "my-rule", true, vec![target("t1", SQS_ARN)]);

        let targets = fire(&state, "default", "my-rule").unwrap();
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0].target_type, "sqs");
        assert_eq!(targets[0].arn, SQS_ARN);

        // Verify event was recorded
        let s_accounts = state.read();
        let s = s_accounts.default_ref();
        assert!(s.events.iter().any(|e| e.source == "aws.events"
            && e.detail_type == "Scheduled Event"
            && e.event_bus_name == "default"));
    }

    #[test]
    fn fire_rule_nonexistent_rule() {
        let state = make_state();

        let result = fire(&state, "default", "no-such-rule");
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not found"));
    }

    #[test]
    fn fire_rule_disabled_still_fires() {
        let state = make_state();
        add_rule(
            &state,
            "default",
            "disabled-rule",
            false, // DISABLED
            vec![target("t1", SQS_ARN)],
        );

        // Simulation overrides disabled state
        let targets = fire(&state, "default", "disabled-rule").unwrap();
        assert_eq!(targets.len(), 1);
    }

    #[test]
    fn fire_rule_unknown_bus_errors() {
        let state = make_state();

        let err = fire(&state, "missing-bus", "rule").unwrap_err();
        assert!(err.contains("missing-bus"));
    }

    #[test]
    fn fire_rule_no_targets_returns_empty() {
        let state = make_state();
        add_rule(&state, "default", "no-targets", true, Vec::new());

        let targets = fire(&state, "default", "no-targets").unwrap();
        assert!(targets.is_empty());

        // The early return happens before the synthetic event is recorded.
        let s_accounts = state.read();
        assert!(s_accounts.default_ref().events.is_empty());
    }

    #[test]
    fn fire_rule_with_sns_and_lambda_and_logs_targets() {
        let state = make_state();
        add_rule(
            &state,
            "default",
            "multi",
            true,
            vec![
                target("t-sns", SNS_ARN),
                target("t-lambda", LAMBDA_ARN),
                target("t-logs", LOGS_ARN),
            ],
        );

        let fired = fire(&state, "default", "multi").unwrap();
        assert_eq!(fired.len(), 3);
        let types: Vec<&str> = fired.iter().map(|t| t.target_type.as_str()).collect();
        assert!(types.contains(&"sns"));
        assert!(types.contains(&"lambda"));
        assert!(types.contains(&"logs"));

        // Lambda and Logs deliveries are also recorded in EventBridge state.
        let s_accounts = state.read();
        let s = s_accounts.default_ref();
        assert!(s
            .lambda_invocations
            .iter()
            .any(|i| i.function_arn == LAMBDA_ARN));
        assert!(s
            .log_deliveries
            .iter()
            .any(|d| d.log_group_arn == LOGS_ARN));
    }

    #[test]
    fn fire_rule_with_sqs_fifo_message_group() {
        let state = make_state();
        add_rule(
            &state,
            "default",
            "fifo",
            true,
            vec![EventTarget {
                sqs_parameters: Some(json!({"MessageGroupId": "g1"})),
                ..target("t1", "arn:aws:sqs:us-east-1:123456789012:queue.fifo")
            }],
        );

        let fired = fire(&state, "default", "fifo").unwrap();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].target_type, "sqs");
    }

    #[test]
    fn resolve_target_body_uses_literal_input() {
        let target = EventTarget {
            input: Some("{\"literal\":true}".to_string()),
            ..target("t", BODY_TEST_ARN)
        };
        let body = resolve_target_body(&target, &json!({"ignored": 1}), "ignored");
        assert_eq!(body, "{\"literal\":true}");
    }

    #[test]
    fn resolve_target_body_uses_input_path_for_top_level() {
        let target = EventTarget {
            input_path: Some("$.detail".to_string()),
            ..target("t", BODY_TEST_ARN)
        };
        let event = json!({"detail": {"k": 1}, "other": 2});
        let body = resolve_target_body(&target, &event, "fallback");
        assert!(body.contains("\"k\""));
        assert_eq!(body, "{\"k\":1}");
    }

    #[test]
    fn resolve_target_body_falls_back_for_nested_input_path() {
        let target = EventTarget {
            input_path: Some("$.detail.nested".to_string()),
            ..target("t", BODY_TEST_ARN)
        };
        let body = resolve_target_body(&target, &json!({}), "full-event");
        assert_eq!(body, "full-event");
    }

    #[test]
    fn resolve_target_body_falls_back_for_missing_field() {
        let target = EventTarget {
            input_path: Some("$.missing".to_string()),
            ..target("t", BODY_TEST_ARN)
        };
        let body = resolve_target_body(&target, &json!({"detail": {}}), "full-event");
        assert_eq!(body, "full-event");
    }

    #[test]
    fn resolve_target_body_no_transform_returns_full_event() {
        let target = target("t", BODY_TEST_ARN);
        let body = resolve_target_body(&target, &json!({}), "full-event");
        assert_eq!(body, "full-event");
    }
}