Skip to main content

alien_core/events/
mod.rs

1//! # Alien Events System
2//!
3//! A Rust-based events system for user-facing events in Alien. This system is designed to handle
4//! events that can be consumed by dashboards, CLIs, and other monitoring tools to show users
5//! what's happening during builds, deployments, and other operations.
6//!
7//! ## Why Not Use Traces?
8//!
9//! This events system is **not** an alternative to traces - traces are for engineering observability
10//! and internal debugging, while this events module is about user-facing events that you show in UIs
11//! (either GUI or TUI). Key differences:
12//!
13//! - **Reliable User Flow**: Events are blocking (unlike non-blocking traces) because we want to
14//!   guarantee users see critical progress updates and fail fast if we can't communicate status
15//! - **Rich UI Components**: Events have strict schemas (see `AlienEvent` enum) enabling translation
16//!   to custom React components, progress bars, and interactive UI elements that show meaningful progress
17//! - **Flexible Granularity**: Unlike traces (spans + events), we only have events, with infinite
18//!   hierarchy and unique IDs. Users can choose their view: high-level "Building" → "Deploying" or
19//!   drill down to see "Building image", "Pushing image", etc. Each event can be scoped for perfect
20//!   user control over detail level
21//! - **Live Progress Updates**: Events can be updated with new information (e.g., "pushing image...
22//!   layer 1/5, layer 2/5, layer 3/5") enabling real-time progress indicators, which is impossible
23//!   with immutable traces
24//! - **Clear Success/Failure States**: Each scoped event has explicit states (Started, Success, Failed)
25//!   with detailed error information, giving users immediate feedback on what succeeded, what failed,
26//!   and exactly why
27//!
28//! ## Key Features
29//!
30//! - **Global Event Bus**: Events can be emitted from anywhere in the code without passing around
31//!   event bus instances, making it easy to add to large Rust workspaces with many crates.
32//! - **Hierarchical Events**: Events can have parent-child relationships for organizing complex operations.
33//! - **State Management**: Events can track their lifecycle (None, Started, Success, Failed).
34//! - **Durable Execution Support**: Designed to work with frameworks like Temporal, Inngest, and Restate
35//!   where processes can restart and state needs to be preserved externally.
36//! - **Change-based Architecture**: Instead of storing events in memory, the system emits changes
37//!   (Created, Updated, StateChanged) that handlers can persist or react to.
38//! - **Macro Support**: The `#[alien_event]` macro provides a convenient way to instrument functions
39//!   with events, similar to tracing's `#[instrument]` macro.
40//!
41//! ## Basic Usage
42//!
43//! ### Using the `#[alien_event]` Macro (Recommended)
44//!
45//! The easiest way to instrument functions with events is using the `#[alien_event]` macro:
46//!
47//! ```rust,ignore
48//! use alien_core::{AlienEvent, EventBus, alien_event, Result};
49//!
50//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
51//! async fn build_stack() -> Result<()> {
52//!     // All events emitted within this function will automatically be children
53//!     // of the BuildingStack event. The event will be marked as successful
54//!     // if the function returns Ok, or failed if it returns an error.
55//!     
56//!     AlienEvent::BuildingImage {
57//!         image: "api:latest".to_string(),
58//!     }
59//!     .emit()
60//!     .await?;
61//!     
62//!     Ok(())
63//! }
64//!
65//! # async fn example() -> Result<()> {
66//! let bus = EventBus::new();
67//! bus.run(|| async {
68//!     build_stack().await?;
69//!     Ok(())
70//! }).await?;
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ### Simple Event Emission
76//!
77//! For more control, you can emit events manually:
78//!
79//! ```rust
80//! use alien_core::{AlienEvent, EventBus};
81//!
82//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83//! let bus = EventBus::new();
84//! bus.run(|| async {
85//!     // Emit a simple event
86//!     let handle = AlienEvent::BuildingStack {
87//!         stack: "my-stack".to_string(),
88//!     }
89//!     .emit()
90//!     .await?;
91//!     
92//!     println!("Emitted event with ID: {}", handle.id);
93//!     Ok::<_, Box<dyn std::error::Error>>(())
94//! }).await?;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! ### Event Updates
100//!
101//! ```rust
102//! use alien_core::{AlienEvent, EventBus};
103//!
104//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
105//! let bus = EventBus::new();
106//! bus.run(|| async {
107//!     // Emit an event and get a handle
108//!     let handle = AlienEvent::BuildingImage {
109//!         image: "api:latest".to_string(),
110//!     }
111//!     .emit()
112//!     .await?;
113//!     
114//!     // Update the event with new information
115//!     handle.update(AlienEvent::BuildingImage {
116//!         image: "api:latest-v2".to_string(),
117//!     }).await?;
118//!     
119//!     // Mark as completed
120//!     handle.complete().await?;
121//!     Ok::<_, Box<dyn std::error::Error>>(())
122//! }).await?;
123//! # Ok(())
124//! # }
125//! ```
126//!
127//! ### Scoped Events with Automatic State Management
128//!
129//! You can also use `in_scope` directly for more control over the event lifecycle:
130//!
131//! ```rust,ignore
132//! use alien_core::{AlienEvent, EventBus, ErrorData, Result};
133//! use alien_error::AlienError;
134//!
135//! # async fn example() -> Result<()> {
136//! let bus = EventBus::new();
137//! bus.run(|| async {
138//!     // Use in_scope for automatic success/failure tracking
139//!     // All events emitted within the scope automatically become children
140//!     let result = AlienEvent::BuildingStack {
141//!         stack: "my-stack".to_string(),
142//!     }
143//!     .in_scope(|_handle| async move {
144//!         // This event will automatically be a child of BuildingStack
145//!         AlienEvent::BuildingImage {
146//!             image: "api:latest".to_string(),
147//!         }
148//!         .emit()
149//!         .await?;
150//!         
151//!         // Do some work that might fail
152//!         std::fs::create_dir_all("/tmp/build")
153//!             .map_err(|e| AlienError::new(ErrorData::GenericError {
154//!                 message: e.to_string(),
155//!             }))?;
156//!         
157//!         // Return success
158//!         Ok::<_, AlienError<ErrorData>>(42)
159//!     })
160//!     .await?;
161//!     
162//!     println!("Operation completed with result: {}", result);
163//!     Ok(())
164//! }).await?;
165//! # Ok(())
166//! # }
167//! ```
168//!
169//! ### Event Hierarchy
170//!
171//! ```rust,ignore
172//! use alien_core::{AlienEvent, EventBus, Result};
173//!
174//! # async fn example() -> Result<()> {
175//! let bus = EventBus::new();
176//! bus.run(|| async {
177//!     let parent = AlienEvent::BuildingStack {
178//!         stack: "my-stack".to_string(),
179//!     }
180//!     .emit()
181//!     .await?;
182//!     
183//!     // Create a parent context for multiple child events
184//!     parent.as_parent(|_handle| async {
185//!         // All events emitted here will be children of the parent
186//!         AlienEvent::BuildingImage {
187//!             image: "api:latest".to_string(),
188//!         }
189//!         .emit()
190//!         .await?;
191//!         
192//!         AlienEvent::PushingImage {
193//!             image: "api:latest".to_string(),
194//!             progress: None,
195//!         }
196//!         .emit()
197//!         .await?;
198//!         
199//!         Ok(())
200//!     }).await?;
201//!     
202//!     // Complete the parent when all children are done
203//!     parent.complete().await?;
204//!     Ok(())
205//! }).await?;
206//! # Ok(())
207//! # }
208//! ```
209//!
210//! ## Durable Execution Support
211//!
212//! The events system is designed to work with durable execution frameworks where processes
213//! can restart and lose in-memory state. Instead of storing events in memory, the system
214//! emits changes that external handlers can persist.
215//!
216//! ### Manual State Management for Durable Workflows
217//!
218//! ```rust,ignore
219//! use alien_core::{AlienEvent, EventBus, EventState, Result};
220//!
221//! # async fn example() -> Result<()> {
222//! let bus = EventBus::new();
223//! bus.run(|| async {
224//!     // In a durable execution framework like Temporal:
225//!     
226//!     // Step 1: Start a long-running operation
227//!     let parent_handle = AlienEvent::BuildingStack {
228//!         stack: "my-stack".to_string(),
229//!     }
230//!     .emit_with_state(EventState::Started)
231//!     .await?;
232//!     
233//!     // Step 2: Perform work across multiple durable steps
234//!     parent_handle.as_parent(|_handle| async {
235//!         // ctx.run(|| { ... }) - durable step 1
236//!         AlienEvent::BuildingImage {
237//!             image: "api:latest".to_string(),
238//!         }
239//!         .emit()
240//!         .await?;
241//!         
242//!         // ctx.run(|| { ... }) - durable step 2
243//!         AlienEvent::PushingImage {
244//!             image: "api:latest".to_string(),
245//!             progress: None,
246//!         }
247//!         .emit()
248//!         .await?;
249//!         
250//!         Ok(())
251//!     }).await?;
252//!     
253//!     // Step 3: Complete the operation
254//!     parent_handle.complete().await?;
255//!     Ok(())
256//! }).await?;
257//! # Ok(())
258//! # }
259//! ```
260//!
261//! ## The `#[alien_event]` Macro
262//!
263//! The `#[alien_event]` macro provides a convenient way to instrument async functions with events,
264//! similar to how tracing's `#[instrument]` macro works for logging. It automatically:
265//!
266//! - Creates an event when the function starts (with `EventState::Started`)
267//! - Establishes a parent context so all events emitted within the function become children
268//! - Marks the event as successful (`EventState::Success`) if the function returns `Ok`
269//! - Marks the event as failed (`EventState::Failed`) if the function returns an `Err`
270//!
271//! ### Basic Usage
272//!
273//! ```rust,ignore
274//! use alien_core::{AlienEvent, alien_event, Result};
275//!
276//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
277//! async fn build_stack() -> Result<()> {
278//!     // Function implementation
279//!     Ok(())
280//! }
281//! ```
282//!
283//! ### Dynamic Values
284//!
285//! You can use function parameters and expressions in the event definition:
286//!
287//! ```rust,ignore
288//! use alien_core::{AlienEvent, alien_event, Result};
289//!
290//! #[alien_event(AlienEvent::BuildingStack { stack: format!("stack-{}", stack_id) })]
291//! async fn build_dynamic_stack(stack_id: u32) -> Result<()> {
292//!     // Function implementation
293//!     Ok(())
294//! }
295//! ```
296//!
297//! ### Comparison with Manual Event Management
298//!
299//! The macro transforms this:
300//!
301//! ```rust,ignore
302//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
303//! async fn build_stack() -> Result<()> {
304//!     // function body
305//!     Ok(())
306//! }
307//! ```
308//!
309//! Into this:
310//!
311//! ```rust,ignore
312//! async fn build_stack() -> Result<()> {
313//!     AlienEvent::BuildingStack { stack: "my-stack".to_string() }
314//!         .in_scope(|_event_handle| async move {
315//!             // function body
316//!             Ok(())
317//!         })
318//!         .await
319//! }
320//! ```
321//!
322//! ### Limitations
323//!
324//! - The macro only works with `async` functions
325//! - For sync functions, use `AlienEvent::emit()` manually
326//! - The event expression is evaluated when the function is called, not when the macro is expanded
327//!
328//! ## Event Handlers
329//!
330//! Event handlers receive changes and can persist them, update UIs, or trigger other actions:
331//!
332//! ```rust
333//! use alien_core::{EventHandler, EventChange, EventBus, AlienEvent};
334//! use async_trait::async_trait;
335//!
336//! struct PostgresEventHandler {
337//!     // database connection pool, etc.
338//! }
339//!
340//! #[async_trait]
341//! impl EventHandler for PostgresEventHandler {
342//!     async fn on_event_change(&self, change: EventChange) -> alien_core::Result<()> {
343//!         match change {
344//!             EventChange::Created { id, parent_id, created_at, event, state } => {
345//!                 // Insert new event record into database
346//!                 println!("Creating event {} with parent {:?}", id, parent_id);
347//!             }
348//!             EventChange::Updated { id, updated_at, event } => {
349//!                 // Update event data in database
350//!                 println!("Updating event {} at {}", id, updated_at);
351//!             }
352//!             EventChange::StateChanged { id, updated_at, new_state } => {
353//!                 // Update event state in database
354//!                 println!("Event {} state changed to {:?} at {}", id, new_state, updated_at);
355//!             }
356//!         }
357//!         Ok(())
358//!     }
359//! }
360//!
361//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
362//! let handler = std::sync::Arc::new(PostgresEventHandler {});
363//! let bus = EventBus::with_handlers(vec![handler]);
364//!
365//! bus.run(|| async {
366//!     // Events will now be persisted to PostgreSQL
367//!     AlienEvent::BuildingStack {
368//!         stack: "my-stack".to_string(),
369//!     }
370//!     .emit()
371//!     .await?;
372//!     Ok::<_, Box<dyn std::error::Error>>(())
373//! }).await?;
374//! # Ok(())
375//! # }
376//! ```
377//!
378//! ## Architecture Notes
379//!
380//! ### Change-Based Design
381//!
382//! Unlike traditional event systems that store complete event state, this system emits
383//! incremental changes:
384//!
385//! - `EventChange::Created`: A new event was created with initial data and state
386//! - `EventChange::Updated`: An event's data was updated
387//! - `EventChange::StateChanged`: An event's state transitioned (None → Started → Success/Failed)
388//!
389//! This design is crucial for durable execution where the event bus itself doesn't persist
390//! state across process restarts.
391//!
392//! ### Task-Local Context
393//!
394//! The event bus uses Tokio's task-local storage to provide a global context without
395//! requiring explicit parameter passing. This makes it easy to add event emission to
396//! existing codebases. The `#[alien_event]` macro leverages this design to provide
397//! seamless instrumentation without requiring changes to function signatures.
398//!
399//! ### Error Handling
400//!
401//! Event emission is designed to be fault-tolerant. If no event bus context is available,
402//! operations return `EventBusError::NoEventBusContext` instead of panicking, so callers can
403//! either propagate the error (fail fast) or ignore it and keep running without event tracking.
404
405mod event;
406pub use event::*;
407
408mod handler;
409pub use handler::*;
410
411mod state;
412pub use state::*;
413
414mod handle;
415pub use handle::*;
416
417mod bus;
418pub use bus::*;
419
420#[cfg(test)]
421mod tests {
422    use crate::events::{AlienEvent, EventBus, EventChange, EventHandler, EventState};
423    use crate::{ErrorData, Result};
424    use alien_error::{AlienError, GenericError};
425    use async_trait::async_trait;
426    use insta::assert_debug_snapshot;
427    use rstest::*;
428    use std::sync::{Arc, Mutex};
429
430    /// Test event handler that captures all events for testing
431    #[derive(Debug, Clone)]
432    struct TestEventHandler {
433        events: Arc<Mutex<Vec<EventChange>>>,
434    }
435
436    impl TestEventHandler {
437        fn new() -> Self {
438            Self {
439                events: Arc::new(Mutex::new(Vec::new())),
440            }
441        }
442
443        fn events(&self) -> Vec<EventChange> {
444            self.events.lock().unwrap().clone()
445        }
446
447        #[allow(dead_code)]
448        fn clear(&self) {
449            self.events.lock().unwrap().clear();
450        }
451    }
452
453    #[async_trait]
454    impl EventHandler for TestEventHandler {
455        async fn on_event_change(&self, change: EventChange) -> Result<()> {
456            self.events.lock().unwrap().push(change);
457            Ok(())
458        }
459    }
460
461    /// Helper to run tests with event bus context
462    async fn with_test_bus<F, Fut, R>(f: F) -> (R, TestEventHandler)
463    where
464        F: FnOnce() -> Fut,
465        Fut: std::future::Future<Output = R>,
466    {
467        let handler = TestEventHandler::new();
468        let bus = EventBus::with_handlers(vec![Arc::new(handler.clone())]);
469        let result = bus.run(f).await;
470        (result, handler)
471    }
472
473    #[tokio::test]
474    async fn test_simple_event_emission() {
475        let (result, handler) = with_test_bus(|| async {
476            let handle = AlienEvent::TestBuildingStack {
477                stack: "test-stack".to_string(),
478            }
479            .emit()
480            .await
481            .unwrap();
482
483            assert!(!handle.id.is_empty());
484            handle
485        })
486        .await;
487
488        let events = handler.events();
489        assert_eq!(events.len(), 1);
490
491        match &events[0] {
492            EventChange::Created {
493                id,
494                parent_id,
495                event,
496                state,
497                ..
498            } => {
499                assert_eq!(id, &result.id);
500                assert_eq!(parent_id, &None);
501                assert!(
502                    matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack")
503                );
504                assert_eq!(state, &EventState::None);
505            }
506            _ => panic!("Expected Created event"),
507        }
508    }
509
510    #[tokio::test]
511    async fn test_event_update() {
512        let (_, handler) = with_test_bus(|| async {
513            let handle = AlienEvent::TestBuildImage {
514                image: "api:latest".to_string(),
515                stage: "stage 1".to_string(),
516            }
517            .emit()
518            .await
519            .unwrap();
520
521            handle
522                .update(AlienEvent::TestBuildImage {
523                    image: "api:latest".to_string(),
524                    stage: "stage 2".to_string(),
525                })
526                .await
527                .unwrap();
528
529            handle
530                .update(AlienEvent::TestBuildImage {
531                    image: "api:latest".to_string(),
532                    stage: "stage 3".to_string(),
533                })
534                .await
535                .unwrap();
536        })
537        .await;
538
539        let events = handler.events();
540        assert_eq!(events.len(), 3); // 1 create + 2 updates
541
542        // Check updates
543        match &events[1] {
544            EventChange::Updated { event, .. } => {
545                assert!(
546                    matches!(event, AlienEvent::TestBuildImage { stage, .. } if stage == "stage 2")
547                );
548            }
549            _ => panic!("Expected Updated event"),
550        }
551
552        match &events[2] {
553            EventChange::Updated { event, .. } => {
554                assert!(
555                    matches!(event, AlienEvent::TestBuildImage { stage, .. } if stage == "stage 3")
556                );
557            }
558            _ => panic!("Expected Updated event"),
559        }
560    }
561
562    #[tokio::test]
563    async fn test_scoped_event_success() {
564        let (result, handler) = with_test_bus(|| async {
565            AlienEvent::TestBuildingImage {
566                image: "api:latest".to_string(),
567            }
568            .in_scope(|_handle| async move {
569                // Emit child events
570                AlienEvent::TestBuildImage {
571                    image: "api:latest".to_string(),
572                    stage: "compile".to_string(),
573                }
574                .emit()
575                .await
576                .unwrap();
577
578                AlienEvent::TestBuildImage {
579                    image: "api:latest".to_string(),
580                    stage: "link".to_string(),
581                }
582                .emit()
583                .await
584                .unwrap();
585
586                Ok::<_, AlienError<ErrorData>>(42)
587            })
588            .await
589            .unwrap()
590        })
591        .await;
592
593        assert_eq!(result, 42);
594
595        let events = handler.events();
596
597        // Should have: parent created (started), 2 children created, parent state changed to success
598        assert!(events.len() >= 4);
599
600        // Verify parent started
601        match &events[0] {
602            EventChange::Created { state, .. } => {
603                assert_eq!(state, &EventState::Started);
604            }
605            _ => panic!("Expected Created event"),
606        }
607
608        // Verify parent completed successfully
609        let last_event = events.last().unwrap();
610        match last_event {
611            EventChange::StateChanged { new_state, .. } => {
612                assert_eq!(new_state, &EventState::Success);
613            }
614            _ => panic!("Expected StateChanged event"),
615        }
616    }
617
618    #[tokio::test]
619    async fn test_scoped_event_failure() {
620        let (result, handler) = with_test_bus(|| async {
621            AlienEvent::TestBuildingImage {
622                image: "api:latest".to_string(),
623            }
624            .in_scope(|_handle| async move {
625                // Emit a child event
626                AlienEvent::TestBuildImage {
627                    image: "api:latest".to_string(),
628                    stage: "compile".to_string(),
629                }
630                .emit()
631                .await
632                .unwrap();
633
634                // Then fail
635                Err::<(), _>(AlienError::new(ErrorData::GenericError {
636                    message: "Test error".to_string(),
637                }))
638            })
639            .await
640        })
641        .await;
642
643        assert!(result.is_err());
644
645        let events = handler.events();
646
647        // Verify parent failed
648        let last_event = events.last().unwrap();
649        match last_event {
650            EventChange::StateChanged { new_state, .. } => {
651                assert!(matches!(new_state, EventState::Failed { .. }));
652            }
653            _ => panic!("Expected StateChanged event"),
654        }
655    }
656
657    #[tokio::test]
658    async fn test_deep_hierarchy() {
659        let (_, handler) = with_test_bus(|| async {
660            AlienEvent::TestBuildingStack {
661                stack: "root-stack".to_string(),
662            }
663            .in_scope(|_| async {
664                // Level 1
665                AlienEvent::TestBuildingImage {
666                    image: "app1".to_string(),
667                }
668                .in_scope(|_| async {
669                    // Level 2
670                    AlienEvent::TestBuildImage {
671                        image: "app1".to_string(),
672                        stage: "compile".to_string(),
673                    }
674                    .in_scope(|_| async {
675                        // Level 3
676                        AlienEvent::TestBuildImage {
677                            image: "app1".to_string(),
678                            stage: "optimize".to_string(),
679                        }
680                        .emit()
681                        .await
682                        .unwrap();
683                        Ok::<_, AlienError<ErrorData>>(())
684                    })
685                    .await
686                    .unwrap();
687                    Ok::<_, AlienError<ErrorData>>(())
688                })
689                .await
690                .unwrap();
691
692                // Another branch at level 1
693                AlienEvent::TestBuildingImage {
694                    image: "app2".to_string(),
695                }
696                .emit()
697                .await
698                .unwrap();
699
700                Ok::<_, AlienError<ErrorData>>(())
701            })
702            .await
703            .unwrap()
704        })
705        .await;
706
707        // Create a hierarchy representation for snapshot testing
708        let events = handler.events();
709        let mut hierarchy = Vec::new();
710        let mut id_map = std::collections::HashMap::new();
711        let mut counter = 0;
712
713        for event in &events {
714            match event {
715                EventChange::Created {
716                    id,
717                    parent_id,
718                    event,
719                    ..
720                } => {
721                    // Map real IDs to stable IDs for snapshot testing
722                    let stable_id = id_map
723                        .entry(id.clone())
724                        .or_insert_with(|| {
725                            counter += 1;
726                            format!("event-{}", counter)
727                        })
728                        .clone();
729
730                    let stable_parent_id = parent_id.as_ref().map(|p| {
731                        id_map
732                            .entry(p.clone())
733                            .or_insert_with(|| {
734                                counter += 1;
735                                format!("event-{}", counter)
736                            })
737                            .clone()
738                    });
739
740                    hierarchy.push((stable_id, stable_parent_id, format!("{:?}", event)));
741                }
742                _ => {}
743            }
744        }
745
746        assert_debug_snapshot!(hierarchy);
747    }
748
749    #[tokio::test]
750    async fn test_wide_hierarchy() {
751        let (_, handler) = with_test_bus(|| async {
752            let parent = AlienEvent::TestBuildingStack {
753                stack: "wide-stack".to_string(),
754            }
755            .emit()
756            .await
757            .unwrap();
758
759            parent
760                .as_parent(|_| async {
761                    // Emit many child events
762                    for i in 0..10 {
763                        AlienEvent::TestBuildImage {
764                            image: format!("image-{}", i),
765                            stage: "build".to_string(),
766                        }
767                        .emit()
768                        .await
769                        .unwrap();
770                    }
771                    Ok::<_, ErrorData>(())
772                })
773                .await
774                .unwrap();
775
776            parent.complete().await.unwrap();
777        })
778        .await;
779
780        let events = handler.events();
781
782        // Count children
783        let children_count = events
784            .iter()
785            .filter(|e| {
786                matches!(
787                    e,
788                    EventChange::Created {
789                        parent_id: Some(_),
790                        ..
791                    }
792                )
793            })
794            .count();
795
796        assert_eq!(children_count, 10);
797    }
798
799    #[tokio::test]
800    async fn test_durable_execution_simulation() {
801        // Simulate a durable execution with multiple steps
802        let (_, handler) = with_test_bus(|| async {
803            // Step 1: Start parent event
804            let parent_handle = AlienEvent::TestBuildingStack {
805                stack: "durable-stack".to_string(),
806            }
807            .emit_with_state(EventState::Started)
808            .await
809            .unwrap();
810
811            // Simulate ctx.run() boundaries
812            let parent_id = parent_handle.id.clone();
813
814            // Step 2: First child in separate "execution"
815            parent_handle
816                .as_parent(|_| async {
817                    AlienEvent::TestBuildImage {
818                        image: "api:latest".to_string(),
819                        stage: "compile".to_string(),
820                    }
821                    .emit()
822                    .await
823                    .unwrap();
824                    Ok::<_, ErrorData>(())
825                })
826                .await
827                .unwrap();
828
829            // Step 3: Second child in separate "execution"
830            parent_handle
831                .as_parent(|_| async {
832                    AlienEvent::TestPushImage {
833                        image: "api:latest".to_string(),
834                    }
835                    .emit()
836                    .await
837                    .unwrap();
838                    Ok::<_, ErrorData>(())
839                })
840                .await
841                .unwrap();
842
843            // Step 4: Complete parent
844            parent_handle.complete().await.unwrap();
845
846            parent_id
847        })
848        .await;
849
850        let events = handler.events();
851
852        // Verify sequence of events
853        assert!(events.len() >= 4); // parent created, 2 children, parent completed
854
855        // Verify all events are properly linked
856        let created_events: Vec<_> = events
857            .iter()
858            .filter_map(|e| match e {
859                EventChange::Created { id, parent_id, .. } => Some((id.clone(), parent_id.clone())),
860                _ => None,
861            })
862            .collect();
863
864        assert_eq!(created_events.len(), 3); // 1 parent + 2 children
865    }
866
867    #[tokio::test]
868    async fn test_manual_state_management() {
869        let (_, handler) = with_test_bus(|| async {
870            let handle = AlienEvent::TestBuildingStack {
871                stack: "manual-stack".to_string(),
872            }
873            .emit_with_state(EventState::Started)
874            .await
875            .unwrap();
876
877            // Do some work...
878
879            // Manually fail the event
880            handle
881                .fail(AlienError::new(GenericError {
882                    message: "Something went wrong".to_string(),
883                }))
884                .await
885                .unwrap();
886        })
887        .await;
888
889        let events = handler.events();
890
891        // Check initial state
892        match &events[0] {
893            EventChange::Created { state, .. } => {
894                assert_eq!(state, &EventState::Started);
895            }
896            _ => panic!("Expected Created event"),
897        }
898
899        // Check failure
900        match &events[1] {
901            EventChange::StateChanged { new_state, .. } => match new_state {
902                EventState::Failed { error } => {
903                    let error = error.as_ref().expect("Expected error to be present");
904                    assert_eq!(error.message, "Something went wrong");
905                }
906                _ => panic!("Expected Failed state"),
907            },
908            _ => panic!("Expected StateChanged event"),
909        }
910    }
911
912    #[tokio::test]
913    async fn test_concurrent_events() {
914        let (_, handler) = with_test_bus(|| async {
915            let handles = tokio::join!(
916                AlienEvent::TestBuildImage {
917                    image: "image1".to_string(),
918                    stage: "build".to_string(),
919                }
920                .emit(),
921                AlienEvent::TestBuildImage {
922                    image: "image2".to_string(),
923                    stage: "build".to_string(),
924                }
925                .emit(),
926                AlienEvent::TestBuildImage {
927                    image: "image3".to_string(),
928                    stage: "build".to_string(),
929                }
930                .emit(),
931            );
932
933            assert!(handles.0.is_ok());
934            assert!(handles.1.is_ok());
935            assert!(handles.2.is_ok());
936        })
937        .await;
938
939        let events = handler.events();
940        assert_eq!(events.len(), 3);
941    }
942
943    #[tokio::test]
944    async fn test_nested_scopes_with_errors() {
945        let (result, handler) = with_test_bus(|| async {
946            AlienEvent::TestBuildingStack {
947                stack: "nested-error-stack".to_string(),
948            }
949            .in_scope(|_| async {
950                // This should succeed
951                AlienEvent::TestBuildImage {
952                    image: "image1".to_string(),
953                    stage: "stage1".to_string(),
954                }
955                .emit()
956                .await
957                .unwrap();
958
959                // This scope should fail
960                let inner_result = AlienEvent::TestBuildImage {
961                    image: "image2".to_string(),
962                    stage: "stage2".to_string(),
963                }
964                .in_scope(|_| async {
965                    Err::<(), _>(AlienError::new(ErrorData::GenericError {
966                        message: "Inner error".to_string(),
967                    }))
968                })
969                .await;
970
971                // Continue despite inner failure
972                assert!(inner_result.is_err());
973
974                Ok::<_, AlienError<ErrorData>>("Outer succeeded")
975            })
976            .await
977        })
978        .await;
979
980        assert!(result.is_ok());
981        assert_eq!(result.unwrap(), "Outer succeeded");
982
983        // Verify mixed success/failure states
984        let state_changes: Vec<_> = handler
985            .events()
986            .iter()
987            .filter_map(|e| match e {
988                EventChange::StateChanged { id, new_state, .. } => {
989                    Some((id.clone(), new_state.clone()))
990                }
991                _ => None,
992            })
993            .collect();
994
995        assert!(state_changes
996            .iter()
997            .any(|(_, state)| matches!(state, EventState::Failed { .. })));
998        assert!(state_changes
999            .iter()
1000            .any(|(_, state)| matches!(state, EventState::Success)));
1001    }
1002
1003    #[tokio::test]
1004    async fn test_no_event_bus_context() {
1005        // Test behavior when no event bus is present
1006        let result = AlienEvent::TestBuildingStack {
1007            stack: "no-context".to_string(),
1008        }
1009        .emit()
1010        .await;
1011
1012        // Should succeed with a no-op handle
1013        assert!(result.is_ok());
1014        let handle = result.unwrap();
1015        assert!(handle.is_noop);
1016    }
1017
1018    #[rstest]
1019    #[case("stack1", "image1", "stage1")]
1020    #[case("stack2", "image2", "stage2")]
1021    #[case("stack3", "image3", "stage3")]
1022    #[tokio::test]
1023    async fn test_parameterized_events(
1024        #[case] stack: &str,
1025        #[case] image: &str,
1026        #[case] stage: &str,
1027    ) {
1028        let (_, handler) = with_test_bus(|| async {
1029            AlienEvent::TestBuildingStack {
1030                stack: stack.to_string(),
1031            }
1032            .in_scope(|_| async move {
1033                AlienEvent::TestBuildImage {
1034                    image: image.to_string(),
1035                    stage: stage.to_string(),
1036                }
1037                .emit()
1038                .await
1039                .unwrap();
1040                Ok::<_, AlienError<ErrorData>>(())
1041            })
1042            .await
1043            .unwrap()
1044        })
1045        .await;
1046
1047        let events = handler.events();
1048        assert!(events.len() >= 3); // parent created, child created, parent success
1049    }
1050
1051    #[tokio::test]
1052    async fn test_complex_workflow_snapshot() {
1053        let (_, handler) = with_test_bus(|| async {
1054            // Simulate a complex deployment workflow
1055            AlienEvent::TestBuildingStack {
1056                stack: "production-stack".to_string(),
1057            }
1058            .in_scope(|_| async {
1059                // Build phase
1060                AlienEvent::TestBuildingImage {
1061                    image: "api-service".to_string(),
1062                }
1063                .in_scope(|_| async {
1064                    for stage in ["download-deps", "compile", "optimize", "build"] {
1065                        AlienEvent::TestBuildImage {
1066                            image: "api-service".to_string(),
1067                            stage: stage.to_string(),
1068                        }
1069                        .emit()
1070                        .await
1071                        .unwrap();
1072                    }
1073                    Ok::<_, AlienError<ErrorData>>(())
1074                })
1075                .await
1076                .unwrap();
1077
1078                // Push phase
1079                AlienEvent::TestPushImage {
1080                    image: "api-service".to_string(),
1081                }
1082                .emit()
1083                .await
1084                .unwrap();
1085
1086                // Deploy phase
1087                AlienEvent::TestDeployingStack {
1088                    stack: "api-service".to_string(),
1089                }
1090                .in_scope(|_| async {
1091                    AlienEvent::TestCreatingResource {
1092                        resource_type: "LoadBalancer".to_string(),
1093                        resource_name: "api-lb".to_string(),
1094                        details: Some("Updating target groups".to_string()),
1095                    }
1096                    .emit()
1097                    .await
1098                    .unwrap();
1099
1100                    AlienEvent::TestPerformingHealthCheck {
1101                        target: "https://api.example.com/health".to_string(),
1102                        check_type: "HTTP".to_string(),
1103                    }
1104                    .emit()
1105                    .await
1106                    .unwrap();
1107
1108                    Ok::<_, AlienError<ErrorData>>(())
1109                })
1110                .await
1111                .unwrap();
1112
1113                Ok::<_, AlienError<ErrorData>>(())
1114            })
1115            .await
1116            .unwrap()
1117        })
1118        .await;
1119
1120        // Create a structured view of all events for snapshot
1121        let events = handler.events();
1122        let mut id_map = std::collections::HashMap::new();
1123        let mut counter = 0;
1124
1125        let snapshot_data: Vec<_> = events
1126            .iter()
1127            .map(|e| match e {
1128                EventChange::Created {
1129                    id,
1130                    parent_id,
1131                    event,
1132                    state,
1133                    ..
1134                } => {
1135                    let stable_id = id_map
1136                        .entry(id.clone())
1137                        .or_insert_with(|| {
1138                            counter += 1;
1139                            format!("event-{}", counter)
1140                        })
1141                        .clone();
1142
1143                    let stable_parent_id = parent_id.as_ref().map(|p| {
1144                        id_map
1145                            .entry(p.clone())
1146                            .or_insert_with(|| {
1147                                counter += 1;
1148                                format!("event-{}", counter)
1149                            })
1150                            .clone()
1151                    });
1152
1153                    format!(
1154                        "Created: id={}, parent={:?}, event={:?}, state={:?}",
1155                        stable_id, stable_parent_id, event, state
1156                    )
1157                }
1158                EventChange::Updated { id, event, .. } => {
1159                    let stable_id = id_map
1160                        .entry(id.clone())
1161                        .or_insert_with(|| {
1162                            counter += 1;
1163                            format!("event-{}", counter)
1164                        })
1165                        .clone();
1166
1167                    format!("Updated: id={}, event={:?}", stable_id, event)
1168                }
1169                EventChange::StateChanged { id, new_state, .. } => {
1170                    let stable_id = id_map
1171                        .entry(id.clone())
1172                        .or_insert_with(|| {
1173                            counter += 1;
1174                            format!("event-{}", counter)
1175                        })
1176                        .clone();
1177
1178                    format!("StateChanged: id={}, new_state={:?}", stable_id, new_state)
1179                }
1180            })
1181            .collect();
1182
1183        assert_debug_snapshot!(snapshot_data);
1184    }
1185
1186    #[tokio::test]
1187    async fn test_multi_tenancy_with_http_server() {
1188        use axum::{http::header::HeaderMap, http::StatusCode, response::Response};
1189        use futures::future::join_all;
1190        use std::collections::HashMap;
1191        use tokio::sync::mpsc;
1192
1193        // Create a channel to collect all events from all tenants
1194        let (event_tx, mut event_rx) = mpsc::unbounded_channel::<(String, EventChange)>();
1195
1196        // Create a test handler that captures events with tenant context
1197        #[derive(Clone)]
1198        struct TenantAwareEventHandler {
1199            tenant_id: String,
1200            sender: mpsc::UnboundedSender<(String, EventChange)>,
1201        }
1202
1203        #[async_trait]
1204        impl EventHandler for TenantAwareEventHandler {
1205            async fn on_event_change(&self, change: EventChange) -> Result<()> {
1206                let _ = self.sender.send((self.tenant_id.clone(), change));
1207                Ok(())
1208            }
1209        }
1210
1211        // Helper function to simulate business logic that emits events
1212        let business_logic = {
1213            let event_tx = event_tx.clone();
1214            move |tenant_id: String| {
1215                let event_tx = event_tx.clone();
1216                async move {
1217                    // Create a handler specific to this tenant
1218                    let handler = Arc::new(TenantAwareEventHandler {
1219                        tenant_id: tenant_id.clone(),
1220                        sender: event_tx,
1221                    });
1222
1223                    // Create event bus for this tenant
1224                    let bus = EventBus::with_handlers(vec![handler]);
1225
1226                    bus.run(|| async {
1227                        // Main scoped event
1228                        AlienEvent::TestBuildingStack {
1229                            stack: format!("tenant-{}-stack", tenant_id),
1230                        }
1231                        .in_scope(|_handle| async move {
1232                            // First nested event
1233                            AlienEvent::TestBuildImage {
1234                                image: format!("tenant-{}-api", tenant_id),
1235                                stage: "compile".to_string(),
1236                            }
1237                            .emit()
1238                            .await
1239                            .map_err(|e| {
1240                                AlienError::new(ErrorData::GenericError {
1241                                    message: e.to_string(),
1242                                })
1243                            })?;
1244
1245                            // Call another function that emits more events
1246                            deploy_service(&tenant_id).await?;
1247
1248                            Ok::<_, AlienError<ErrorData>>(format!(
1249                                "Success for tenant {}",
1250                                tenant_id
1251                            ))
1252                        })
1253                        .await
1254                    })
1255                    .await
1256                }
1257            }
1258        };
1259
1260        // Another function that emits events
1261        async fn deploy_service(tenant_id: &str) -> Result<()> {
1262            AlienEvent::TestDeployingStack {
1263                stack: format!("tenant-{}-deployment", tenant_id),
1264            }
1265            .emit()
1266            .await?;
1267
1268            AlienEvent::TestCreatingResource {
1269                resource_type: "LoadBalancer".to_string(),
1270                resource_name: format!("tenant-{}-lb", tenant_id),
1271                details: Some("Multi-tenant load balancer".to_string()),
1272            }
1273            .emit()
1274            .await?;
1275
1276            Ok(())
1277        }
1278
1279        // Create HTTP endpoint handler
1280        let hello_handler = {
1281            let business_logic = business_logic.clone();
1282            move |headers: HeaderMap| {
1283                let business_logic = business_logic.clone();
1284                async move {
1285                    let tenant_id = headers
1286                        .get("x-tenant-id")
1287                        .and_then(|v| v.to_str().ok())
1288                        .unwrap_or("default")
1289                        .to_string();
1290
1291                    match business_logic(tenant_id.clone()).await {
1292                        Ok(result) => Response::builder()
1293                            .status(StatusCode::OK)
1294                            .body(format!(
1295                                "Hello from tenant {}! Result: {}",
1296                                tenant_id, result
1297                            ))
1298                            .unwrap(),
1299                        Err(e) => Response::builder()
1300                            .status(StatusCode::INTERNAL_SERVER_ERROR)
1301                            .body(format!("Error for tenant {}: {}", tenant_id, e))
1302                            .unwrap(),
1303                    }
1304                }
1305            }
1306        };
1307
1308        // Smoke-test the handler without binding a socket (CI environments can forbid it).
1309        let mut headers = HeaderMap::new();
1310        headers.insert("x-tenant-id", "tenant-0000".parse().unwrap());
1311        let _ = hello_handler(headers).await;
1312
1313        // Create 10000 concurrent simulated requests with unique tenant IDs
1314        // Instead of actual HTTP requests, we'll directly call the business logic
1315        let mut request_futures = Vec::new();
1316
1317        for i in 0..10000 {
1318            let tenant_id = format!("tenant-{:04}", i);
1319            let business_logic = business_logic.clone();
1320
1321            let request_future = async move {
1322                match business_logic(tenant_id.clone()).await {
1323                    Ok(_) => tenant_id,
1324                    Err(e) => panic!("Request failed for tenant {}: {}", tenant_id, e),
1325                }
1326            };
1327
1328            request_futures.push(request_future);
1329        }
1330
1331        // Execute all requests concurrently
1332        let completed_tenants = join_all(request_futures).await;
1333        assert_eq!(completed_tenants.len(), 10000);
1334
1335        // Collect all events
1336        let mut all_events: Vec<(String, EventChange)> = Vec::new();
1337
1338        // Give some time for all events to be processed
1339        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1340
1341        // Drain the event receiver
1342        while let Ok(event) = event_rx.try_recv() {
1343            all_events.push(event);
1344        }
1345
1346        // Verify we received events
1347        assert!(!all_events.is_empty(), "No events were received");
1348
1349        // Group events by tenant
1350        let mut events_by_tenant: HashMap<String, Vec<EventChange>> = HashMap::new();
1351        for (tenant_id, event) in all_events.clone() {
1352            events_by_tenant.entry(tenant_id).or_default().push(event);
1353        }
1354
1355        // Verify we have events for all tenants
1356        assert_eq!(
1357            events_by_tenant.len(),
1358            10000,
1359            "Expected events for 10000 tenants, got {}",
1360            events_by_tenant.len()
1361        );
1362
1363        // Verify each tenant has the expected number of events
1364        for (tenant_id, events) in &events_by_tenant {
1365            // Each tenant should have:
1366            // 1. BuildingStack (created + state changed to started + state changed to success)
1367            // 2. BuildImage (created)
1368            // 3. DeployingStack (created)
1369            // 4. CreatingResource (created)
1370            // Total: 6 events minimum
1371            assert!(
1372                events.len() >= 4,
1373                "Tenant {} has {} events, expected at least 4",
1374                tenant_id,
1375                events.len()
1376            );
1377
1378            // Verify tenant-specific data in events
1379            for event in events {
1380                if let EventChange::Created {
1381                    event: alien_event, ..
1382                } = event
1383                {
1384                    match alien_event {
1385                        AlienEvent::TestBuildingStack { stack } => {
1386                            assert!(
1387                                stack.contains(tenant_id),
1388                                "Stack name '{}' should contain tenant ID '{}'",
1389                                stack,
1390                                tenant_id
1391                            );
1392                        }
1393                        AlienEvent::TestBuildImage { image, .. } => {
1394                            assert!(
1395                                image.contains(tenant_id),
1396                                "Image name '{}' should contain tenant ID '{}'",
1397                                image,
1398                                tenant_id
1399                            );
1400                        }
1401                        AlienEvent::TestDeployingStack { stack } => {
1402                            assert!(
1403                                stack.contains(tenant_id),
1404                                "Deployment stack '{}' should contain tenant ID '{}'",
1405                                stack,
1406                                tenant_id
1407                            );
1408                        }
1409                        AlienEvent::TestCreatingResource { resource_name, .. } => {
1410                            assert!(
1411                                resource_name.contains(tenant_id),
1412                                "Resource name '{}' should contain tenant ID '{}'",
1413                                resource_name,
1414                                tenant_id
1415                            );
1416                        }
1417                        _ => {}
1418                    }
1419                }
1420            }
1421        }
1422
1423        // Verify event hierarchy for a sample tenant
1424        let sample_tenant = "tenant-0000";
1425        let sample_events = events_by_tenant.get(sample_tenant).unwrap();
1426
1427        // Find the parent BuildingStack event
1428        let parent_event = sample_events
1429            .iter()
1430            .find(|e| {
1431                matches!(
1432                    e,
1433                    EventChange::Created {
1434                        event: AlienEvent::TestBuildingStack { .. },
1435                        ..
1436                    }
1437                )
1438            })
1439            .expect("Should have TestBuildingStack event");
1440
1441        if let EventChange::Created { id: parent_id, .. } = parent_event {
1442            // Verify child events reference the parent
1443            let child_events: Vec<_> = sample_events.iter().filter(|e| {
1444                matches!(e, EventChange::Created { parent_id: Some(pid), .. } if pid == parent_id)
1445            }).collect();
1446
1447            assert!(
1448                !child_events.is_empty(),
1449                "Should have child events for parent {}",
1450                parent_id
1451            );
1452        }
1453
1454        println!("✅ Multi-tenancy test passed!");
1455        println!("   - Processed 10000 concurrent requests");
1456        println!("   - Verified {} unique tenants", events_by_tenant.len());
1457        println!("   - Total events captured: {}", all_events.len());
1458        println!(
1459            "   - Average events per tenant: {:.1}",
1460            all_events.len() as f64 / events_by_tenant.len() as f64
1461        );
1462    }
1463
1464    #[tokio::test]
1465    async fn test_handler_failure() {
1466        // Create a handler that always fails
1467        struct FailingHandler;
1468
1469        #[async_trait]
1470        impl EventHandler for FailingHandler {
1471            async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1472                Err(AlienError::new(ErrorData::GenericError {
1473                    message: "Handler intentionally failed".to_string(),
1474                }))
1475            }
1476        }
1477
1478        let failing_handler = Arc::new(FailingHandler);
1479        let bus = EventBus::with_handlers(vec![failing_handler]);
1480
1481        let result = bus
1482            .run(|| async {
1483                // Try to emit an event - this should fail because the handler fails
1484                AlienEvent::TestBuildingStack {
1485                    stack: "test-stack".to_string(),
1486                }
1487                .emit()
1488                .await
1489            })
1490            .await;
1491
1492        // Verify that the event emission failed due to handler failure
1493        assert!(result.is_err());
1494        let error = result.unwrap_err();
1495        assert!(error.to_string().contains("Event handler failed"));
1496        assert!(error.to_string().contains("Handler intentionally failed"));
1497    }
1498
1499    #[tokio::test]
1500    async fn test_mixed_handlers_one_fails() {
1501        // Create one successful handler and one failing handler
1502        let successful_handler = TestEventHandler::new();
1503
1504        struct FailingHandler;
1505
1506        #[async_trait]
1507        impl EventHandler for FailingHandler {
1508            async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1509                Err(AlienError::new(ErrorData::GenericError {
1510                    message: "Second handler failed".to_string(),
1511                }))
1512            }
1513        }
1514
1515        let failing_handler = Arc::new(FailingHandler);
1516        let bus =
1517            EventBus::with_handlers(vec![Arc::new(successful_handler.clone()), failing_handler]);
1518
1519        let result = bus
1520            .run(|| async {
1521                // Try to emit an event - this should fail because one handler fails
1522                AlienEvent::TestBuildingStack {
1523                    stack: "test-stack".to_string(),
1524                }
1525                .emit()
1526                .await
1527            })
1528            .await;
1529
1530        // Verify that the event emission failed
1531        assert!(result.is_err());
1532        let error = result.unwrap_err();
1533        assert!(error.to_string().contains("Event handler failed"));
1534        assert!(error.to_string().contains("Second handler failed"));
1535
1536        // The successful handler should have been called before the failing one
1537        let events = successful_handler.events();
1538        assert_eq!(events.len(), 1);
1539        match &events[0] {
1540            EventChange::Created { event, .. } => {
1541                assert!(
1542                    matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack")
1543                );
1544            }
1545            _ => panic!("Expected Created event"),
1546        }
1547    }
1548
1549    #[tokio::test]
1550    async fn test_handler_failure_in_scoped_event() {
1551        struct FailingHandler;
1552
1553        #[async_trait]
1554        impl EventHandler for FailingHandler {
1555            async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1556                Err(AlienError::new(ErrorData::GenericError {
1557                    message: "Handler failed during scoped event".to_string(),
1558                }))
1559            }
1560        }
1561
1562        let failing_handler = Arc::new(FailingHandler);
1563        let bus = EventBus::with_handlers(vec![failing_handler]);
1564
1565        let result = bus
1566            .run(|| async {
1567                // Try to use in_scope - the initial event emission will fail, but in_scope
1568                // is designed to be fault-tolerant and will continue with a no-op handle
1569                AlienEvent::TestBuildingStack {
1570                    stack: "test-stack".to_string(),
1571                }
1572                .in_scope(|handle| async move {
1573                    // This code will be reached because in_scope is fault-tolerant
1574                    // The handle should be a no-op handle
1575                    assert!(handle.is_noop);
1576                    Ok::<_, AlienError<ErrorData>>(42)
1577                })
1578                .await
1579            })
1580            .await;
1581
1582        // Verify that the scoped event succeeded despite handler failure
1583        // This demonstrates the fault-tolerant design of the event system
1584        assert!(result.is_ok());
1585        assert_eq!(result.unwrap(), 42);
1586    }
1587
1588    #[tokio::test]
1589    async fn test_handler_failure_during_update() {
1590        // Create a handler that succeeds on creation but fails on updates
1591        struct UpdateFailingHandler {
1592            call_count: Arc<Mutex<usize>>,
1593        }
1594
1595        #[async_trait]
1596        impl EventHandler for UpdateFailingHandler {
1597            async fn on_event_change(&self, change: EventChange) -> Result<()> {
1598                let mut count = self.call_count.lock().unwrap();
1599                *count += 1;
1600
1601                match change {
1602                    EventChange::Created { .. } => Ok(()), // Allow creation
1603                    EventChange::Updated { .. } => {
1604                        // Fail on updates
1605                        Err(AlienError::new(ErrorData::GenericError {
1606                            message: "Handler failed during update".to_string(),
1607                        }))
1608                    }
1609                    EventChange::StateChanged { .. } => Ok(()), // Allow state changes
1610                }
1611            }
1612        }
1613
1614        let call_count = Arc::new(Mutex::new(0));
1615        let handler = Arc::new(UpdateFailingHandler {
1616            call_count: call_count.clone(),
1617        });
1618        let bus = EventBus::with_handlers(vec![handler]);
1619
1620        let result = bus
1621            .run(|| async {
1622                // Create an event - this should succeed
1623                let handle = AlienEvent::TestBuildImage {
1624                    image: "test-image".to_string(),
1625                    stage: "stage1".to_string(),
1626                }
1627                .emit()
1628                .await?;
1629
1630                // Try to update the event - this should fail
1631                handle
1632                    .update(AlienEvent::TestBuildImage {
1633                        image: "test-image".to_string(),
1634                        stage: "stage2".to_string(),
1635                    })
1636                    .await
1637            })
1638            .await;
1639
1640        // Verify that the update failed
1641        assert!(result.is_err());
1642        let error = result.unwrap_err();
1643        assert!(error.to_string().contains("Event handler failed"));
1644        assert!(error.to_string().contains("Handler failed during update"));
1645
1646        // Verify the handler was called twice (once for create, once for failed update)
1647        assert_eq!(*call_count.lock().unwrap(), 2);
1648    }
1649
1650    #[tokio::test]
1651    async fn test_alien_event_macro() {
1652        use crate::alien_event;
1653
1654        let (result, handler) = with_test_bus(|| async {
1655            // Test the macro with a simple function
1656            #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-test".to_string() })]
1657            async fn test_macro_function() -> Result<String> {
1658                // Emit a child event
1659                AlienEvent::TestBuildImage {
1660                    image: "test-image".to_string(),
1661                    stage: "compile".to_string(),
1662                }
1663                .emit()
1664                .await?;
1665
1666                Ok("success".to_string())
1667            }
1668
1669            test_macro_function().await
1670        })
1671        .await;
1672
1673        // Verify the function succeeded
1674        assert!(result.is_ok());
1675        assert_eq!(result.unwrap(), "success");
1676
1677        let events = handler.events();
1678
1679        // Should have: parent created (started), child created, parent state changed to success
1680        assert!(events.len() >= 3);
1681
1682        // Verify parent started
1683        match &events[0] {
1684            EventChange::Created { state, event, .. } => {
1685                assert_eq!(state, &EventState::Started);
1686                assert!(
1687                    matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "macro-test")
1688                );
1689            }
1690            _ => panic!("Expected Created event"),
1691        }
1692
1693        // Verify child event
1694        let child_event = events.iter().find(|e| {
1695            matches!(
1696                e,
1697                EventChange::Created {
1698                    event: AlienEvent::TestBuildImage { .. },
1699                    ..
1700                }
1701            )
1702        });
1703        assert!(child_event.is_some());
1704
1705        // Verify parent completed successfully
1706        let success_event = events.iter().find(|e| {
1707            matches!(
1708                e,
1709                EventChange::StateChanged {
1710                    new_state: EventState::Success,
1711                    ..
1712                }
1713            )
1714        });
1715        assert!(success_event.is_some());
1716    }
1717
1718    #[tokio::test]
1719    async fn test_alien_event_macro_with_failure() {
1720        use crate::alien_event;
1721
1722        let (result, handler) = with_test_bus(|| async {
1723            // Test the macro with a function that fails
1724            #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-fail-test".to_string() })]
1725            async fn test_macro_failure() -> Result<String> {
1726                // Emit a child event
1727                AlienEvent::TestBuildImage {
1728                    image: "test-image".to_string(),
1729                    stage: "compile".to_string(),
1730                }
1731                .emit()
1732                .await?;
1733
1734                // Then fail
1735                Err(AlienError::new(ErrorData::GenericError {
1736                    message: "Macro test failure".to_string(),
1737                }))
1738            }
1739
1740            test_macro_failure().await
1741        })
1742        .await;
1743
1744        // Verify the function failed
1745        assert!(result.is_err());
1746
1747        let events = handler.events();
1748
1749        // Verify parent failed
1750        let failure_event = events.iter().find(|e| {
1751            matches!(
1752                e,
1753                EventChange::StateChanged {
1754                    new_state: EventState::Failed { .. },
1755                    ..
1756                }
1757            )
1758        });
1759        assert!(failure_event.is_some());
1760    }
1761
1762    #[tokio::test]
1763    async fn test_alien_event_macro_with_dynamic_values() {
1764        use crate::alien_event;
1765
1766        let (result, handler) = with_test_bus(|| async {
1767            // Test the macro with dynamic values
1768            async fn test_with_id(id: u32) -> Result<String> {
1769                #[alien_event(AlienEvent::TestBuildingStack { stack: format!("stack-{}", id) })]
1770                async fn inner_function(id: u32) -> Result<String> {
1771                    Ok(format!("processed-{}", id))
1772                }
1773
1774                inner_function(id).await
1775            }
1776
1777            test_with_id(42).await
1778        })
1779        .await;
1780
1781        // Verify the function succeeded
1782        assert!(result.is_ok());
1783        assert_eq!(result.unwrap(), "processed-42");
1784
1785        let events = handler.events();
1786
1787        // Verify the dynamic stack name
1788        match &events[0] {
1789            EventChange::Created { event, .. } => {
1790                assert!(
1791                    matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "stack-42")
1792                );
1793            }
1794            _ => panic!("Expected Created event"),
1795        }
1796    }
1797
1798    #[tokio::test]
1799    async fn test_handler_failure_during_state_change() {
1800        // Create a handler that succeeds on creation but fails on state changes
1801        struct StateChangeFailingHandler {
1802            call_count: Arc<Mutex<usize>>,
1803        }
1804
1805        #[async_trait]
1806        impl EventHandler for StateChangeFailingHandler {
1807            async fn on_event_change(&self, change: EventChange) -> Result<()> {
1808                let mut count = self.call_count.lock().unwrap();
1809                *count += 1;
1810
1811                match change {
1812                    EventChange::Created { .. } => Ok(()), // Allow creation
1813                    EventChange::Updated { .. } => Ok(()), // Allow updates
1814                    EventChange::StateChanged { .. } => {
1815                        // Fail on state changes
1816                        Err(AlienError::new(ErrorData::GenericError {
1817                            message: "Handler failed during state change".to_string(),
1818                        }))
1819                    }
1820                }
1821            }
1822        }
1823
1824        let call_count = Arc::new(Mutex::new(0));
1825        let handler = Arc::new(StateChangeFailingHandler {
1826            call_count: call_count.clone(),
1827        });
1828        let bus = EventBus::with_handlers(vec![handler]);
1829
1830        let result = bus
1831            .run(|| async {
1832                // Create an event - this should succeed
1833                let handle = AlienEvent::TestBuildImage {
1834                    image: "test-image".to_string(),
1835                    stage: "stage1".to_string(),
1836                }
1837                .emit()
1838                .await?;
1839
1840                // Try to complete the event - this should fail due to state change handler failure
1841                handle.complete().await
1842            })
1843            .await;
1844
1845        // Verify that the state change failed
1846        assert!(result.is_err());
1847        let error = result.unwrap_err();
1848        assert!(error.to_string().contains("Event handler failed"));
1849        assert!(error
1850            .to_string()
1851            .contains("Handler failed during state change"));
1852
1853        // Verify the handler was called twice (once for create, once for failed state change)
1854        assert_eq!(*call_count.lock().unwrap(), 2);
1855    }
1856}