1use chrono::{DateTime, Utc};
2use fakecloud_aws::arn::Arn;
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::BTreeMap;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct EventBus {
11 pub name: String,
12 pub arn: String,
13 pub tags: BTreeMap<String, String>,
14 pub policy: Option<Value>,
15 pub description: Option<String>,
16 pub kms_key_identifier: Option<String>,
17 pub dead_letter_config: Option<Value>,
18 pub creation_time: DateTime<Utc>,
19 pub last_modified_time: DateTime<Utc>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EventRule {
24 pub name: String,
25 pub arn: String,
26 pub event_bus_name: String,
27 pub event_pattern: Option<String>,
28 pub schedule_expression: Option<String>,
29 pub state: String,
30 pub description: Option<String>,
31 pub role_arn: Option<String>,
32 pub managed_by: Option<String>,
33 pub created_by: Option<String>,
34 pub targets: Vec<EventTarget>,
35 pub tags: BTreeMap<String, String>,
36 pub last_fired: Option<DateTime<Utc>>,
37}
38
39pub type RuleKey = (String, String);
41
42#[derive(Debug, Clone, Default, Serialize, Deserialize)]
43pub struct EventTarget {
44 pub id: String,
45 pub arn: String,
46 pub input: Option<String>,
47 pub input_path: Option<String>,
48 pub input_transformer: Option<Value>,
49 pub sqs_parameters: Option<Value>,
50 #[serde(default, skip_serializing_if = "Option::is_none")]
51 pub role_arn: Option<String>,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub dead_letter_config: Option<Value>,
54 #[serde(default, skip_serializing_if = "Option::is_none")]
55 pub retry_policy: Option<Value>,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub ecs_parameters: Option<Value>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub batch_parameters: Option<Value>,
60 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub kinesis_parameters: Option<Value>,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub redshift_data_parameters: Option<Value>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub http_parameters: Option<Value>,
66 #[serde(default, skip_serializing_if = "Option::is_none")]
67 pub sage_maker_pipeline_parameters: Option<Value>,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
69 pub app_sync_parameters: Option<Value>,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub run_command_parameters: Option<Value>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct PutEvent {
76 pub event_id: String,
77 pub source: String,
78 pub detail_type: String,
79 pub detail: String,
80 pub event_bus_name: String,
81 pub time: DateTime<Utc>,
82 pub resources: Vec<String>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct Archive {
87 pub name: String,
88 pub arn: String,
89 pub event_source_arn: String,
90 pub description: Option<String>,
91 pub event_pattern: Option<String>,
92 pub retention_days: i64,
93 pub state: String,
94 pub creation_time: DateTime<Utc>,
95 pub event_count: i64,
96 pub size_bytes: i64,
97 pub events: Vec<PutEvent>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct Connection {
102 pub name: String,
103 pub arn: String,
104 pub description: Option<String>,
105 pub authorization_type: String,
106 pub auth_parameters: Value,
107 pub connection_state: String,
108 pub secret_arn: String,
109 pub creation_time: DateTime<Utc>,
110 pub last_modified_time: DateTime<Utc>,
111 pub last_authorized_time: DateTime<Utc>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct ApiDestination {
116 pub name: String,
117 pub arn: String,
118 pub description: Option<String>,
119 pub connection_arn: String,
120 pub invocation_endpoint: String,
121 pub http_method: String,
122 pub invocation_rate_limit_per_second: Option<i64>,
123 pub state: String,
124 pub creation_time: DateTime<Utc>,
125 pub last_modified_time: DateTime<Utc>,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct Replay {
130 pub name: String,
131 pub arn: String,
132 pub description: Option<String>,
133 pub event_source_arn: String,
134 pub destination: Value,
135 pub event_start_time: DateTime<Utc>,
136 pub event_end_time: DateTime<Utc>,
137 pub state: String,
138 pub replay_start_time: DateTime<Utc>,
139 pub replay_end_time: Option<DateTime<Utc>>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct Endpoint {
144 pub name: String,
145 pub arn: String,
146 pub endpoint_id: String,
147 pub endpoint_url: Option<String>,
148 pub description: Option<String>,
149 pub routing_config: Value,
150 pub replication_config: Option<Value>,
151 pub event_buses: Vec<Value>,
152 pub role_arn: Option<String>,
153 pub state: String,
154 pub creation_time: DateTime<Utc>,
155 pub last_modified_time: DateTime<Utc>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct PartnerEventSource {
160 pub name: String,
161 pub arn: String,
162 pub account: String,
163 pub creation_time: DateTime<Utc>,
164 pub expiration_time: Option<DateTime<Utc>>,
165 pub state: String,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct LambdaInvocation {
171 pub function_arn: String,
172 pub payload: String,
173 pub timestamp: DateTime<Utc>,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct LogDelivery {
179 pub log_group_arn: String,
180 pub payload: String,
181 pub timestamp: DateTime<Utc>,
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct StepFunctionExecution {
187 pub state_machine_arn: String,
188 pub payload: String,
189 pub timestamp: DateTime<Utc>,
190}
191
192mod rule_map_serde {
195 use super::{EventRule, RuleKey};
196 use serde::{Deserialize, Deserializer, Serialize, Serializer};
197 use std::collections::BTreeMap;
198
199 pub fn serialize<S: Serializer>(
200 map: &BTreeMap<RuleKey, EventRule>,
201 s: S,
202 ) -> Result<S::Ok, S::Error> {
203 let entries: Vec<(&String, &String, &EventRule)> = map
204 .iter()
205 .map(|((bus, name), rule)| (bus, name, rule))
206 .collect();
207 entries.serialize(s)
208 }
209
210 pub fn deserialize<'de, D: Deserializer<'de>>(
211 d: D,
212 ) -> Result<BTreeMap<RuleKey, EventRule>, D::Error> {
213 let entries: Vec<(String, String, EventRule)> = Vec::deserialize(d)?;
214 Ok(entries
215 .into_iter()
216 .map(|(bus, name, rule)| ((bus, name), rule))
217 .collect())
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct EventBridgeState {
223 pub account_id: String,
224 pub region: String,
225 pub buses: BTreeMap<String, EventBus>,
226 #[serde(with = "rule_map_serde")]
227 pub rules: BTreeMap<RuleKey, EventRule>,
228 pub events: Vec<PutEvent>,
229 pub archives: BTreeMap<String, Archive>,
230 pub connections: BTreeMap<String, Connection>,
231 pub api_destinations: BTreeMap<String, ApiDestination>,
232 pub replays: BTreeMap<String, Replay>,
233 pub partner_event_sources: BTreeMap<String, PartnerEventSource>,
235 pub endpoints: BTreeMap<String, Endpoint>,
237 pub lambda_invocations: Vec<LambdaInvocation>,
239 pub log_deliveries: Vec<LogDelivery>,
241 pub step_function_executions: Vec<StepFunctionExecution>,
243}
244
245impl EventBridgeState {
246 pub fn new(account_id: &str, region: &str) -> Self {
247 let now = Utc::now();
248 let default_bus_arn =
249 Arn::new("events", region, account_id, "event-bus/default").to_string();
250 let mut buses = BTreeMap::new();
251 buses.insert(
252 "default".to_string(),
253 EventBus {
254 name: "default".to_string(),
255 arn: default_bus_arn,
256 tags: BTreeMap::new(),
257 policy: None,
258 description: None,
259 kms_key_identifier: None,
260 dead_letter_config: None,
261 creation_time: now,
262 last_modified_time: now,
263 },
264 );
265
266 Self {
267 account_id: account_id.to_string(),
268 region: region.to_string(),
269 buses,
270 rules: BTreeMap::new(),
271 events: Vec::new(),
272 archives: BTreeMap::new(),
273 connections: BTreeMap::new(),
274 api_destinations: BTreeMap::new(),
275 replays: BTreeMap::new(),
276 partner_event_sources: BTreeMap::new(),
277 endpoints: BTreeMap::new(),
278 lambda_invocations: Vec::new(),
279 log_deliveries: Vec::new(),
280 step_function_executions: Vec::new(),
281 }
282 }
283
284 pub fn resolve_bus_name(&self, name_or_arn: &str) -> String {
286 if name_or_arn.starts_with("arn:") {
287 name_or_arn
289 .rsplit_once("event-bus/")
290 .map(|(_, n)| n.to_string())
291 .unwrap_or_else(|| name_or_arn.to_string())
292 } else {
293 name_or_arn.to_string()
294 }
295 }
296
297 pub fn reset(&mut self) {
298 self.buses.clear();
299 self.rules.clear();
300 self.events.clear();
301 self.partner_event_sources.clear();
302 self.endpoints.clear();
303 self.lambda_invocations.clear();
304 self.log_deliveries.clear();
305 self.step_function_executions.clear();
306 let default_bus_arn = format!(
308 "arn:aws:events:{}:{}:event-bus/default",
309 self.region, self.account_id
310 );
311 self.buses.insert(
312 "default".to_string(),
313 EventBus {
314 name: "default".to_string(),
315 arn: default_bus_arn,
316 tags: BTreeMap::new(),
317 policy: None,
318 description: None,
319 kms_key_identifier: None,
320 dead_letter_config: None,
321 creation_time: Utc::now(),
322 last_modified_time: Utc::now(),
323 },
324 );
325 }
326}
327
328pub type SharedEventBridgeState =
329 Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>>;
330
331impl fakecloud_core::multi_account::AccountState for EventBridgeState {
332 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
333 Self::new(account_id, region)
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn new_creates_default_bus() {
343 let state = EventBridgeState::new("123456789012", "us-east-1");
344 assert!(state.buses.contains_key("default"));
345 assert_eq!(state.account_id, "123456789012");
346 assert_eq!(state.region, "us-east-1");
347 }
348
349 #[test]
350 fn resolve_bus_name_from_arn() {
351 let state = EventBridgeState::new("123456789012", "us-east-1");
352 assert_eq!(
353 state.resolve_bus_name("arn:aws:events:us-east-1:123456789012:event-bus/my-bus"),
354 "my-bus"
355 );
356 }
357
358 #[test]
359 fn resolve_bus_name_plain() {
360 let state = EventBridgeState::new("123456789012", "us-east-1");
361 assert_eq!(state.resolve_bus_name("my-bus"), "my-bus");
362 }
363
364 #[test]
365 fn resolve_bus_name_invalid_arn_falls_back() {
366 let state = EventBridgeState::new("123456789012", "us-east-1");
367 assert_eq!(
369 state.resolve_bus_name("arn:aws:events:us-east-1:123456789012:rule/r"),
370 "arn:aws:events:us-east-1:123456789012:rule/r"
371 );
372 }
373
374 #[test]
375 fn reset_recreates_default_bus() {
376 let mut state = EventBridgeState::new("123456789012", "us-east-1");
377 state.buses.clear();
378 assert!(!state.buses.contains_key("default"));
379 state.reset();
380 assert!(state.buses.contains_key("default"));
381 }
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct EventBridgeSnapshot {
388 pub schema_version: u32,
389 #[serde(default)]
390 pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>,
391 #[serde(default)]
392 pub state: Option<EventBridgeState>,
393}
394
395pub const EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION: u32 = 2;