Skip to main content

fakecloud_eventbridge/
state.rs

1use chrono::{DateTime, Utc};
2use fakecloud_aws::arn::Arn;
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::BTreeMap;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct EventBus {
11    pub name: String,
12    pub arn: String,
13    pub tags: BTreeMap<String, String>,
14    pub policy: Option<Value>,
15    pub description: Option<String>,
16    pub kms_key_identifier: Option<String>,
17    pub dead_letter_config: Option<Value>,
18    pub creation_time: DateTime<Utc>,
19    pub last_modified_time: DateTime<Utc>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EventRule {
24    pub name: String,
25    pub arn: String,
26    pub event_bus_name: String,
27    pub event_pattern: Option<String>,
28    pub schedule_expression: Option<String>,
29    pub state: String,
30    pub description: Option<String>,
31    pub role_arn: Option<String>,
32    pub managed_by: Option<String>,
33    pub created_by: Option<String>,
34    pub targets: Vec<EventTarget>,
35    pub tags: BTreeMap<String, String>,
36    pub last_fired: Option<DateTime<Utc>>,
37}
38
/// Key type for the rule map: a `(event_bus_name, rule_name)` pair, so rule
/// names only need to be unique within a single bus.
pub type RuleKey = (String, String);
41
42#[derive(Debug, Clone, Default, Serialize, Deserialize)]
43pub struct EventTarget {
44    pub id: String,
45    pub arn: String,
46    pub input: Option<String>,
47    pub input_path: Option<String>,
48    pub input_transformer: Option<Value>,
49    pub sqs_parameters: Option<Value>,
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub role_arn: Option<String>,
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub dead_letter_config: Option<Value>,
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub retry_policy: Option<Value>,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub ecs_parameters: Option<Value>,
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub batch_parameters: Option<Value>,
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub kinesis_parameters: Option<Value>,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub redshift_data_parameters: Option<Value>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub http_parameters: Option<Value>,
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub sage_maker_pipeline_parameters: Option<Value>,
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub app_sync_parameters: Option<Value>,
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub run_command_parameters: Option<Value>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct PutEvent {
76    pub event_id: String,
77    pub source: String,
78    pub detail_type: String,
79    pub detail: String,
80    pub event_bus_name: String,
81    pub time: DateTime<Utc>,
82    pub resources: Vec<String>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct Archive {
87    pub name: String,
88    pub arn: String,
89    pub event_source_arn: String,
90    pub description: Option<String>,
91    pub event_pattern: Option<String>,
92    pub retention_days: i64,
93    pub state: String,
94    pub creation_time: DateTime<Utc>,
95    pub event_count: i64,
96    pub size_bytes: i64,
97    pub events: Vec<PutEvent>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct Connection {
102    pub name: String,
103    pub arn: String,
104    pub description: Option<String>,
105    pub authorization_type: String,
106    pub auth_parameters: Value,
107    pub connection_state: String,
108    pub secret_arn: String,
109    pub creation_time: DateTime<Utc>,
110    pub last_modified_time: DateTime<Utc>,
111    pub last_authorized_time: DateTime<Utc>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct ApiDestination {
116    pub name: String,
117    pub arn: String,
118    pub description: Option<String>,
119    pub connection_arn: String,
120    pub invocation_endpoint: String,
121    pub http_method: String,
122    pub invocation_rate_limit_per_second: Option<i64>,
123    pub state: String,
124    pub creation_time: DateTime<Utc>,
125    pub last_modified_time: DateTime<Utc>,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct Replay {
130    pub name: String,
131    pub arn: String,
132    pub description: Option<String>,
133    pub event_source_arn: String,
134    pub destination: Value,
135    pub event_start_time: DateTime<Utc>,
136    pub event_end_time: DateTime<Utc>,
137    pub state: String,
138    pub replay_start_time: DateTime<Utc>,
139    pub replay_end_time: Option<DateTime<Utc>>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct Endpoint {
144    pub name: String,
145    pub arn: String,
146    pub endpoint_id: String,
147    pub endpoint_url: Option<String>,
148    pub description: Option<String>,
149    pub routing_config: Value,
150    pub replication_config: Option<Value>,
151    pub event_buses: Vec<Value>,
152    pub role_arn: Option<String>,
153    pub state: String,
154    pub creation_time: DateTime<Utc>,
155    pub last_modified_time: DateTime<Utc>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct PartnerEventSource {
160    pub name: String,
161    pub arn: String,
162    pub account: String,
163    pub creation_time: DateTime<Utc>,
164    pub expiration_time: Option<DateTime<Utc>>,
165    pub state: String,
166}
167
168/// A recorded Lambda invocation from EventBridge delivery.
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct LambdaInvocation {
171    pub function_arn: String,
172    pub payload: String,
173    pub timestamp: DateTime<Utc>,
174}
175
176/// A recorded CloudWatch Logs delivery from EventBridge.
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct LogDelivery {
179    pub log_group_arn: String,
180    pub payload: String,
181    pub timestamp: DateTime<Utc>,
182}
183
184/// A recorded Step Functions invocation from EventBridge delivery.
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct StepFunctionExecution {
187    pub state_machine_arn: String,
188    pub payload: String,
189    pub timestamp: DateTime<Utc>,
190}
191
192/// JSON object keys must be strings, so serialize `HashMap<(String,String), V>`
193/// as a list of `[bus, rule, value]` tuples.
194mod rule_map_serde {
195    use super::{EventRule, RuleKey};
196    use serde::{Deserialize, Deserializer, Serialize, Serializer};
197    use std::collections::BTreeMap;
198
199    pub fn serialize<S: Serializer>(
200        map: &BTreeMap<RuleKey, EventRule>,
201        s: S,
202    ) -> Result<S::Ok, S::Error> {
203        let entries: Vec<(&String, &String, &EventRule)> = map
204            .iter()
205            .map(|((bus, name), rule)| (bus, name, rule))
206            .collect();
207        entries.serialize(s)
208    }
209
210    pub fn deserialize<'de, D: Deserializer<'de>>(
211        d: D,
212    ) -> Result<BTreeMap<RuleKey, EventRule>, D::Error> {
213        let entries: Vec<(String, String, EventRule)> = Vec::deserialize(d)?;
214        Ok(entries
215            .into_iter()
216            .map(|(bus, name, rule)| ((bus, name), rule))
217            .collect())
218    }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct EventBridgeState {
223    pub account_id: String,
224    pub region: String,
225    pub buses: BTreeMap<String, EventBus>,
226    #[serde(with = "rule_map_serde")]
227    pub rules: BTreeMap<RuleKey, EventRule>,
228    pub events: Vec<PutEvent>,
229    pub archives: BTreeMap<String, Archive>,
230    pub connections: BTreeMap<String, Connection>,
231    pub api_destinations: BTreeMap<String, ApiDestination>,
232    pub replays: BTreeMap<String, Replay>,
233    /// Partner event sources: name -> PartnerEventSource
234    pub partner_event_sources: BTreeMap<String, PartnerEventSource>,
235    /// Endpoints: name -> Endpoint
236    pub endpoints: BTreeMap<String, Endpoint>,
237    /// Recorded Lambda invocations (stub deliveries).
238    pub lambda_invocations: Vec<LambdaInvocation>,
239    /// Recorded CloudWatch Logs deliveries (stub deliveries).
240    pub log_deliveries: Vec<LogDelivery>,
241    /// Recorded Step Functions executions (stub deliveries).
242    pub step_function_executions: Vec<StepFunctionExecution>,
243}
244
245impl EventBridgeState {
246    pub fn new(account_id: &str, region: &str) -> Self {
247        let now = Utc::now();
248        let default_bus_arn =
249            Arn::new("events", region, account_id, "event-bus/default").to_string();
250        let mut buses = BTreeMap::new();
251        buses.insert(
252            "default".to_string(),
253            EventBus {
254                name: "default".to_string(),
255                arn: default_bus_arn,
256                tags: BTreeMap::new(),
257                policy: None,
258                description: None,
259                kms_key_identifier: None,
260                dead_letter_config: None,
261                creation_time: now,
262                last_modified_time: now,
263            },
264        );
265
266        Self {
267            account_id: account_id.to_string(),
268            region: region.to_string(),
269            buses,
270            rules: BTreeMap::new(),
271            events: Vec::new(),
272            archives: BTreeMap::new(),
273            connections: BTreeMap::new(),
274            api_destinations: BTreeMap::new(),
275            replays: BTreeMap::new(),
276            partner_event_sources: BTreeMap::new(),
277            endpoints: BTreeMap::new(),
278            lambda_invocations: Vec::new(),
279            log_deliveries: Vec::new(),
280            step_function_executions: Vec::new(),
281        }
282    }
283
284    /// Get the bus name from an ARN or a plain name.
285    pub fn resolve_bus_name(&self, name_or_arn: &str) -> String {
286        if name_or_arn.starts_with("arn:") {
287            // Extract bus name from ARN: arn:aws:events:region:account:event-bus/NAME
288            name_or_arn
289                .rsplit_once("event-bus/")
290                .map(|(_, n)| n.to_string())
291                .unwrap_or_else(|| name_or_arn.to_string())
292        } else {
293            name_or_arn.to_string()
294        }
295    }
296
297    pub fn reset(&mut self) {
298        self.buses.clear();
299        self.rules.clear();
300        self.events.clear();
301        self.partner_event_sources.clear();
302        self.endpoints.clear();
303        self.lambda_invocations.clear();
304        self.log_deliveries.clear();
305        self.step_function_executions.clear();
306        // Re-create default bus
307        let default_bus_arn = format!(
308            "arn:aws:events:{}:{}:event-bus/default",
309            self.region, self.account_id
310        );
311        self.buses.insert(
312            "default".to_string(),
313            EventBus {
314                name: "default".to_string(),
315                arn: default_bus_arn,
316                tags: BTreeMap::new(),
317                policy: None,
318                description: None,
319                kms_key_identifier: None,
320                dead_letter_config: None,
321                creation_time: Utc::now(),
322                last_modified_time: Utc::now(),
323            },
324        );
325    }
326}
327
328pub type SharedEventBridgeState =
329    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>>;
330
331impl fakecloud_core::multi_account::AccountState for EventBridgeState {
332    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
333        Self::new(account_id, region)
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    const ACCOUNT: &str = "123456789012";
    const REGION: &str = "us-east-1";

    /// State for the fixed test account and region.
    fn fresh_state() -> EventBridgeState {
        EventBridgeState::new(ACCOUNT, REGION)
    }

    #[test]
    fn new_creates_default_bus() {
        let state = fresh_state();
        assert!(state.buses.contains_key("default"));
        assert_eq!(state.account_id, ACCOUNT);
        assert_eq!(state.region, REGION);
    }

    #[test]
    fn resolve_bus_name_from_arn() {
        let resolved = fresh_state()
            .resolve_bus_name("arn:aws:events:us-east-1:123456789012:event-bus/my-bus");
        assert_eq!(resolved, "my-bus");
    }

    #[test]
    fn resolve_bus_name_plain() {
        let resolved = fresh_state().resolve_bus_name("my-bus");
        assert_eq!(resolved, "my-bus");
    }

    #[test]
    fn resolve_bus_name_invalid_arn_falls_back() {
        // ARN-looking string whose resource is not an event bus.
        let rule_arn = "arn:aws:events:us-east-1:123456789012:rule/r";
        assert_eq!(fresh_state().resolve_bus_name(rule_arn), rule_arn);
    }

    #[test]
    fn reset_recreates_default_bus() {
        let mut state = fresh_state();
        state.buses.clear();
        assert!(!state.buses.contains_key("default"));

        state.reset();
        assert!(state.buses.contains_key("default"));
    }
}
383
384/// On-disk snapshot envelope for EventBridge state. Versioned so
385/// format changes fail loudly on upgrade.
386#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct EventBridgeSnapshot {
388    pub schema_version: u32,
389    #[serde(default)]
390    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>,
391    #[serde(default)]
392    pub state: Option<EventBridgeState>,
393}
394
/// Schema version number for EventBridge snapshots (presumably the value
/// written to `EventBridgeSnapshot::schema_version` — confirm against the
/// persistence code).
pub const EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION: u32 = 2;