Skip to main content

fakecloud_eventbridge/
state.rs

1use chrono::{DateTime, Utc};
2use fakecloud_aws::arn::Arn;
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct EventBus {
11    pub name: String,
12    pub arn: String,
13    pub tags: HashMap<String, String>,
14    pub policy: Option<Value>,
15    pub description: Option<String>,
16    pub kms_key_identifier: Option<String>,
17    pub dead_letter_config: Option<Value>,
18    pub creation_time: DateTime<Utc>,
19    pub last_modified_time: DateTime<Utc>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EventRule {
24    pub name: String,
25    pub arn: String,
26    pub event_bus_name: String,
27    pub event_pattern: Option<String>,
28    pub schedule_expression: Option<String>,
29    pub state: String,
30    pub description: Option<String>,
31    pub role_arn: Option<String>,
32    pub managed_by: Option<String>,
33    pub created_by: Option<String>,
34    pub targets: Vec<EventTarget>,
35    pub tags: HashMap<String, String>,
36    pub last_fired: Option<DateTime<Utc>>,
37}
38
/// Key type for the rule map: `(event_bus_name, rule_name)`.
///
/// The bus name comes first, the rule name second.
pub type RuleKey = (String, String);
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct EventTarget {
44    pub id: String,
45    pub arn: String,
46    pub input: Option<String>,
47    pub input_path: Option<String>,
48    pub input_transformer: Option<Value>,
49    pub sqs_parameters: Option<Value>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct PutEvent {
54    pub event_id: String,
55    pub source: String,
56    pub detail_type: String,
57    pub detail: String,
58    pub event_bus_name: String,
59    pub time: DateTime<Utc>,
60    pub resources: Vec<String>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct Archive {
65    pub name: String,
66    pub arn: String,
67    pub event_source_arn: String,
68    pub description: Option<String>,
69    pub event_pattern: Option<String>,
70    pub retention_days: i64,
71    pub state: String,
72    pub creation_time: DateTime<Utc>,
73    pub event_count: i64,
74    pub size_bytes: i64,
75    pub events: Vec<PutEvent>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct Connection {
80    pub name: String,
81    pub arn: String,
82    pub description: Option<String>,
83    pub authorization_type: String,
84    pub auth_parameters: Value,
85    pub connection_state: String,
86    pub secret_arn: String,
87    pub creation_time: DateTime<Utc>,
88    pub last_modified_time: DateTime<Utc>,
89    pub last_authorized_time: DateTime<Utc>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ApiDestination {
94    pub name: String,
95    pub arn: String,
96    pub description: Option<String>,
97    pub connection_arn: String,
98    pub invocation_endpoint: String,
99    pub http_method: String,
100    pub invocation_rate_limit_per_second: Option<i64>,
101    pub state: String,
102    pub creation_time: DateTime<Utc>,
103    pub last_modified_time: DateTime<Utc>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct Replay {
108    pub name: String,
109    pub arn: String,
110    pub description: Option<String>,
111    pub event_source_arn: String,
112    pub destination: Value,
113    pub event_start_time: DateTime<Utc>,
114    pub event_end_time: DateTime<Utc>,
115    pub state: String,
116    pub replay_start_time: DateTime<Utc>,
117    pub replay_end_time: Option<DateTime<Utc>>,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct Endpoint {
122    pub name: String,
123    pub arn: String,
124    pub endpoint_id: String,
125    pub endpoint_url: Option<String>,
126    pub description: Option<String>,
127    pub routing_config: Value,
128    pub replication_config: Option<Value>,
129    pub event_buses: Vec<Value>,
130    pub role_arn: Option<String>,
131    pub state: String,
132    pub creation_time: DateTime<Utc>,
133    pub last_modified_time: DateTime<Utc>,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct PartnerEventSource {
138    pub name: String,
139    pub arn: String,
140    pub account: String,
141    pub creation_time: DateTime<Utc>,
142    pub expiration_time: Option<DateTime<Utc>>,
143    pub state: String,
144}
145
146/// A recorded Lambda invocation from EventBridge delivery.
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct LambdaInvocation {
149    pub function_arn: String,
150    pub payload: String,
151    pub timestamp: DateTime<Utc>,
152}
153
154/// A recorded CloudWatch Logs delivery from EventBridge.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct LogDelivery {
157    pub log_group_arn: String,
158    pub payload: String,
159    pub timestamp: DateTime<Utc>,
160}
161
162/// A recorded Step Functions invocation from EventBridge delivery.
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct StepFunctionExecution {
165    pub state_machine_arn: String,
166    pub payload: String,
167    pub timestamp: DateTime<Utc>,
168}
169
170/// JSON object keys must be strings, so serialize `HashMap<(String,String), V>`
171/// as a list of `[bus, rule, value]` tuples.
172mod rule_map_serde {
173    use super::{EventRule, RuleKey};
174    use serde::{Deserialize, Deserializer, Serialize, Serializer};
175    use std::collections::HashMap;
176
177    pub fn serialize<S: Serializer>(
178        map: &HashMap<RuleKey, EventRule>,
179        s: S,
180    ) -> Result<S::Ok, S::Error> {
181        let entries: Vec<(&String, &String, &EventRule)> = map
182            .iter()
183            .map(|((bus, name), rule)| (bus, name, rule))
184            .collect();
185        entries.serialize(s)
186    }
187
188    pub fn deserialize<'de, D: Deserializer<'de>>(
189        d: D,
190    ) -> Result<HashMap<RuleKey, EventRule>, D::Error> {
191        let entries: Vec<(String, String, EventRule)> = Vec::deserialize(d)?;
192        Ok(entries
193            .into_iter()
194            .map(|(bus, name, rule)| ((bus, name), rule))
195            .collect())
196    }
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct EventBridgeState {
201    pub account_id: String,
202    pub region: String,
203    pub buses: HashMap<String, EventBus>,
204    #[serde(with = "rule_map_serde")]
205    pub rules: HashMap<RuleKey, EventRule>,
206    pub events: Vec<PutEvent>,
207    pub archives: HashMap<String, Archive>,
208    pub connections: HashMap<String, Connection>,
209    pub api_destinations: HashMap<String, ApiDestination>,
210    pub replays: HashMap<String, Replay>,
211    /// Partner event sources: name -> PartnerEventSource
212    pub partner_event_sources: HashMap<String, PartnerEventSource>,
213    /// Endpoints: name -> Endpoint
214    pub endpoints: HashMap<String, Endpoint>,
215    /// Recorded Lambda invocations (stub deliveries).
216    pub lambda_invocations: Vec<LambdaInvocation>,
217    /// Recorded CloudWatch Logs deliveries (stub deliveries).
218    pub log_deliveries: Vec<LogDelivery>,
219    /// Recorded Step Functions executions (stub deliveries).
220    pub step_function_executions: Vec<StepFunctionExecution>,
221}
222
223impl EventBridgeState {
224    pub fn new(account_id: &str, region: &str) -> Self {
225        let now = Utc::now();
226        let default_bus_arn =
227            Arn::new("events", region, account_id, "event-bus/default").to_string();
228        let mut buses = HashMap::new();
229        buses.insert(
230            "default".to_string(),
231            EventBus {
232                name: "default".to_string(),
233                arn: default_bus_arn,
234                tags: HashMap::new(),
235                policy: None,
236                description: None,
237                kms_key_identifier: None,
238                dead_letter_config: None,
239                creation_time: now,
240                last_modified_time: now,
241            },
242        );
243
244        Self {
245            account_id: account_id.to_string(),
246            region: region.to_string(),
247            buses,
248            rules: HashMap::new(),
249            events: Vec::new(),
250            archives: HashMap::new(),
251            connections: HashMap::new(),
252            api_destinations: HashMap::new(),
253            replays: HashMap::new(),
254            partner_event_sources: HashMap::new(),
255            endpoints: HashMap::new(),
256            lambda_invocations: Vec::new(),
257            log_deliveries: Vec::new(),
258            step_function_executions: Vec::new(),
259        }
260    }
261
262    /// Get the bus name from an ARN or a plain name.
263    pub fn resolve_bus_name(&self, name_or_arn: &str) -> String {
264        if name_or_arn.starts_with("arn:") {
265            // Extract bus name from ARN: arn:aws:events:region:account:event-bus/NAME
266            name_or_arn
267                .rsplit_once("event-bus/")
268                .map(|(_, n)| n.to_string())
269                .unwrap_or_else(|| name_or_arn.to_string())
270        } else {
271            name_or_arn.to_string()
272        }
273    }
274
275    pub fn reset(&mut self) {
276        self.buses.clear();
277        self.rules.clear();
278        self.events.clear();
279        self.partner_event_sources.clear();
280        self.endpoints.clear();
281        self.lambda_invocations.clear();
282        self.log_deliveries.clear();
283        self.step_function_executions.clear();
284        // Re-create default bus
285        let default_bus_arn = format!(
286            "arn:aws:events:{}:{}:event-bus/default",
287            self.region, self.account_id
288        );
289        self.buses.insert(
290            "default".to_string(),
291            EventBus {
292                name: "default".to_string(),
293                arn: default_bus_arn,
294                tags: HashMap::new(),
295                policy: None,
296                description: None,
297                kms_key_identifier: None,
298                dead_letter_config: None,
299                creation_time: Utc::now(),
300                last_modified_time: Utc::now(),
301            },
302        );
303    }
304}
305
306pub type SharedEventBridgeState =
307    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>>;
308
309impl fakecloud_core::multi_account::AccountState for EventBridgeState {
310    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
311        Self::new(account_id, region)
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    const ACCOUNT: &str = "123456789012";
    const REGION: &str = "us-east-1";

    /// Fresh state for the fixed test account and region.
    fn fresh_state() -> EventBridgeState {
        EventBridgeState::new(ACCOUNT, REGION)
    }

    #[test]
    fn new_creates_default_bus() {
        let eb = fresh_state();
        assert!(eb.buses.contains_key("default"));
        assert_eq!(eb.account_id, "123456789012");
        assert_eq!(eb.region, "us-east-1");
    }

    #[test]
    fn resolve_bus_name_from_arn() {
        let resolved = fresh_state()
            .resolve_bus_name("arn:aws:events:us-east-1:123456789012:event-bus/my-bus");
        assert_eq!(resolved, "my-bus");
    }

    #[test]
    fn resolve_bus_name_plain() {
        let resolved = fresh_state().resolve_bus_name("my-bus");
        assert_eq!(resolved, "my-bus");
    }

    #[test]
    fn resolve_bus_name_invalid_arn_falls_back() {
        // An ARN-shaped string with no `event-bus/` marker comes back unchanged.
        let rule_arn = "arn:aws:events:us-east-1:123456789012:rule/r";
        let resolved = fresh_state().resolve_bus_name(rule_arn);
        assert_eq!(resolved, "arn:aws:events:us-east-1:123456789012:rule/r");
    }

    #[test]
    fn reset_recreates_default_bus() {
        let mut eb = fresh_state();
        eb.buses.clear();
        assert!(!eb.buses.contains_key("default"));
        eb.reset();
        assert!(eb.buses.contains_key("default"));
    }
}
361
362/// On-disk snapshot envelope for EventBridge state. Versioned so
363/// format changes fail loudly on upgrade.
364#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct EventBridgeSnapshot {
366    pub schema_version: u32,
367    #[serde(default)]
368    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<EventBridgeState>>,
369    #[serde(default)]
370    pub state: Option<EventBridgeState>,
371}
372
/// Current schema version for `EventBridgeSnapshot::schema_version`.
pub const EVENTBRIDGE_SNAPSHOT_SCHEMA_VERSION: u32 = 2;