1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::json;
6
7use fakecloud_core::delivery::DeliveryBus;
8use fakecloud_lambda::state::{LambdaInvocation, SharedLambdaState};
9use fakecloud_logs::state::SharedLogsState;
10
11use crate::state::{EventTarget, SharedEventBridgeState};
12
13#[derive(Debug)]
15pub struct FiredTarget {
16 pub target_type: String,
18 pub arn: String,
20}
21
22pub struct FireRuleContext<'a> {
26 pub state: &'a SharedEventBridgeState,
27 pub delivery: &'a Arc<DeliveryBus>,
28 pub lambda_state: &'a Option<SharedLambdaState>,
29 pub logs_state: &'a Option<SharedLogsState>,
30 pub container_runtime: &'a Option<Arc<fakecloud_lambda::runtime::ContainerRuntime>>,
31}
32
33pub fn fire_rule(
39 ctx: &FireRuleContext<'_>,
40 bus_name: &str,
41 rule_name: &str,
42) -> Result<Vec<FiredTarget>, String> {
43 let state = ctx.state;
44 let delivery = ctx.delivery;
45 let lambda_state = ctx.lambda_state;
46 let logs_state = ctx.logs_state;
47 let container_runtime = ctx.container_runtime;
48
49 let (targets, account_id, region) = {
50 let state = state.read();
51
52 if !state.buses.contains_key(bus_name) {
54 return Err(format!("Event bus '{bus_name}' not found"));
55 }
56
57 let key = (bus_name.to_string(), rule_name.to_string());
58 let rule = match state.rules.get(&key) {
59 Some(r) => r,
60 None => return Err(format!("Rule '{rule_name}' not found on bus '{bus_name}'")),
61 };
62
63 (
64 rule.targets.clone(),
65 state.account_id.clone(),
66 state.region.clone(),
67 )
68 };
69
70 if targets.is_empty() {
71 return Ok(Vec::new());
72 }
73
74 let now = Utc::now();
75 let event_id = uuid::Uuid::new_v4().to_string();
76
77 let event_json = json!({
79 "version": "0",
80 "id": event_id,
81 "source": "aws.events",
82 "account": account_id,
83 "detail-type": "Scheduled Event",
84 "detail": {},
85 "time": now.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
86 "region": region,
87 "resources": [],
88 });
89 let event_str = event_json.to_string();
90
91 {
93 let mut s = state.write();
94 s.events.push(crate::state::PutEvent {
95 event_id: event_id.clone(),
96 source: "aws.events".to_string(),
97 detail_type: "Scheduled Event".to_string(),
98 detail: "{}".to_string(),
99 event_bus_name: bus_name.to_string(),
100 time: now,
101 resources: Vec::new(),
102 });
103 }
104
105 let mut fired = Vec::new();
106
107 for target in &targets {
108 let arn = &target.arn;
109 let body_str = resolve_target_body(target, &event_json, &event_str);
110
111 if arn.contains(":sqs:") {
112 let message_group_id = target
114 .sqs_parameters
115 .as_ref()
116 .and_then(|sp| sp["MessageGroupId"].as_str())
117 .map(|s| s.to_string());
118
119 if message_group_id.is_some() {
120 delivery.send_to_sqs_with_attrs(
121 arn,
122 &body_str,
123 &HashMap::new(),
124 message_group_id.as_deref(),
125 None,
126 );
127 } else {
128 delivery.send_to_sqs(arn, &body_str, &HashMap::new());
129 }
130 fired.push(FiredTarget {
131 target_type: "sqs".to_string(),
132 arn: arn.clone(),
133 });
134 } else if arn.contains(":sns:") {
135 delivery.publish_to_sns(arn, &body_str, Some("Scheduled Event"));
136 fired.push(FiredTarget {
137 target_type: "sns".to_string(),
138 arn: arn.clone(),
139 });
140 } else if arn.contains(":lambda:") {
141 let mut s = state.write();
142 s.lambda_invocations.push(crate::state::LambdaInvocation {
143 function_arn: arn.clone(),
144 payload: body_str.clone(),
145 timestamp: now,
146 });
147 drop(s);
148 if let Some(ref ls) = lambda_state {
149 ls.write().invocations.push(LambdaInvocation {
150 function_arn: arn.clone(),
151 payload: body_str.clone(),
152 timestamp: now,
153 source: "aws:events".to_string(),
154 });
155 }
156 crate::service::invoke_lambda_async(container_runtime, lambda_state, arn, &body_str);
157 fired.push(FiredTarget {
158 target_type: "lambda".to_string(),
159 arn: arn.clone(),
160 });
161 } else if arn.contains(":logs:") {
162 let mut s = state.write();
163 s.log_deliveries.push(crate::state::LogDelivery {
164 log_group_arn: arn.clone(),
165 payload: body_str.clone(),
166 timestamp: now,
167 });
168 drop(s);
169 if let Some(ref log_state) = logs_state {
170 crate::service::deliver_to_logs(log_state, arn, &body_str, now);
171 }
172 fired.push(FiredTarget {
173 target_type: "logs".to_string(),
174 arn: arn.clone(),
175 });
176 }
177 }
178
179 Ok(fired)
180}
181
182fn resolve_target_body(
192 target: &EventTarget,
193 event_json: &serde_json::Value,
194 event_str: &str,
195) -> String {
196 if let Some(ref input) = target.input {
197 return input.clone();
198 }
199
200 if let Some(ref input_path) = target.input_path {
201 if let Some(key) = input_path.strip_prefix("$.") {
203 if !key.contains('.') && !key.contains('[') {
204 if let Some(val) = event_json.get(key) {
205 return val.to_string();
206 }
207 }
208 }
209 }
210
211 event_str.to_string()
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219 use crate::state::{EventBridgeState, EventRule};
220 use parking_lot::RwLock;
221
222 fn make_state() -> SharedEventBridgeState {
223 Arc::new(RwLock::new(EventBridgeState::new(
224 "123456789012",
225 "us-east-1",
226 )))
227 }
228
229 fn add_rule(
230 state: &SharedEventBridgeState,
231 bus: &str,
232 name: &str,
233 enabled: bool,
234 targets: Vec<EventTarget>,
235 ) {
236 let mut s = state.write();
237 let key = (bus.to_string(), name.to_string());
238 s.rules.insert(
239 key,
240 EventRule {
241 name: name.to_string(),
242 arn: format!("arn:aws:events:us-east-1:123456789012:rule/{bus}/{name}"),
243 event_bus_name: bus.to_string(),
244 event_pattern: None,
245 schedule_expression: Some("rate(1 minute)".to_string()),
246 state: if enabled { "ENABLED" } else { "DISABLED" }.to_string(),
247 description: None,
248 role_arn: None,
249 managed_by: None,
250 created_by: None,
251 targets,
252 tags: HashMap::new(),
253 last_fired: None,
254 },
255 );
256 }
257
258 #[test]
259 fn fire_rule_with_valid_rule() {
260 let state = make_state();
261 let delivery = Arc::new(DeliveryBus::new());
262
263 add_rule(
264 &state,
265 "default",
266 "my-rule",
267 true,
268 vec![EventTarget {
269 id: "t1".to_string(),
270 arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
271 input: None,
272 input_path: None,
273 input_transformer: None,
274 sqs_parameters: None,
275 }],
276 );
277
278 let ctx = FireRuleContext {
279 state: &state,
280 delivery: &delivery,
281 lambda_state: &None,
282 logs_state: &None,
283 container_runtime: &None,
284 };
285 let result = fire_rule(&ctx, "default", "my-rule");
286 let targets = result.unwrap();
287 assert_eq!(targets.len(), 1);
288 assert_eq!(targets[0].target_type, "sqs");
289 assert_eq!(
290 targets[0].arn,
291 "arn:aws:sqs:us-east-1:123456789012:target-queue"
292 );
293
294 let s = state.read();
296 assert!(s.events.iter().any(|e| e.source == "aws.events"));
297 }
298
299 #[test]
300 fn fire_rule_nonexistent_rule() {
301 let state = make_state();
302 let delivery = Arc::new(DeliveryBus::new());
303
304 let ctx = FireRuleContext {
305 state: &state,
306 delivery: &delivery,
307 lambda_state: &None,
308 logs_state: &None,
309 container_runtime: &None,
310 };
311 let result = fire_rule(&ctx, "default", "no-such-rule");
312 assert!(result.is_err());
313 assert!(result.unwrap_err().contains("not found"));
314 }
315
316 #[test]
317 fn fire_rule_disabled_still_fires() {
318 let state = make_state();
319 let delivery = Arc::new(DeliveryBus::new());
320
321 add_rule(
322 &state,
323 "default",
324 "disabled-rule",
325 false, vec![EventTarget {
327 id: "t1".to_string(),
328 arn: "arn:aws:sqs:us-east-1:123456789012:target-queue".to_string(),
329 input: None,
330 input_path: None,
331 input_transformer: None,
332 sqs_parameters: None,
333 }],
334 );
335
336 let ctx = FireRuleContext {
337 state: &state,
338 delivery: &delivery,
339 lambda_state: &None,
340 logs_state: &None,
341 container_runtime: &None,
342 };
343 let result = fire_rule(&ctx, "default", "disabled-rule");
344 let targets = result.unwrap();
346 assert_eq!(targets.len(), 1);
347 }
348}