//! `fakecloud_eventbridge/simulation.rs` — on-demand firing of EventBridge
//! rules via simulation.

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::state::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
/// Result of firing a rule via simulation: one entry per target that was
/// delivered to.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers and tests can copy and
/// compare results directly instead of comparing field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FiredTarget {
    /// The target type (e.g. "sqs", "sns", "lambda", "logs").
    pub target_type: String,
    /// The target ARN.
    pub arn: String,
}
21
22/// Borrowed context passed to `fire_rule` — all the surrounding state
23/// it needs to deliver to the different target protocols. Bundled so
24/// the callers don't have to thread five positional args through.
25pub struct FireRuleContext<'a> {
26    pub state: &'a SharedEventBridgeState,
27    pub delivery: &'a Arc<DeliveryBus>,
28    pub lambda_state: &'a Option<SharedLambdaState>,
29    pub logs_state: &'a Option<SharedLogsState>,
30    pub container_runtime: &'a Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
31}
32
33/// Fire a specific rule by bus name and rule name, delivering to all its
34/// targets regardless of the rule's enabled/disabled state.
35///
36/// Returns `Ok(targets)` with the list of targets that were delivered to,
37/// or `Err(message)` if the bus or rule doesn't exist.
38pub fn fire_rule(
39    ctx: &FireRuleContext<'_>,
40    bus_name: &str,
41    rule_name: &str,
42) -> Result<Vec<FiredTarget>, String> {
43    let state = ctx.state;
44    let delivery = ctx.delivery;
45    let lambda_state = ctx.lambda_state;
46    let logs_state = ctx.logs_state;
47    let container_runtime = ctx.container_runtime;
48
49    let (targets, account_id, region) = {
50        let state = state.read();
51
52        // Verify bus exists
53        if !state.buses.contains_key(bus_name) {
54            return Err(format!("Event bus '{bus_name}' not found"));
55        }
56
57        let key = (bus_name.to_string(), rule_name.to_string());
58        let rule = match state.rules.get(&key) {
59            Some(r) => r,
60            None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
61        };
62
63        (
64            rule.targets.clone(),
65            state.account_id.clone(),
66            state.region.clone(),
67        )
68    };
69
70    if targets.is_empty() {
71        return Ok(Vec::new());
72    }
73
74    let now = Utc::now();
75    let event_id = uuid::Uuid::new_v4().to_string();
76
77    // Build the scheduled-event envelope (same shape as the real scheduler)
78    let event_json = json!({
79        "version": "0",
80        "id": event_id,
81        "source": "aws.events",
82        "account": account_id,
83        "detail-type": "Scheduled Event",
84        "detail": {},
85        "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
86        "region": region,
87        "resources": [],
88    });
89    let event_str = event_json.to_string();
90
91    // Record the event in state
92    {
93        let mut s = state.write();
94        s.events.push(crate::state::PutEvent {
95            event_id: event_id.clone(),
96            source: "aws.events".to_string(),
97            detail_type: "Scheduled Event".to_string(),
98            detail: "{}".to_string(),
99            event_bus_name: bus_name.to_string(),
100            time: now,
101            resources: Vec::new(),
102        });
103    }
104
105    let mut fired = Vec::new();
106
107    for target in &targets {
108        let arn = &target.arn;
109        let body_str = resolve_target_body(target, &event_json, &event_str);
110
111        if arn.contains(":sqs:") {
112            // Extract MessageGroupId from SqsParameters if present (required for FIFO queues)
113            let message_group_id = target
114                .sqs_parameters
115                .as_ref()
116                .and_then(|sp| sp["MessageGroupId"].as_str())
117                .map(|s| s.to_string());
118
119            if message_group_id.is_some() {
120                delivery.send_to_sqs_with_attrs(
121                    arn,
122                    &body_str,
123                    &HashMap::new(),
124                    message_group_id.as_deref(),
125                    None,
126                );
127            } else {
128                delivery.send_to_sqs(arn, &body_str, &HashMap::new());
129            }
130            fired.push(FiredTarget {
131                target_type: "sqs".to_string(),
132                arn: arn.clone(),
133            });
134        } else if arn.contains(":sns:") {
135            delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
136            fired.push(FiredTarget {
137                target_type: "sns".to_string(),
138                arn: arn.clone(),
139            });
140        } else if arn.contains(":lambda:") {
141            let mut s = state.write();
142            s.lambda_invocations.push(crate::state::LambdaInvocation {
143                function_arn: arn.clone(),
144                payload: body_str.clone(),
145                timestamp: now,
146            });
147            drop(s);
148            if let Some(ref ls) = lambda_state {
149                ls.write().invocations.push(LambdaInvocation {
150                    function_arn: arn.clone(),
151                    payload: body_str.clone(),
152                    timestamp: now,
153                    source: "aws:events".to_string(),
154                });
155            }
156            crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
157            fired.push(FiredTarget {
158                target_type: "lambda".to_string(),
159                arn: arn.clone(),
160            });
161        } else if arn.contains(":logs:") {
162            let mut s = state.write();
163            s.log_deliveries.push(crate::state::LogDelivery {
164                log_group_arn: arn.clone(),
165                payload: body_str.clone(),
166                timestamp: now,
167            });
168            drop(s);
169            if let Some(ref log_state) = logs_state {
170                crate::service::deliver_to_logs(log_state, arn, &body_str, now);
171            }
172            fired.push(FiredTarget {
173                target_type: "logs".to_string(),
174                arn: arn.clone(),
175            });
176        }
177    }
178
179    Ok(fired)
180}
181
182/// Compute the message body for a target, applying Input / InputPath if
183/// present.
184///
185/// **Limitations**: `InputTransformer` is not yet implemented — if a target
186/// has one configured, we fall through to the full event envelope. Real AWS
187/// evaluates the `InputPathsMap` + `InputTemplate` to build the payload;
188/// implementing that requires a JSONPath evaluator. `InputPath` supports
189/// only the simple `$.field` case (single top-level key); deeper paths
190/// fall back to the full event.
191fn resolve_target_body(
192    target: &EventTarget,
193    event_json: &serde_json::Value,
194    event_str: &str,
195) -> String {
196    if let Some(ref input) = target.input {
197        return input.clone();
198    }
199
200    if let Some(ref input_path) = target.input_path {
201        // Support simple top-level JSONPath like "$.detail"
202        if let Some(key) = input_path.strip_prefix("$.") {
203            if !key.contains('.') && !key.contains('[') {
204                if let Some(val) = event_json.get(key) {
205                    return val.to_string();
206                }
207            }
208        }
209    }
210
211    // InputTransformer is not yet supported — fall through to full event.
212
213    event_str.to_string()
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventBridgeState, EventRule};
    use parking_lot::RwLock;

    /// ARN of the single SQS target used by these tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:target-queue";

    /// Shared EventBridge state built from the fixed arguments
    /// "123456789012" and "us-east-1".
    fn make_state() -> SharedEventBridgeState {
        let inner = EventBridgeState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Insert a scheduled rule called `name` on `bus`, in the ENABLED or
    /// DISABLED state according to `enabled`, carrying `targets`.
    fn add_rule(
        state: &SharedEventBridgeState,
        bus: &str,
        name: &str,
        enabled: bool,
        targets: Vec<EventTarget>,
    ) {
        let rule_state = if enabled { "ENABLED" } else { "DISABLED" };
        let rule = EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{bus}/{name}"),
            event_bus_name: bus.to_string(),
            event_pattern: None,
            schedule_expression: Some("rate(1 minute)".to_string()),
            state: rule_state.to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets,
            tags: HashMap::new(),
            last_fired: None,
        };
        state
            .write()
            .rules
            .insert((bus.to_string(), name.to_string()), rule);
    }

    /// A plain SQS target with no input overrides and no SQS parameters.
    fn queue_target() -> EventTarget {
        EventTarget {
            id: "t1".to_string(),
            arn: QUEUE_ARN.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        }
    }

    /// Fire `rule_name` on the "default" bus with a fresh delivery bus and
    /// no Lambda state, logs state or container runtime.
    fn fire(state: &SharedEventBridgeState, rule_name: &str) -> Result<Vec<FiredTarget>, String> {
        let delivery = Arc::new(DeliveryBus::new());
        let ctx = FireRuleContext {
            state,
            delivery: &delivery,
            lambda_state: &None,
            logs_state: &None,
            container_runtime: &None,
        };
        fire_rule(&ctx, "default", rule_name)
    }

    #[test]
    fn fire_rule_with_valid_rule() {
        let state = make_state();
        add_rule(&state, "default", "my-rule", true, vec![queue_target()]);

        let fired = fire(&state, "my-rule").unwrap();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].target_type, "sqs");
        assert_eq!(fired[0].arn, QUEUE_ARN);

        // Verify event was recorded
        let guard = state.read();
        assert!(guard.events.iter().any(|e| e.source == "aws.events"));
    }

    #[test]
    fn fire_rule_nonexistent_rule() {
        let state = make_state();

        let outcome = fire(&state, "no-such-rule");
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("not found"));
    }

    #[test]
    fn fire_rule_disabled_still_fires() {
        let state = make_state();
        // Registered as DISABLED; simulation overrides the disabled state.
        add_rule(
            &state,
            "default",
            "disabled-rule",
            false,
            vec![queue_target()],
        );

        let fired = fire(&state, "disabled-rule").unwrap();
        assert_eq!(fired.len(), 1);
    }
}