Skip to main content

fakecloud_eventbridge/
simulation.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::state::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
13/// Result of firing a rule via simulation.
14#[derive(Debug)]
15pub struct FiredTarget {
16    /// The target type (e.g. "sqs", "sns", "lambda", "logs").
17    pub target_type: String,
18    /// The target ARN.
19    pub arn: String,
20}
21
22/// Borrowed context passed to `fire_rule` — all the surrounding state
23/// it needs to deliver to the different target protocols. Bundled so
24/// the callers don't have to thread five positional args through.
25pub struct FireRuleContext<'a> {
26    pub state: &'a SharedEventBridgeState,
27    pub delivery: &'a Arc<DeliveryBus>,
28    pub lambda_state: &'a Option<SharedLambdaState>,
29    pub logs_state: &'a Option<SharedLogsState>,
30    pub container_runtime: &'a Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
31}
32
33/// Fire a specific rule by bus name and rule name, delivering to all its
34/// targets regardless of the rule's enabled/disabled state.
35///
36/// Returns `Ok(targets)` with the list of targets that were delivered to,
37/// or `Err(message)` if the bus or rule doesn't exist.
38pub fn fire_rule(
39    ctx: &FireRuleContext<'_>,
40    bus_name: &str,
41    rule_name: &str,
42) -> Result<Vec<FiredTarget>, String> {
43    let state = ctx.state;
44    let delivery = ctx.delivery;
45    let lambda_state = ctx.lambda_state;
46    let logs_state = ctx.logs_state;
47    let container_runtime = ctx.container_runtime;
48
49    let (targets, account_id, region) = {
50        let eb_accounts = state.read();
51        let eb_state = eb_accounts.default_ref();
52
53        // Verify bus exists
54        if !eb_state.buses.contains_key(bus_name) {
55            return Err(format!("Event bus '{bus_name}' not found"));
56        }
57
58        let key = (bus_name.to_string(), rule_name.to_string());
59        let rule = match eb_state.rules.get(&key) {
60            Some(r) => r,
61            None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
62        };
63
64        (
65            rule.targets.clone(),
66            eb_state.account_id.clone(),
67            eb_state.region.clone(),
68        )
69    };
70
71    if targets.is_empty() {
72        return Ok(Vec::new());
73    }
74
75    let now = Utc::now();
76    let event_id = uuid::Uuid::new_v4().to_string();
77
78    // Build the scheduled-event envelope (same shape as the real scheduler)
79    let event_json = json!({
80        "version": "0",
81        "id": event_id,
82        "source": "aws.events",
83        "account": account_id,
84        "detail-type": "Scheduled Event",
85        "detail": {},
86        "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
87        "region": region,
88        "resources": [],
89    });
90    let event_str = event_json.to_string();
91
92    // Record the event in state
93    {
94        let mut s_accounts = state.write();
95        let s = s_accounts.default_mut();
96        s.events.push(crate::state::PutEvent {
97            event_id: event_id.clone(),
98            source: "aws.events".to_string(),
99            detail_type: "Scheduled Event".to_string(),
100            detail: "{}".to_string(),
101            event_bus_name: bus_name.to_string(),
102            time: now,
103            resources: Vec::new(),
104        });
105    }
106
107    let mut fired = Vec::new();
108
109    for target in &targets {
110        let arn = &target.arn;
111        let body_str = resolve_target_body(target, &event_json, &event_str);
112
113        if arn.contains(":sqs:") {
114            // Extract MessageGroupId from SqsParameters if present (required for FIFO queues)
115            let message_group_id = target
116                .sqs_parameters
117                .as_ref()
118                .and_then(|sp| sp["MessageGroupId"].as_str())
119                .map(|s| s.to_string());
120
121            if message_group_id.is_some() {
122                delivery.send_to_sqs_with_attrs(
123                    arn,
124                    &body_str,
125                    &HashMap::new(),
126                    message_group_id.as_deref(),
127                    None,
128                );
129            } else {
130                delivery.send_to_sqs(arn, &body_str, &HashMap::new());
131            }
132            fired.push(FiredTarget {
133                target_type: "sqs".to_string(),
134                arn: arn.clone(),
135            });
136        } else if arn.contains(":sns:") {
137            delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
138            fired.push(FiredTarget {
139                target_type: "sns".to_string(),
140                arn: arn.clone(),
141            });
142        } else if arn.contains(":lambda:") {
143            let mut s_accounts = state.write();
144            let s = s_accounts.default_mut();
145            s.lambda_invocations.push(crate::state::LambdaInvocation {
146                function_arn: arn.clone(),
147                payload: body_str.clone(),
148                timestamp: now,
149            });
150            drop(s_accounts);
151            if let Some(ref ls) = lambda_state {
152                ls.write().default_mut().invocations.push(LambdaInvocation {
153                    function_arn: arn.clone(),
154                    payload: body_str.clone(),
155                    timestamp: now,
156                    source: "aws:events".to_string(),
157                });
158            }
159            crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
160            fired.push(FiredTarget {
161                target_type: "lambda".to_string(),
162                arn: arn.clone(),
163            });
164        } else if arn.contains(":logs:") {
165            let mut s_accounts = state.write();
166            let s = s_accounts.default_mut();
167            s.log_deliveries.push(crate::state::LogDelivery {
168                log_group_arn: arn.clone(),
169                payload: body_str.clone(),
170                timestamp: now,
171            });
172            drop(s_accounts);
173            if let Some(ref log_state) = logs_state {
174                crate::service::deliver_to_logs(log_state, arn, &body_str, now);
175            }
176            fired.push(FiredTarget {
177                target_type: "logs".to_string(),
178                arn: arn.clone(),
179            });
180        }
181    }
182
183    Ok(fired)
184}
185
186/// Compute the message body for a target, applying Input / InputPath if
187/// present.
188///
189/// **Limitations**: `InputTransformer` is not yet implemented — if a target
190/// has one configured, we fall through to the full event envelope. Real AWS
191/// evaluates the `InputPathsMap` + `InputTemplate` to build the payload;
192/// implementing that requires a JSONPath evaluator. `InputPath` supports
193/// only the simple `$.field` case (single top-level key); deeper paths
194/// fall back to the full event.
195fn resolve_target_body(
196    target: &EventTarget,
197    event_json: &serde_json::Value,
198    event_str: &str,
199) -> String {
200    if let Some(ref input) = target.input {
201        return input.clone();
202    }
203
204    if let Some(ref input_path) = target.input_path {
205        // Support simple top-level JSONPath like "$.detail"
206        if let Some(key) = input_path.strip_prefix("$.") {
207            if !key.contains('.') && !key.contains('[') {
208                if let Some(val) = event_json.get(key) {
209                    return val.to_string();
210                }
211            }
212        }
213    }
214
215    // InputTransformer is not yet supported — fall through to full event.
216
217    event_str.to_string()
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use crate::state::EventRule;
224    use parking_lot::RwLock;
225
226    fn make_state() -> SharedEventBridgeState {
227        Arc::new(RwLock::new(
228            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
229        ))
230    }
231
232    fn add_rule(
233        state: &SharedEventBridgeState,
234        bus: &str,
235        name: &str,
236        enabled: bool,
237        targets: Vec<EventTarget>,
238    ) {
239        let mut s_accounts = state.write();
240        let s = s_accounts.default_mut();
241        let key = (bus.to_string(), name.to_string());
242        s.rules.insert(
243            key,
244            EventRule {
245                name: name.to_string(),
246                arn: format!("arn:aws:events:us-east-1:123456789012:rule/{bus}/{name}"),
247                event_bus_name: bus.to_string(),
248                event_pattern: None,
249                schedule_expression: Some("rate(1 minute)".to_string()),
250                state: if enabled { "ENABLED" } else { "DISABLED" }.to_string(),
251                description: None,
252                role_arn: None,
253                managed_by: None,
254                created_by: None,
255                targets,
256                tags: HashMap::new(),
257                last_fired: None,
258            },
259        );
260    }
261
262    #[test]
263    fn fire_rule_with_valid_rule() {
264        let state = make_state();
265        let delivery = Arc::new(DeliveryBus::new());
266
267        add_rule(
268            &state,
269            "default",
270            "my-rule",
271            true,
272            vec![EventTarget {
273                id: "t1".to_string(),
274                arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
275                input: None,
276                input_path: None,
277                input_transformer: None,
278                sqs_parameters: None,
279            }],
280        );
281
282        let ctx = FireRuleContext {
283            state: &state,
284            delivery: &delivery,
285            lambda_state: &None,
286            logs_state: &None,
287            container_runtime: &None,
288        };
289        let result = fire_rule(&ctx, "default", "my-rule");
290        let targets = result.unwrap();
291        assert_eq!(targets.len(), 1);
292        assert_eq!(targets[0].target_type, "sqs");
293        assert_eq!(
294            targets[0].arn,
295            "arn:aws:sqs:us-east-1:123456789012:target-queue"
296        );
297
298        // Verify event was recorded
299        let s_accounts = state.read();
300        let s = s_accounts.default_ref();
301        assert!(s.events.iter().any(|e| e.source == "aws.events"));
302    }
303
304    #[test]
305    fn fire_rule_nonexistent_rule() {
306        let state = make_state();
307        let delivery = Arc::new(DeliveryBus::new());
308
309        let ctx = FireRuleContext {
310            state: &state,
311            delivery: &delivery,
312            lambda_state: &None,
313            logs_state: &None,
314            container_runtime: &None,
315        };
316        let result = fire_rule(&ctx, "default", "no-such-rule");
317        assert!(result.is_err());
318        assert!(result.unwrap_err().contains("not found"));
319    }
320
321    #[test]
322    fn fire_rule_disabled_still_fires() {
323        let state = make_state();
324        let delivery = Arc::new(DeliveryBus::new());
325
326        add_rule(
327            &state,
328            "default",
329            "disabled-rule",
330            false, // DISABLED
331            vec![EventTarget {
332                id: "t1".to_string(),
333                arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
334                input: None,
335                input_path: None,
336                input_transformer: None,
337                sqs_parameters: None,
338            }],
339        );
340
341        let ctx = FireRuleContext {
342            state: &state,
343            delivery: &delivery,
344            lambda_state: &None,
345            logs_state: &None,
346            container_runtime: &None,
347        };
348        let result = fire_rule(&ctx, "default", "disabled-rule");
349        // Simulation overrides disabled state
350        let targets = result.unwrap();
351        assert_eq!(targets.len(), 1);
352    }
353
354    #[test]
355    fn fire_rule_unknown_bus_errors() {
356        let state = make_state();
357        let delivery = Arc::new(DeliveryBus::new());
358        let ctx = FireRuleContext {
359            state: &state,
360            delivery: &delivery,
361            lambda_state: &None,
362            logs_state: &None,
363            container_runtime: &None,
364        };
365        let err = fire_rule(&ctx, "missing-bus", "rule").unwrap_err();
366        assert!(err.contains("missing-bus"));
367    }
368
369    #[test]
370    fn fire_rule_no_targets_returns_empty() {
371        let state = make_state();
372        let delivery = Arc::new(DeliveryBus::new());
373        add_rule(&state, "default", "no-targets", true, Vec::new());
374        let ctx = FireRuleContext {
375            state: &state,
376            delivery: &delivery,
377            lambda_state: &None,
378            logs_state: &None,
379            container_runtime: &None,
380        };
381        let targets = fire_rule(&ctx, "default", "no-targets").unwrap();
382        assert!(targets.is_empty());
383    }
384
385    #[test]
386    fn fire_rule_with_sns_and_lambda_and_logs_targets() {
387        let state = make_state();
388        let delivery = Arc::new(DeliveryBus::new());
389        add_rule(
390            &state,
391            "default",
392            "multi",
393            true,
394            vec![
395                EventTarget {
396                    id: "t-sns".to_string(),
397                    arn: "arn:aws:sns:us-east-1:123456789012:topic".to_string(),
398                    input: None,
399                    input_path: None,
400                    input_transformer: None,
401                    sqs_parameters: None,
402                },
403                EventTarget {
404                    id: "t-lambda".to_string(),
405                    arn: "arn:aws:lambda:us-east-1:123456789012:function:F".to_string(),
406                    input: None,
407                    input_path: None,
408                    input_transformer: None,
409                    sqs_parameters: None,
410                },
411                EventTarget {
412                    id: "t-logs".to_string(),
413                    arn: "arn:aws:logs:us-east-1:123456789012:log-group:lg".to_string(),
414                    input: None,
415                    input_path: None,
416                    input_transformer: None,
417                    sqs_parameters: None,
418                },
419            ],
420        );
421        let ctx = FireRuleContext {
422            state: &state,
423            delivery: &delivery,
424            lambda_state: &None,
425            logs_state: &None,
426            container_runtime: &None,
427        };
428        let fired = fire_rule(&ctx, "default", "multi").unwrap();
429        let types: Vec<&str> = fired.iter().map(|t| t.target_type.as_str()).collect();
430        assert!(types.contains(&"sns"));
431        assert!(types.contains(&"lambda"));
432        assert!(types.contains(&"logs"));
433    }
434
435    #[test]
436    fn fire_rule_with_sqs_fifo_message_group() {
437        let state = make_state();
438        let delivery = Arc::new(DeliveryBus::new());
439        add_rule(
440            &state,
441            "default",
442            "fifo",
443            true,
444            vec![EventTarget {
445                id: "t1".to_string(),
446                arn: "arn:aws:sqs:us-east-1:123456789012:queue.fifo".to_string(),
447                input: None,
448                input_path: None,
449                input_transformer: None,
450                sqs_parameters: Some(json!({"MessageGroupId": "g1"})),
451            }],
452        );
453        let ctx = FireRuleContext {
454            state: &state,
455            delivery: &delivery,
456            lambda_state: &None,
457            logs_state: &None,
458            container_runtime: &None,
459        };
460        let fired = fire_rule(&ctx, "default", "fifo").unwrap();
461        assert_eq!(fired.len(), 1);
462        assert_eq!(fired[0].target_type, "sqs");
463    }
464
465    #[test]
466    fn resolve_target_body_uses_literal_input() {
467        let target = EventTarget {
468            id: "t".to_string(),
469            arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
470            input: Some("{\"literal\":true}".to_string()),
471            input_path: None,
472            input_transformer: None,
473            sqs_parameters: None,
474        };
475        let body = resolve_target_body(&target, &json!({"ignored": 1}), "ignored");
476        assert_eq!(body, "{\"literal\":true}");
477    }
478
479    #[test]
480    fn resolve_target_body_uses_input_path_for_top_level() {
481        let target = EventTarget {
482            id: "t".to_string(),
483            arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
484            input: None,
485            input_path: Some("$.detail".to_string()),
486            input_transformer: None,
487            sqs_parameters: None,
488        };
489        let event = json!({"detail": {"k": 1}, "other": 2});
490        let body = resolve_target_body(&target, &event, "fallback");
491        assert!(body.contains("\"k\""));
492    }
493
494    #[test]
495    fn resolve_target_body_falls_back_for_nested_input_path() {
496        let target = EventTarget {
497            id: "t".to_string(),
498            arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
499            input: None,
500            input_path: Some("$.detail.nested".to_string()),
501            input_transformer: None,
502            sqs_parameters: None,
503        };
504        let body = resolve_target_body(&target, &json!({}), "full-event");
505        assert_eq!(body, "full-event");
506    }
507
508    #[test]
509    fn resolve_target_body_no_transform_returns_full_event() {
510        let target = EventTarget {
511            id: "t".to_string(),
512            arn: "arn:aws:sqs:us-east-1:123:q".to_string(),
513            input: None,
514            input_path: None,
515            input_transformer: None,
516            sqs_parameters: None,
517        };
518        let body = resolve_target_body(&target, &json!({}), "full-event");
519        assert_eq!(body, "full-event");
520    }
521}