Skip to main content

fakecloud_eventbridge/
simulation.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::state::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
/// A single target that a simulated rule firing delivered to.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// (and tests) can copy results and compare whole values with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FiredTarget {
    /// The target type (e.g. "sqs", "sns", "lambda", "logs").
    pub target_type: String,
    /// The target ARN.
    pub arn: String,
}
21
22/// Fire a specific rule by bus name and rule name, delivering to all its
23/// targets regardless of the rule's enabled/disabled state.
24///
25/// Returns `Ok(targets)` with the list of targets that were delivered to,
26/// or `Err(message)` if the bus or rule doesn't exist.
27pub fn fire_rule(
28    state: &SharedEventBridgeState,
29    delivery: &Arc<DeliveryBus>,
30    lambda_state: &Option<SharedLambdaState>,
31    logs_state: &Option<SharedLogsState>,
32    container_runtime: &Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
33    bus_name: &str,
34    rule_name: &str,
35) -> Result<Vec<FiredTarget>, String> {
36    let (targets, account_id, region) = {
37        let state = state.read();
38
39        // Verify bus exists
40        if !state.buses.contains_key(bus_name) {
41            return Err(format!("Event bus '{bus_name}' not found"));
42        }
43
44        let key = (bus_name.to_string(), rule_name.to_string());
45        let rule = match state.rules.get(&key) {
46            Some(r) => r,
47            None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
48        };
49
50        (
51            rule.targets.clone(),
52            state.account_id.clone(),
53            state.region.clone(),
54        )
55    };
56
57    if targets.is_empty() {
58        return Ok(Vec::new());
59    }
60
61    let now = Utc::now();
62    let event_id = uuid::Uuid::new_v4().to_string();
63
64    // Build the scheduled-event envelope (same shape as the real scheduler)
65    let event_json = json!({
66        "version": "0",
67        "id": event_id,
68        "source": "aws.events",
69        "account": account_id,
70        "detail-type": "Scheduled Event",
71        "detail": {},
72        "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
73        "region": region,
74        "resources": [],
75    });
76    let event_str = event_json.to_string();
77
78    // Record the event in state
79    {
80        let mut s = state.write();
81        s.events.push(crate::state::PutEvent {
82            event_id: event_id.clone(),
83            source: "aws.events".to_string(),
84            detail_type: "Scheduled Event".to_string(),
85            detail: "{}".to_string(),
86            event_bus_name: bus_name.to_string(),
87            time: now,
88            resources: Vec::new(),
89        });
90    }
91
92    let mut fired = Vec::new();
93
94    for target in &targets {
95        let arn = &target.arn;
96        let body_str = resolve_target_body(target, &event_json, &event_str);
97
98        if arn.contains(":sqs:") {
99            // Extract MessageGroupId from SqsParameters if present (required for FIFO queues)
100            let message_group_id = target
101                .sqs_parameters
102                .as_ref()
103                .and_then(|sp| sp["MessageGroupId"].as_str())
104                .map(|s| s.to_string());
105
106            if message_group_id.is_some() {
107                delivery.send_to_sqs_with_attrs(
108                    arn,
109                    &body_str,
110                    &HashMap::new(),
111                    message_group_id.as_deref(),
112                    None,
113                );
114            } else {
115                delivery.send_to_sqs(arn, &body_str, &HashMap::new());
116            }
117            fired.push(FiredTarget {
118                target_type: "sqs".to_string(),
119                arn: arn.clone(),
120            });
121        } else if arn.contains(":sns:") {
122            delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
123            fired.push(FiredTarget {
124                target_type: "sns".to_string(),
125                arn: arn.clone(),
126            });
127        } else if arn.contains(":lambda:") {
128            let mut s = state.write();
129            s.lambda_invocations.push(crate::state::LambdaInvocation {
130                function_arn: arn.clone(),
131                payload: body_str.clone(),
132                timestamp: now,
133            });
134            drop(s);
135            if let Some(ref ls) = lambda_state {
136                ls.write().invocations.push(LambdaInvocation {
137                    function_arn: arn.clone(),
138                    payload: body_str.clone(),
139                    timestamp: now,
140                    source: "aws:events".to_string(),
141                });
142            }
143            crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
144            fired.push(FiredTarget {
145                target_type: "lambda".to_string(),
146                arn: arn.clone(),
147            });
148        } else if arn.contains(":logs:") {
149            let mut s = state.write();
150            s.log_deliveries.push(crate::state::LogDelivery {
151                log_group_arn: arn.clone(),
152                payload: body_str.clone(),
153                timestamp: now,
154            });
155            drop(s);
156            if let Some(ref log_state) = logs_state {
157                crate::service::deliver_to_logs(log_state, arn, &body_str, now);
158            }
159            fired.push(FiredTarget {
160                target_type: "logs".to_string(),
161                arn: arn.clone(),
162            });
163        }
164    }
165
166    Ok(fired)
167}
168
169/// Compute the message body for a target, applying Input / InputPath if
170/// present.
171///
172/// **Limitations**: `InputTransformer` is not yet implemented — if a target
173/// has one configured, we fall through to the full event envelope. Real AWS
174/// evaluates the `InputPathsMap` + `InputTemplate` to build the payload;
175/// implementing that requires a JSONPath evaluator. `InputPath` supports
176/// only the simple `$.field` case (single top-level key); deeper paths
177/// fall back to the full event.
178fn resolve_target_body(
179    target: &EventTarget,
180    event_json: &serde_json::Value,
181    event_str: &str,
182) -> String {
183    if let Some(ref input) = target.input {
184        return input.clone();
185    }
186
187    if let Some(ref input_path) = target.input_path {
188        // Support simple top-level JSONPath like "$.detail"
189        if let Some(key) = input_path.strip_prefix("$.") {
190            if !key.contains('.') && !key.contains('[') {
191                if let Some(val) = event_json.get(key) {
192                    return val.to_string();
193                }
194            }
195        }
196    }
197
198    // InputTransformer is not yet supported — fall through to full event.
199
200    event_str.to_string()
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EventBridgeState, EventRule};
    use parking_lot::RwLock;

    /// ARN of the SQS queue used as the only target in these tests.
    const QUEUE_ARN: &str = "arn:aws:sqs:us-east-1:123456789012:target-queue";

    /// Build a fresh shared EventBridge state for account `123456789012`
    /// in `us-east-1`.
    fn make_state() -> SharedEventBridgeState {
        let inner = EventBridgeState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// A plain SQS target with no input overrides and no SQS parameters.
    fn queue_target() -> EventTarget {
        EventTarget {
            id: "t1".to_string(),
            arn: QUEUE_ARN.to_string(),
            input: None,
            input_path: None,
            input_transformer: None,
            sqs_parameters: None,
        }
    }

    /// Insert a scheduled rule named `name` on `bus` with the given targets.
    fn add_rule(
        state: &SharedEventBridgeState,
        bus: &str,
        name: &str,
        enabled: bool,
        targets: Vec<EventTarget>,
    ) {
        let rule_state = if enabled { "ENABLED" } else { "DISABLED" };
        let rule = EventRule {
            name: name.to_string(),
            arn: format!("arn:aws:events:us-east-1:123456789012:rule/{bus}/{name}"),
            event_bus_name: bus.to_string(),
            event_pattern: None,
            schedule_expression: Some("rate(1 minute)".to_string()),
            state: rule_state.to_string(),
            description: None,
            role_arn: None,
            managed_by: None,
            created_by: None,
            targets,
            tags: HashMap::new(),
            last_fired: None,
        };

        state
            .write()
            .rules
            .insert((bus.to_string(), name.to_string()), rule);
    }

    /// Fire `rule_name` on the `default` bus with a fresh delivery bus and
    /// no Lambda state, Logs state or container runtime attached.
    fn fire_on_default_bus(
        state: &SharedEventBridgeState,
        rule_name: &str,
    ) -> Result<Vec<FiredTarget>, String> {
        let delivery = Arc::new(DeliveryBus::new());
        fire_rule(state, &delivery, &None, &None, &None, "default", rule_name)
    }

    #[test]
    fn fire_rule_with_valid_rule() {
        let state = make_state();
        add_rule(&state, "default", "my-rule", true, vec![queue_target()]);

        let fired = fire_on_default_bus(&state, "my-rule").unwrap();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].target_type, "sqs");
        assert_eq!(
            fired[0].arn,
            "arn:aws:sqs:us-east-1:123456789012:target-queue"
        );

        // The synthetic event must have been recorded in state.
        let recorded = state.read();
        assert!(recorded.events.iter().any(|e| e.source == "aws.events"));
    }

    #[test]
    fn fire_rule_nonexistent_rule() {
        let state = make_state();

        let outcome = fire_on_default_bus(&state, "no-such-rule");
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("not found"));
    }

    #[test]
    fn fire_rule_disabled_still_fires() {
        let state = make_state();
        add_rule(
            &state,
            "default",
            "disabled-rule",
            false, // DISABLED
            vec![queue_target()],
        );

        // Simulation ignores the rule's DISABLED state.
        let fired = fire_on_default_bus(&state, "disabled-rule").unwrap();
        assert_eq!(fired.len(), 1);
    }
}