Skip to main content

alien_core/events/
bus.rs

1use crate::events::{AlienEvent, EventChange, EventHandle, EventHandler, EventState};
2use crate::{ErrorData, Result};
3use alien_error::Context;
4use chrono::Utc;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9tokio::task_local! {
10    /// Task-local event bus instance
11    static EVENT_BUS: EventBus;
12}
13
14tokio::task_local! {
15    /// Task-local parent event ID for automatic hierarchy
16    static PARENT_EVENT_ID: Option<String>;
17}
18
19/// The event bus for managing events within a task context
20pub struct EventBus {
21    /// Registered event handlers
22    handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
23}
24
25impl EventBus {
26    /// Create a new event bus
27    pub fn new() -> Self {
28        Self {
29            handlers: Arc::new(RwLock::new(Vec::new())),
30        }
31    }
32
33    /// Create a new event bus with handlers
34    pub fn with_handlers(handlers: Vec<Arc<dyn EventHandler>>) -> Self {
35        Self {
36            handlers: Arc::new(RwLock::new(handlers)),
37        }
38    }
39
40    /// Run a function with this event bus as the task-local context
41    pub async fn run<F, Fut, T>(&self, f: F) -> T
42    where
43        F: FnOnce() -> Fut,
44        Fut: std::future::Future<Output = T>,
45    {
46        EVENT_BUS.scope(self.clone(), f()).await
47    }
48
49    /// Register an event handler to the current task-local event bus
50    pub async fn register_handler(handler: Arc<dyn EventHandler>) -> Result<()> {
51        let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
52            Ok(bus) => bus,
53            Err(_) => return Ok(()), // No bus context, silently ignore
54        };
55
56        let mut handlers = bus.handlers.write().await;
57        handlers.push(handler);
58        Ok(())
59    }
60
61    /// Emit a new event using the current task-local event bus
62    pub async fn emit(
63        event: AlienEvent,
64        parent_id: Option<String>,
65        state: EventState,
66    ) -> Result<EventHandle> {
67        let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
68            Ok(bus) => bus,
69            Err(_) => return Ok(EventHandle::noop()), // No bus context, return no-op handle
70        };
71
72        // Generate unique ID
73        let id = Uuid::new_v4().to_string();
74
75        // Use provided parent_id or check thread-local context
76        let effective_parent_id =
77            parent_id.or_else(|| PARENT_EVENT_ID.try_with(|p| p.clone()).ok().flatten());
78
79        let now = Utc::now();
80
81        // Create the event change
82        let change = EventChange::Created {
83            id: id.clone(),
84            parent_id: effective_parent_id.clone(),
85            created_at: now,
86            event: event.clone(),
87            state: state.clone(),
88        };
89
90        // Notify handlers and collect any errors
91        {
92            let handlers = bus.handlers.read().await;
93            for handler in handlers.iter() {
94                handler
95                    .on_event_change(change.clone())
96                    .await
97                    .context(ErrorData::GenericError {
98                        message: "Event handler failed".to_string(),
99                    })?;
100            }
101        }
102
103        Ok(EventHandle::new(id, effective_parent_id))
104    }
105
106    /// Update an existing event using the current task-local event bus
107    pub async fn update(id: &str, event: AlienEvent) -> Result<()> {
108        let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
109            Ok(bus) => bus,
110            Err(_) => return Ok(()), // No bus context, silently ignore
111        };
112
113        let now = Utc::now();
114        let change = EventChange::Updated {
115            id: id.to_string(),
116            updated_at: now,
117            event,
118        };
119
120        // Notify handlers and collect any errors
121        let handlers = bus.handlers.read().await;
122        for handler in handlers.iter() {
123            handler
124                .on_event_change(change.clone())
125                .await
126                .context(ErrorData::GenericError {
127                    message: "Event handler failed".to_string(),
128                })?;
129        }
130
131        Ok(())
132    }
133
134    /// Update the state of an event using the current task-local event bus
135    pub async fn update_state(id: &str, new_state: EventState) -> Result<()> {
136        let bus = match EVENT_BUS.try_with(|bus| bus.clone()) {
137            Ok(bus) => bus,
138            Err(_) => return Ok(()), // No bus context, silently ignore
139        };
140
141        let now = Utc::now();
142        let change = EventChange::StateChanged {
143            id: id.to_string(),
144            updated_at: now,
145            new_state,
146        };
147
148        // Notify handlers and collect any errors
149        let handlers = bus.handlers.read().await;
150        for handler in handlers.iter() {
151            if let Err(e) = handler.on_event_change(change.clone()).await {
152                // Return the first handler error we encounter
153                return Err(e).context(ErrorData::GenericError {
154                    message: "Event handler failed".to_string(),
155                });
156            }
157        }
158
159        Ok(())
160    }
161
162    /// Run a function with a parent event context
163    pub async fn with_parent<F, Fut, T>(parent_id: Option<String>, f: F) -> T
164    where
165        F: FnOnce(&EventHandle) -> Fut,
166        Fut: std::future::Future<Output = T>,
167    {
168        // Create a dummy handle for the context
169        let handle = EventHandle::new(parent_id.clone().unwrap_or_else(|| String::new()), None);
170
171        if let Some(parent) = parent_id {
172            PARENT_EVENT_ID.scope(Some(parent), f(&handle)).await
173        } else {
174            f(&handle).await
175        }
176    }
177
178    /// Get the current event bus from task-local storage if available
179    pub fn current() -> Option<Self> {
180        EVENT_BUS.try_with(|bus| bus.clone()).ok()
181    }
182}
183
184impl Clone for EventBus {
185    fn clone(&self) -> Self {
186        Self {
187            handlers: self.handlers.clone(),
188        }
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use alien_error::AlienError;
195
196    use crate::ErrorData;
197
198    use super::*;
199    use std::sync::Mutex;
200
201    struct TestHandler {
202        changes: Arc<Mutex<Vec<EventChange>>>,
203    }
204
205    #[async_trait::async_trait]
206    impl EventHandler for TestHandler {
207        async fn on_event_change(&self, change: EventChange) -> Result<()> {
208            let mut changes = self.changes.lock().unwrap();
209            changes.push(change);
210            Ok(())
211        }
212    }
213
214    #[tokio::test]
215    async fn test_event_emission() {
216        let changes = Arc::new(Mutex::new(Vec::new()));
217        let handler = Arc::new(TestHandler {
218            changes: changes.clone(),
219        });
220        let bus = EventBus::with_handlers(vec![handler]);
221
222        bus.run(|| async {
223            let _handle = AlienEvent::BuildingStack {
224                stack: "test".to_string(),
225            }
226            .emit()
227            .await
228            .unwrap();
229
230            // Check that we got a Created change
231            let changes = changes.lock().unwrap();
232            assert_eq!(changes.len(), 1);
233            match &changes[0] {
234                EventChange::Created { event, .. } => match event {
235                    AlienEvent::BuildingStack { stack } => assert_eq!(stack, "test"),
236                    _ => panic!("Wrong event type"),
237                },
238                _ => panic!("Expected Created change"),
239            }
240        })
241        .await;
242    }
243
244    #[tokio::test]
245    async fn test_event_hierarchy() {
246        let changes = Arc::new(Mutex::new(Vec::new()));
247        let handler = Arc::new(TestHandler {
248            changes: changes.clone(),
249        });
250        let bus = EventBus::with_handlers(vec![handler]);
251
252        bus.run(|| async {
253            let parent = AlienEvent::BuildingStack {
254                stack: "parent".to_string(),
255            }
256            .emit()
257            .await
258            .unwrap();
259
260            // Use as_parent to establish context for child events
261            parent
262                .as_parent(|_| async {
263                    AlienEvent::TestBuildImage {
264                        image: "child".to_string(),
265                        stage: "test".to_string(),
266                    }
267                    .emit()
268                    .await
269                    .unwrap();
270                })
271                .await;
272
273            let changes = changes.lock().unwrap();
274            assert_eq!(changes.len(), 2);
275
276            // Check parent
277            match &changes[0] {
278                EventChange::Created { id, parent_id, .. } => {
279                    assert_eq!(id, &parent.id);
280                    assert_eq!(parent_id, &None);
281                }
282                _ => panic!("Expected Created change for parent"),
283            }
284
285            // Check child
286            match &changes[1] {
287                EventChange::Created { parent_id, .. } => {
288                    assert_eq!(parent_id, &Some(parent.id.clone()));
289                }
290                _ => panic!("Expected Created change for child"),
291            }
292        })
293        .await;
294    }
295
296    #[tokio::test]
297    async fn test_event_update() {
298        let changes = Arc::new(Mutex::new(Vec::new()));
299        let handler = Arc::new(TestHandler {
300            changes: changes.clone(),
301        });
302        let bus = EventBus::with_handlers(vec![handler]);
303
304        bus.run(|| async {
305            let handle = AlienEvent::TestBuildImage {
306                image: "test".to_string(),
307                stage: "stage1".to_string(),
308            }
309            .emit()
310            .await
311            .unwrap();
312
313            handle
314                .update(AlienEvent::TestBuildImage {
315                    image: "test".to_string(),
316                    stage: "stage2".to_string(),
317                })
318                .await
319                .unwrap();
320
321            let changes = changes.lock().unwrap();
322            assert_eq!(changes.len(), 2);
323
324            // Check update
325            match &changes[1] {
326                EventChange::Updated { id, event, .. } => {
327                    assert_eq!(id, &handle.id);
328                    match event {
329                        AlienEvent::TestBuildImage { stage, .. } => assert_eq!(stage, "stage2"),
330                        _ => panic!("Wrong event type"),
331                    }
332                }
333                _ => panic!("Expected Updated change"),
334            }
335        })
336        .await;
337    }
338
339    #[tokio::test]
340    async fn test_scoped_success() {
341        let changes = Arc::new(Mutex::new(Vec::new()));
342        let handler = Arc::new(TestHandler {
343            changes: changes.clone(),
344        });
345        let bus = EventBus::with_handlers(vec![handler]);
346
347        bus.run(|| async {
348            let result = AlienEvent::BuildingStack {
349                stack: "test".to_string(),
350            }
351            .in_scope(|_handle| async move {
352                // Emit a child event - this will automatically be a child due to in_scope
353                AlienEvent::TestBuildImage {
354                    image: "child".to_string(),
355                    stage: "test".to_string(),
356                }
357                .emit()
358                .await
359                .unwrap();
360                Ok::<_, AlienError<ErrorData>>(42)
361            })
362            .await
363            .unwrap();
364
365            assert_eq!(result, 42);
366
367            let changes = changes.lock().unwrap();
368            assert_eq!(changes.len(), 3); // Created (Started), Created (child), StateChanged (Success)
369
370            // Check final state change
371            match &changes[2] {
372                EventChange::StateChanged { new_state, .. } => {
373                    assert_eq!(new_state, &EventState::Success);
374                }
375                _ => panic!("Expected StateChanged to Success"),
376            }
377        })
378        .await;
379    }
380
381    #[tokio::test]
382    async fn test_scoped_failure() {
383        let changes = Arc::new(Mutex::new(Vec::new()));
384        let handler = Arc::new(TestHandler {
385            changes: changes.clone(),
386        });
387        let bus = EventBus::with_handlers(vec![handler]);
388
389        bus.run(|| async {
390            let result = AlienEvent::BuildingStack {
391                stack: "test".to_string(),
392            }
393            .in_scope(|_handle| async move {
394                Err::<i32, _>(AlienError::new(ErrorData::InvalidResourceUpdate { resource_id: "my_resource".to_string(), reason: "hummus".to_string() }))
395            })
396            .await;
397
398            assert!(result.is_err());
399            let err = result.err().unwrap();
400            assert!(matches!(&err.error, Some(ErrorData::InvalidResourceUpdate { resource_id, .. }) if resource_id == "my_resource"));
401
402            let changes = changes.lock().unwrap();
403            assert_eq!(changes.len(), 2); // Created (Started), StateChanged (Failed)
404
405            // Check final state change
406            match &changes[1] {
407                EventChange::StateChanged { new_state, .. } => match new_state {
408                    EventState::Failed { error } => {
409                        let error = error.as_ref().expect("Expected error to be present");
410                        assert_eq!(error.message, "Resource 'my_resource' cannot be updated: hummus")
411                    }
412                    _ => panic!("Expected Failed state"),
413                },
414                _ => panic!("Expected StateChanged to Failed"),
415            }
416        })
417        .await;
418    }
419
420    #[tokio::test]
421    async fn test_no_event_bus_context() {
422        // Try to emit an event without an event bus context
423        let result = AlienEvent::BuildingStack {
424            stack: "test".to_string(),
425        }
426        .emit()
427        .await;
428
429        // Should succeed with a no-op handle
430        assert!(result.is_ok());
431        let handle = result.unwrap();
432        assert!(handle.is_noop);
433    }
434}