alien_core/events/mod.rs
1//! # Alien Events System
2//!
3//! A Rust-based events system for user-facing events in Alien. This system is designed to handle
4//! events that can be consumed by dashboards, CLIs, and other monitoring tools to show users
5//! what's happening during builds, deployments, and other operations.
6//!
7//! ## Why Not Use Traces?
8//!
9//! This events system is **not** an alternative to traces - traces are for engineering observability
10//! and internal debugging, while this events module is about user-facing events that you show in UIs
11//! (either GUI or TUI). Key differences:
12//!
13//! - **Reliable User Flow**: Events are blocking (unlike non-blocking traces) because we want to
14//! guarantee users see critical progress updates and fail fast if we can't communicate status
15//! - **Rich UI Components**: Events have strict schemas (see `AlienEvent` enum) enabling translation
16//! to custom React components, progress bars, and interactive UI elements that show meaningful progress
17//! - **Flexible Granularity**: Unlike traces (spans + events), we only have events, with infinite
18//! hierarchy and unique IDs. Users can choose their view: high-level "Building" → "Deploying" or
19//! drill down to see "Building image", "Pushing image", etc. Each event can be scoped for perfect
20//! user control over detail level
21//! - **Live Progress Updates**: Events can be updated with new information (e.g., "pushing image...
22//! layer 1/5, layer 2/5, layer 3/5") enabling real-time progress indicators, which is impossible
23//! with immutable traces
24//! - **Clear Success/Failure States**: Each scoped event has explicit states (Started, Success, Failed)
25//! with detailed error information, giving users immediate feedback on what succeeded, what failed,
26//! and exactly why
27//!
28//! ## Key Features
29//!
30//! - **Global Event Bus**: Events can be emitted from anywhere in the code without passing around
31//! event bus instances, making it easy to add to large Rust workspaces with many crates.
32//! - **Hierarchical Events**: Events can have parent-child relationships for organizing complex operations.
33//! - **State Management**: Events can track their lifecycle (None, Started, Success, Failed).
34//! - **Durable Execution Support**: Designed to work with frameworks like Temporal, Inngest, and Restate
35//! where processes can restart and state needs to be preserved externally.
36//! - **Change-based Architecture**: Instead of storing events in memory, the system emits changes
37//! (Created, Updated, StateChanged) that handlers can persist or react to.
38//! - **Macro Support**: The `#[alien_event]` macro provides a convenient way to instrument functions
39//! with events, similar to tracing's `#[instrument]` macro.
40//!
41//! ## Basic Usage
42//!
43//! ### Using the `#[alien_event]` Macro (Recommended)
44//!
45//! The easiest way to instrument functions with events is using the `#[alien_event]` macro:
46//!
47//! ```rust,ignore
48//! use alien_core::{AlienEvent, EventBus, alien_event, Result};
49//!
50//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
51//! async fn build_stack() -> Result<()> {
52//! // All events emitted within this function will automatically be children
53//! // of the BuildingStack event. The event will be marked as successful
54//! // if the function returns Ok, or failed if it returns an error.
55//!
56//! AlienEvent::BuildingImage {
57//! image: "api:latest".to_string(),
58//! }
59//! .emit()
60//! .await?;
61//!
62//! Ok(())
63//! }
64//!
65//! # async fn example() -> Result<()> {
66//! let bus = EventBus::new();
67//! bus.run(|| async {
68//! build_stack().await?;
69//! Ok(())
70//! }).await?;
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ### Simple Event Emission
76//!
77//! For more control, you can emit events manually:
78//!
79//! ```rust
80//! use alien_core::{AlienEvent, EventBus};
81//!
82//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
83//! let bus = EventBus::new();
84//! bus.run(|| async {
85//! // Emit a simple event
86//! let handle = AlienEvent::BuildingStack {
87//! stack: "my-stack".to_string(),
88//! }
89//! .emit()
90//! .await?;
91//!
92//! println!("Emitted event with ID: {}", handle.id);
93//! Ok::<_, Box<dyn std::error::Error>>(())
94//! }).await?;
95//! # Ok(())
96//! # }
97//! ```
98//!
99//! ### Event Updates
100//!
101//! ```rust
102//! use alien_core::{AlienEvent, EventBus};
103//!
104//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
105//! let bus = EventBus::new();
106//! bus.run(|| async {
107//! // Emit an event and get a handle
108//! let handle = AlienEvent::BuildingImage {
109//! image: "api:latest".to_string(),
110//! }
111//! .emit()
112//! .await?;
113//!
114//! // Update the event with new information
//!     handle.update(AlienEvent::BuildingImage {
//!         image: "api:latest-v2".to_string(),
//!     }).await?;
//!
//!     // Mark as completed
//!     handle.complete().await?;
121//! Ok::<_, Box<dyn std::error::Error>>(())
122//! }).await?;
123//! # Ok(())
124//! # }
125//! ```
126//!
127//! ### Scoped Events with Automatic State Management
128//!
129//! You can also use `in_scope` directly for more control over the event lifecycle:
130//!
131//! ```rust,ignore
132//! use alien_core::{AlienEvent, EventBus, ErrorData, Result};
133//! use alien_error::AlienError;
134//!
135//! # async fn example() -> Result<()> {
136//! let bus = EventBus::new();
137//! bus.run(|| async {
138//! // Use in_scope for automatic success/failure tracking
139//! // All events emitted within the scope automatically become children
140//! let result = AlienEvent::BuildingStack {
141//! stack: "my-stack".to_string(),
142//! }
143//! .in_scope(|_handle| async move {
144//! // This event will automatically be a child of BuildingStack
145//! AlienEvent::BuildingImage {
146//! image: "api:latest".to_string(),
147//! }
148//! .emit()
149//! .await?;
150//!
151//! // Do some work that might fail
152//! std::fs::create_dir_all("/tmp/build")
153//! .map_err(|e| AlienError::new(ErrorData::GenericError {
154//! message: e.to_string(),
155//! }))?;
156//!
157//! // Return success
158//! Ok::<_, AlienError<ErrorData>>(42)
159//! })
160//! .await?;
161//!
162//! println!("Operation completed with result: {}", result);
163//! Ok(())
164//! }).await?;
165//! # Ok(())
166//! # }
167//! ```
168//!
169//! ### Event Hierarchy
170//!
171//! ```rust,ignore
172//! use alien_core::{AlienEvent, EventBus, Result};
173//!
174//! # async fn example() -> Result<()> {
175//! let bus = EventBus::new();
176//! bus.run(|| async {
177//! let parent = AlienEvent::BuildingStack {
178//! stack: "my-stack".to_string(),
179//! }
180//! .emit()
181//! .await?;
182//!
183//! // Create a parent context for multiple child events
184//! parent.as_parent(|_handle| async {
185//! // All events emitted here will be children of the parent
186//! AlienEvent::BuildingImage {
187//! image: "api:latest".to_string(),
188//! }
189//! .emit()
190//! .await?;
191//!
192//! AlienEvent::PushingImage {
193//! image: "api:latest".to_string(),
194//! progress: None,
195//! }
196//! .emit()
197//! .await?;
198//!
199//! Ok(())
200//! }).await?;
201//!
202//! // Complete the parent when all children are done
//!     parent.complete().await?;
204//! Ok(())
205//! }).await?;
206//! # Ok(())
207//! # }
208//! ```
209//!
210//! ## Durable Execution Support
211//!
212//! The events system is designed to work with durable execution frameworks where processes
213//! can restart and lose in-memory state. Instead of storing events in memory, the system
214//! emits changes that external handlers can persist.
215//!
216//! ### Manual State Management for Durable Workflows
217//!
218//! ```rust,ignore
219//! use alien_core::{AlienEvent, EventBus, EventState, Result};
220//!
221//! # async fn example() -> Result<()> {
222//! let bus = EventBus::new();
223//! bus.run(|| async {
224//! // In a durable execution framework like Temporal:
225//!
226//! // Step 1: Start a long-running operation
227//! let parent_handle = AlienEvent::BuildingStack {
228//! stack: "my-stack".to_string(),
229//! }
230//! .emit_with_state(EventState::Started)
231//! .await?;
232//!
233//! // Step 2: Perform work across multiple durable steps
234//! parent_handle.as_parent(|_handle| async {
235//! // ctx.run(|| { ... }) - durable step 1
236//! AlienEvent::BuildingImage {
237//! image: "api:latest".to_string(),
238//! }
239//! .emit()
240//! .await?;
241//!
242//! // ctx.run(|| { ... }) - durable step 2
243//! AlienEvent::PushingImage {
244//! image: "api:latest".to_string(),
245//! progress: None,
246//! }
247//! .emit()
248//! .await?;
249//!
250//! Ok(())
251//! }).await?;
252//!
253//! // Step 3: Complete the operation
//!     parent_handle.complete().await?;
255//! Ok(())
256//! }).await?;
257//! # Ok(())
258//! # }
259//! ```
260//!
261//! ## The `#[alien_event]` Macro
262//!
263//! The `#[alien_event]` macro provides a convenient way to instrument async functions with events,
264//! similar to how tracing's `#[instrument]` macro works for logging. It automatically:
265//!
266//! - Creates an event when the function starts (with `EventState::Started`)
267//! - Establishes a parent context so all events emitted within the function become children
268//! - Marks the event as successful (`EventState::Success`) if the function returns `Ok`
269//! - Marks the event as failed (`EventState::Failed`) if the function returns an `Err`
270//!
271//! ### Basic Usage
272//!
273//! ```rust,ignore
274//! use alien_core::{AlienEvent, alien_event, Result};
275//!
276//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
277//! async fn build_stack() -> Result<()> {
278//! // Function implementation
279//! Ok(())
280//! }
281//! ```
282//!
283//! ### Dynamic Values
284//!
285//! You can use function parameters and expressions in the event definition:
286//!
287//! ```rust,ignore
288//! use alien_core::{AlienEvent, alien_event, Result};
289//!
290//! #[alien_event(AlienEvent::BuildingStack { stack: format!("stack-{}", stack_id) })]
291//! async fn build_dynamic_stack(stack_id: u32) -> Result<()> {
292//! // Function implementation
293//! Ok(())
294//! }
295//! ```
296//!
297//! ### Comparison with Manual Event Management
298//!
299//! The macro transforms this:
300//!
301//! ```rust,ignore
302//! #[alien_event(AlienEvent::BuildingStack { stack: "my-stack".to_string() })]
303//! async fn build_stack() -> Result<()> {
304//! // function body
305//! Ok(())
306//! }
307//! ```
308//!
309//! Into this:
310//!
311//! ```rust,ignore
312//! async fn build_stack() -> Result<()> {
313//! AlienEvent::BuildingStack { stack: "my-stack".to_string() }
314//! .in_scope(|_event_handle| async move {
315//! // function body
316//! Ok(())
317//! })
318//! .await
319//! }
320//! ```
321//!
322//! ### Limitations
323//!
324//! - The macro only works with `async` functions
325//! - For sync functions, use `AlienEvent::emit()` manually
326//! - The event expression is evaluated when the function is called, not when the macro is expanded
327//!
328//! ## Event Handlers
329//!
330//! Event handlers receive changes and can persist them, update UIs, or trigger other actions:
331//!
332//! ```rust
333//! use alien_core::{EventHandler, EventChange, EventBus, AlienEvent};
334//! use async_trait::async_trait;
335//!
336//! struct PostgresEventHandler {
337//! // database connection pool, etc.
338//! }
339//!
340//! #[async_trait]
341//! impl EventHandler for PostgresEventHandler {
342//! async fn on_event_change(&self, change: EventChange) -> alien_core::Result<()> {
343//! match change {
344//! EventChange::Created { id, parent_id, created_at, event, state } => {
345//! // Insert new event record into database
346//! println!("Creating event {} with parent {:?}", id, parent_id);
347//! }
348//! EventChange::Updated { id, updated_at, event } => {
349//! // Update event data in database
350//! println!("Updating event {} at {}", id, updated_at);
351//! }
352//! EventChange::StateChanged { id, updated_at, new_state } => {
353//! // Update event state in database
354//! println!("Event {} state changed to {:?} at {}", id, new_state, updated_at);
355//! }
356//! }
357//! Ok(())
358//! }
359//! }
360//!
361//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
362//! let handler = std::sync::Arc::new(PostgresEventHandler {});
363//! let bus = EventBus::with_handlers(vec![handler]);
364//!
365//! bus.run(|| async {
366//! // Events will now be persisted to PostgreSQL
367//! AlienEvent::BuildingStack {
368//! stack: "my-stack".to_string(),
369//! }
370//! .emit()
371//! .await?;
372//! Ok::<_, Box<dyn std::error::Error>>(())
373//! }).await?;
374//! # Ok(())
375//! # }
376//! ```
377//!
378//! ## Architecture Notes
379//!
380//! ### Change-Based Design
381//!
382//! Unlike traditional event systems that store complete event state, this system emits
383//! incremental changes:
384//!
385//! - `EventChange::Created`: A new event was created with initial data and state
386//! - `EventChange::Updated`: An event's data was updated
387//! - `EventChange::StateChanged`: An event's state transitioned (None → Started → Success/Failed)
388//!
389//! This design is crucial for durable execution where the event bus itself doesn't persist
390//! state across process restarts.
391//!
392//! ### Task-Local Context
393//!
394//! The event bus uses Tokio's task-local storage to provide a global context without
395//! requiring explicit parameter passing. This makes it easy to add event emission to
396//! existing codebases. The `#[alien_event]` macro leverages this design to provide
397//! seamless instrumentation without requiring changes to function signatures.
398//!
399//! ### Error Handling
400//!
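//! Event emission is designed to be fault-tolerant. If no event bus context is available,
//! `emit()` neither panics nor fails: it returns `Ok` with a no-op handle (`handle.is_noop`
//! is `true`), allowing code to continue running even without event tracking. Where a missing
//! context is surfaced at all, it is reported as the `EventBusError::NoEventBusContext` error
//! value rather than as a panic.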
404
405mod event;
406pub use event::*;
407
408mod handler;
409pub use handler::*;
410
411mod state;
412pub use state::*;
413
414mod handle;
415pub use handle::*;
416
417mod bus;
418pub use bus::*;
419
420#[cfg(test)]
421mod tests {
422 use crate::events::{AlienEvent, EventBus, EventChange, EventHandler, EventState};
423 use crate::{ErrorData, Result};
424 use alien_error::{AlienError, GenericError};
425 use async_trait::async_trait;
426 use insta::assert_debug_snapshot;
427 use rstest::*;
428 use std::sync::{Arc, Mutex};
429
430 /// Test event handler that captures all events for testing
431 #[derive(Debug, Clone)]
432 struct TestEventHandler {
433 events: Arc<Mutex<Vec<EventChange>>>,
434 }
435
436 impl TestEventHandler {
437 fn new() -> Self {
438 Self {
439 events: Arc::new(Mutex::new(Vec::new())),
440 }
441 }
442
443 fn events(&self) -> Vec<EventChange> {
444 self.events.lock().unwrap().clone()
445 }
446
447 #[allow(dead_code)]
448 fn clear(&self) {
449 self.events.lock().unwrap().clear();
450 }
451 }
452
453 #[async_trait]
454 impl EventHandler for TestEventHandler {
455 async fn on_event_change(&self, change: EventChange) -> Result<()> {
456 self.events.lock().unwrap().push(change);
457 Ok(())
458 }
459 }
460
461 /// Helper to run tests with event bus context
462 async fn with_test_bus<F, Fut, R>(f: F) -> (R, TestEventHandler)
463 where
464 F: FnOnce() -> Fut,
465 Fut: std::future::Future<Output = R>,
466 {
467 let handler = TestEventHandler::new();
468 let bus = EventBus::with_handlers(vec![Arc::new(handler.clone())]);
469 let result = bus.run(f).await;
470 (result, handler)
471 }
472
473 #[tokio::test]
474 async fn test_simple_event_emission() {
475 let (result, handler) = with_test_bus(|| async {
476 let handle = AlienEvent::TestBuildingStack {
477 stack: "test-stack".to_string(),
478 }
479 .emit()
480 .await
481 .unwrap();
482
483 assert!(!handle.id.is_empty());
484 handle
485 })
486 .await;
487
488 let events = handler.events();
489 assert_eq!(events.len(), 1);
490
491 match &events[0] {
492 EventChange::Created {
493 id,
494 parent_id,
495 event,
496 state,
497 ..
498 } => {
499 assert_eq!(id, &result.id);
500 assert_eq!(parent_id, &None);
501 assert!(
502 matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack")
503 );
504 assert_eq!(state, &EventState::None);
505 }
506 _ => panic!("Expected Created event"),
507 }
508 }
509
510 #[tokio::test]
511 async fn test_event_update() {
512 let (_, handler) = with_test_bus(|| async {
513 let handle = AlienEvent::TestBuildImage {
514 image: "api:latest".to_string(),
515 stage: "stage 1".to_string(),
516 }
517 .emit()
518 .await
519 .unwrap();
520
521 handle
522 .update(AlienEvent::TestBuildImage {
523 image: "api:latest".to_string(),
524 stage: "stage 2".to_string(),
525 })
526 .await
527 .unwrap();
528
529 handle
530 .update(AlienEvent::TestBuildImage {
531 image: "api:latest".to_string(),
532 stage: "stage 3".to_string(),
533 })
534 .await
535 .unwrap();
536 })
537 .await;
538
539 let events = handler.events();
540 assert_eq!(events.len(), 3); // 1 create + 2 updates
541
542 // Check updates
543 match &events[1] {
544 EventChange::Updated { event, .. } => {
545 assert!(
546 matches!(event, AlienEvent::TestBuildImage { stage, .. } if stage == "stage 2")
547 );
548 }
549 _ => panic!("Expected Updated event"),
550 }
551
552 match &events[2] {
553 EventChange::Updated { event, .. } => {
554 assert!(
555 matches!(event, AlienEvent::TestBuildImage { stage, .. } if stage == "stage 3")
556 );
557 }
558 _ => panic!("Expected Updated event"),
559 }
560 }
561
562 #[tokio::test]
563 async fn test_scoped_event_success() {
564 let (result, handler) = with_test_bus(|| async {
565 AlienEvent::TestBuildingImage {
566 image: "api:latest".to_string(),
567 }
568 .in_scope(|_handle| async move {
569 // Emit child events
570 AlienEvent::TestBuildImage {
571 image: "api:latest".to_string(),
572 stage: "compile".to_string(),
573 }
574 .emit()
575 .await
576 .unwrap();
577
578 AlienEvent::TestBuildImage {
579 image: "api:latest".to_string(),
580 stage: "link".to_string(),
581 }
582 .emit()
583 .await
584 .unwrap();
585
586 Ok::<_, AlienError<ErrorData>>(42)
587 })
588 .await
589 .unwrap()
590 })
591 .await;
592
593 assert_eq!(result, 42);
594
595 let events = handler.events();
596
597 // Should have: parent created (started), 2 children created, parent state changed to success
598 assert!(events.len() >= 4);
599
600 // Verify parent started
601 match &events[0] {
602 EventChange::Created { state, .. } => {
603 assert_eq!(state, &EventState::Started);
604 }
605 _ => panic!("Expected Created event"),
606 }
607
608 // Verify parent completed successfully
609 let last_event = events.last().unwrap();
610 match last_event {
611 EventChange::StateChanged { new_state, .. } => {
612 assert_eq!(new_state, &EventState::Success);
613 }
614 _ => panic!("Expected StateChanged event"),
615 }
616 }
617
618 #[tokio::test]
619 async fn test_scoped_event_failure() {
620 let (result, handler) = with_test_bus(|| async {
621 AlienEvent::TestBuildingImage {
622 image: "api:latest".to_string(),
623 }
624 .in_scope(|_handle| async move {
625 // Emit a child event
626 AlienEvent::TestBuildImage {
627 image: "api:latest".to_string(),
628 stage: "compile".to_string(),
629 }
630 .emit()
631 .await
632 .unwrap();
633
634 // Then fail
635 Err::<(), _>(AlienError::new(ErrorData::GenericError {
636 message: "Test error".to_string(),
637 }))
638 })
639 .await
640 })
641 .await;
642
643 assert!(result.is_err());
644
645 let events = handler.events();
646
647 // Verify parent failed
648 let last_event = events.last().unwrap();
649 match last_event {
650 EventChange::StateChanged { new_state, .. } => {
651 assert!(matches!(new_state, EventState::Failed { .. }));
652 }
653 _ => panic!("Expected StateChanged event"),
654 }
655 }
656
657 #[tokio::test]
658 async fn test_deep_hierarchy() {
659 let (_, handler) = with_test_bus(|| async {
660 AlienEvent::TestBuildingStack {
661 stack: "root-stack".to_string(),
662 }
663 .in_scope(|_| async {
664 // Level 1
665 AlienEvent::TestBuildingImage {
666 image: "app1".to_string(),
667 }
668 .in_scope(|_| async {
669 // Level 2
670 AlienEvent::TestBuildImage {
671 image: "app1".to_string(),
672 stage: "compile".to_string(),
673 }
674 .in_scope(|_| async {
675 // Level 3
676 AlienEvent::TestBuildImage {
677 image: "app1".to_string(),
678 stage: "optimize".to_string(),
679 }
680 .emit()
681 .await
682 .unwrap();
683 Ok::<_, AlienError<ErrorData>>(())
684 })
685 .await
686 .unwrap();
687 Ok::<_, AlienError<ErrorData>>(())
688 })
689 .await
690 .unwrap();
691
692 // Another branch at level 1
693 AlienEvent::TestBuildingImage {
694 image: "app2".to_string(),
695 }
696 .emit()
697 .await
698 .unwrap();
699
700 Ok::<_, AlienError<ErrorData>>(())
701 })
702 .await
703 .unwrap()
704 })
705 .await;
706
707 // Create a hierarchy representation for snapshot testing
708 let events = handler.events();
709 let mut hierarchy = Vec::new();
710 let mut id_map = std::collections::HashMap::new();
711 let mut counter = 0;
712
713 for event in &events {
714 match event {
715 EventChange::Created {
716 id,
717 parent_id,
718 event,
719 ..
720 } => {
721 // Map real IDs to stable IDs for snapshot testing
722 let stable_id = id_map
723 .entry(id.clone())
724 .or_insert_with(|| {
725 counter += 1;
726 format!("event-{}", counter)
727 })
728 .clone();
729
730 let stable_parent_id = parent_id.as_ref().map(|p| {
731 id_map
732 .entry(p.clone())
733 .or_insert_with(|| {
734 counter += 1;
735 format!("event-{}", counter)
736 })
737 .clone()
738 });
739
740 hierarchy.push((stable_id, stable_parent_id, format!("{:?}", event)));
741 }
742 _ => {}
743 }
744 }
745
746 assert_debug_snapshot!(hierarchy);
747 }
748
749 #[tokio::test]
750 async fn test_wide_hierarchy() {
751 let (_, handler) = with_test_bus(|| async {
752 let parent = AlienEvent::TestBuildingStack {
753 stack: "wide-stack".to_string(),
754 }
755 .emit()
756 .await
757 .unwrap();
758
759 parent
760 .as_parent(|_| async {
761 // Emit many child events
762 for i in 0..10 {
763 AlienEvent::TestBuildImage {
764 image: format!("image-{}", i),
765 stage: "build".to_string(),
766 }
767 .emit()
768 .await
769 .unwrap();
770 }
771 Ok::<_, ErrorData>(())
772 })
773 .await
774 .unwrap();
775
776 parent.complete().await.unwrap();
777 })
778 .await;
779
780 let events = handler.events();
781
782 // Count children
783 let children_count = events
784 .iter()
785 .filter(|e| {
786 matches!(
787 e,
788 EventChange::Created {
789 parent_id: Some(_),
790 ..
791 }
792 )
793 })
794 .count();
795
796 assert_eq!(children_count, 10);
797 }
798
799 #[tokio::test]
800 async fn test_durable_execution_simulation() {
801 // Simulate a durable execution with multiple steps
802 let (_, handler) = with_test_bus(|| async {
803 // Step 1: Start parent event
804 let parent_handle = AlienEvent::TestBuildingStack {
805 stack: "durable-stack".to_string(),
806 }
807 .emit_with_state(EventState::Started)
808 .await
809 .unwrap();
810
811 // Simulate ctx.run() boundaries
812 let parent_id = parent_handle.id.clone();
813
814 // Step 2: First child in separate "execution"
815 parent_handle
816 .as_parent(|_| async {
817 AlienEvent::TestBuildImage {
818 image: "api:latest".to_string(),
819 stage: "compile".to_string(),
820 }
821 .emit()
822 .await
823 .unwrap();
824 Ok::<_, ErrorData>(())
825 })
826 .await
827 .unwrap();
828
829 // Step 3: Second child in separate "execution"
830 parent_handle
831 .as_parent(|_| async {
832 AlienEvent::TestPushImage {
833 image: "api:latest".to_string(),
834 }
835 .emit()
836 .await
837 .unwrap();
838 Ok::<_, ErrorData>(())
839 })
840 .await
841 .unwrap();
842
843 // Step 4: Complete parent
844 parent_handle.complete().await.unwrap();
845
846 parent_id
847 })
848 .await;
849
850 let events = handler.events();
851
852 // Verify sequence of events
853 assert!(events.len() >= 4); // parent created, 2 children, parent completed
854
855 // Verify all events are properly linked
856 let created_events: Vec<_> = events
857 .iter()
858 .filter_map(|e| match e {
859 EventChange::Created { id, parent_id, .. } => Some((id.clone(), parent_id.clone())),
860 _ => None,
861 })
862 .collect();
863
864 assert_eq!(created_events.len(), 3); // 1 parent + 2 children
865 }
866
867 #[tokio::test]
868 async fn test_manual_state_management() {
869 let (_, handler) = with_test_bus(|| async {
870 let handle = AlienEvent::TestBuildingStack {
871 stack: "manual-stack".to_string(),
872 }
873 .emit_with_state(EventState::Started)
874 .await
875 .unwrap();
876
877 // Do some work...
878
879 // Manually fail the event
880 handle
881 .fail(AlienError::new(GenericError {
882 message: "Something went wrong".to_string(),
883 }))
884 .await
885 .unwrap();
886 })
887 .await;
888
889 let events = handler.events();
890
891 // Check initial state
892 match &events[0] {
893 EventChange::Created { state, .. } => {
894 assert_eq!(state, &EventState::Started);
895 }
896 _ => panic!("Expected Created event"),
897 }
898
899 // Check failure
900 match &events[1] {
901 EventChange::StateChanged { new_state, .. } => match new_state {
902 EventState::Failed { error } => {
903 let error = error.as_ref().expect("Expected error to be present");
904 assert_eq!(error.message, "Something went wrong");
905 }
906 _ => panic!("Expected Failed state"),
907 },
908 _ => panic!("Expected StateChanged event"),
909 }
910 }
911
912 #[tokio::test]
913 async fn test_concurrent_events() {
914 let (_, handler) = with_test_bus(|| async {
915 let handles = tokio::join!(
916 AlienEvent::TestBuildImage {
917 image: "image1".to_string(),
918 stage: "build".to_string(),
919 }
920 .emit(),
921 AlienEvent::TestBuildImage {
922 image: "image2".to_string(),
923 stage: "build".to_string(),
924 }
925 .emit(),
926 AlienEvent::TestBuildImage {
927 image: "image3".to_string(),
928 stage: "build".to_string(),
929 }
930 .emit(),
931 );
932
933 assert!(handles.0.is_ok());
934 assert!(handles.1.is_ok());
935 assert!(handles.2.is_ok());
936 })
937 .await;
938
939 let events = handler.events();
940 assert_eq!(events.len(), 3);
941 }
942
943 #[tokio::test]
944 async fn test_nested_scopes_with_errors() {
945 let (result, handler) = with_test_bus(|| async {
946 AlienEvent::TestBuildingStack {
947 stack: "nested-error-stack".to_string(),
948 }
949 .in_scope(|_| async {
950 // This should succeed
951 AlienEvent::TestBuildImage {
952 image: "image1".to_string(),
953 stage: "stage1".to_string(),
954 }
955 .emit()
956 .await
957 .unwrap();
958
959 // This scope should fail
960 let inner_result = AlienEvent::TestBuildImage {
961 image: "image2".to_string(),
962 stage: "stage2".to_string(),
963 }
964 .in_scope(|_| async {
965 Err::<(), _>(AlienError::new(ErrorData::GenericError {
966 message: "Inner error".to_string(),
967 }))
968 })
969 .await;
970
971 // Continue despite inner failure
972 assert!(inner_result.is_err());
973
974 Ok::<_, AlienError<ErrorData>>("Outer succeeded")
975 })
976 .await
977 })
978 .await;
979
980 assert!(result.is_ok());
981 assert_eq!(result.unwrap(), "Outer succeeded");
982
983 // Verify mixed success/failure states
984 let state_changes: Vec<_> = handler
985 .events()
986 .iter()
987 .filter_map(|e| match e {
988 EventChange::StateChanged { id, new_state, .. } => {
989 Some((id.clone(), new_state.clone()))
990 }
991 _ => None,
992 })
993 .collect();
994
995 assert!(state_changes
996 .iter()
997 .any(|(_, state)| matches!(state, EventState::Failed { .. })));
998 assert!(state_changes
999 .iter()
1000 .any(|(_, state)| matches!(state, EventState::Success)));
1001 }
1002
1003 #[tokio::test]
1004 async fn test_no_event_bus_context() {
1005 // Test behavior when no event bus is present
1006 let result = AlienEvent::TestBuildingStack {
1007 stack: "no-context".to_string(),
1008 }
1009 .emit()
1010 .await;
1011
1012 // Should succeed with a no-op handle
1013 assert!(result.is_ok());
1014 let handle = result.unwrap();
1015 assert!(handle.is_noop);
1016 }
1017
1018 #[rstest]
1019 #[case("stack1", "image1", "stage1")]
1020 #[case("stack2", "image2", "stage2")]
1021 #[case("stack3", "image3", "stage3")]
1022 #[tokio::test]
1023 async fn test_parameterized_events(
1024 #[case] stack: &str,
1025 #[case] image: &str,
1026 #[case] stage: &str,
1027 ) {
1028 let (_, handler) = with_test_bus(|| async {
1029 AlienEvent::TestBuildingStack {
1030 stack: stack.to_string(),
1031 }
1032 .in_scope(|_| async move {
1033 AlienEvent::TestBuildImage {
1034 image: image.to_string(),
1035 stage: stage.to_string(),
1036 }
1037 .emit()
1038 .await
1039 .unwrap();
1040 Ok::<_, AlienError<ErrorData>>(())
1041 })
1042 .await
1043 .unwrap()
1044 })
1045 .await;
1046
1047 let events = handler.events();
1048 assert!(events.len() >= 3); // parent created, child created, parent success
1049 }
1050
1051 #[tokio::test]
1052 async fn test_complex_workflow_snapshot() {
1053 let (_, handler) = with_test_bus(|| async {
1054 // Simulate a complex deployment workflow
1055 AlienEvent::TestBuildingStack {
1056 stack: "production-stack".to_string(),
1057 }
1058 .in_scope(|_| async {
1059 // Build phase
1060 AlienEvent::TestBuildingImage {
1061 image: "api-service".to_string(),
1062 }
1063 .in_scope(|_| async {
1064 for stage in ["download-deps", "compile", "optimize", "build"] {
1065 AlienEvent::TestBuildImage {
1066 image: "api-service".to_string(),
1067 stage: stage.to_string(),
1068 }
1069 .emit()
1070 .await
1071 .unwrap();
1072 }
1073 Ok::<_, AlienError<ErrorData>>(())
1074 })
1075 .await
1076 .unwrap();
1077
1078 // Push phase
1079 AlienEvent::TestPushImage {
1080 image: "api-service".to_string(),
1081 }
1082 .emit()
1083 .await
1084 .unwrap();
1085
1086 // Deploy phase
1087 AlienEvent::TestDeployingStack {
1088 stack: "api-service".to_string(),
1089 }
1090 .in_scope(|_| async {
1091 AlienEvent::TestCreatingResource {
1092 resource_type: "LoadBalancer".to_string(),
1093 resource_name: "api-lb".to_string(),
1094 details: Some("Updating target groups".to_string()),
1095 }
1096 .emit()
1097 .await
1098 .unwrap();
1099
1100 AlienEvent::TestPerformingHealthCheck {
1101 target: "https://api.example.com/health".to_string(),
1102 check_type: "HTTP".to_string(),
1103 }
1104 .emit()
1105 .await
1106 .unwrap();
1107
1108 Ok::<_, AlienError<ErrorData>>(())
1109 })
1110 .await
1111 .unwrap();
1112
1113 Ok::<_, AlienError<ErrorData>>(())
1114 })
1115 .await
1116 .unwrap()
1117 })
1118 .await;
1119
1120 // Create a structured view of all events for snapshot
1121 let events = handler.events();
1122 let mut id_map = std::collections::HashMap::new();
1123 let mut counter = 0;
1124
1125 let snapshot_data: Vec<_> = events
1126 .iter()
1127 .map(|e| match e {
1128 EventChange::Created {
1129 id,
1130 parent_id,
1131 event,
1132 state,
1133 ..
1134 } => {
1135 let stable_id = id_map
1136 .entry(id.clone())
1137 .or_insert_with(|| {
1138 counter += 1;
1139 format!("event-{}", counter)
1140 })
1141 .clone();
1142
1143 let stable_parent_id = parent_id.as_ref().map(|p| {
1144 id_map
1145 .entry(p.clone())
1146 .or_insert_with(|| {
1147 counter += 1;
1148 format!("event-{}", counter)
1149 })
1150 .clone()
1151 });
1152
1153 format!(
1154 "Created: id={}, parent={:?}, event={:?}, state={:?}",
1155 stable_id, stable_parent_id, event, state
1156 )
1157 }
1158 EventChange::Updated { id, event, .. } => {
1159 let stable_id = id_map
1160 .entry(id.clone())
1161 .or_insert_with(|| {
1162 counter += 1;
1163 format!("event-{}", counter)
1164 })
1165 .clone();
1166
1167 format!("Updated: id={}, event={:?}", stable_id, event)
1168 }
1169 EventChange::StateChanged { id, new_state, .. } => {
1170 let stable_id = id_map
1171 .entry(id.clone())
1172 .or_insert_with(|| {
1173 counter += 1;
1174 format!("event-{}", counter)
1175 })
1176 .clone();
1177
1178 format!("StateChanged: id={}, new_state={:?}", stable_id, new_state)
1179 }
1180 })
1181 .collect();
1182
1183 assert_debug_snapshot!(snapshot_data);
1184 }
1185
// Multi-tenancy isolation test.
//
// Runs the same event-emitting "business logic" for 10000 distinct tenant IDs concurrently,
// giving each run its own `EventBus` and its own handler. Every handler forwards what it
// sees into one shared unbounded channel, tagged with the tenant ID it was built for, so
// the assertions at the end can regroup the changes per tenant and check that each
// tenant's events only mention that tenant's data.
#[tokio::test]
async fn test_multi_tenancy_with_http_server() {
    use axum::{http::header::HeaderMap, http::StatusCode, response::Response};
    use futures::future::join_all;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    // Create a channel to collect all events from all tenants.
    // Each item is (tenant ID of the handler that saw the change, the change itself).
    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<(String, EventChange)>();

    // Create a test handler that captures events with tenant context
    #[derive(Clone)]
    struct TenantAwareEventHandler {
        // Tenant this handler instance was created for; used to tag forwarded changes.
        tenant_id: String,
        // Sending half of the shared collection channel.
        sender: mpsc::UnboundedSender<(String, EventChange)>,
    }

    #[async_trait]
    impl EventHandler for TenantAwareEventHandler {
        async fn on_event_change(&self, change: EventChange) -> Result<()> {
            // The send result is deliberately discarded: this handler always reports
            // success to the bus, whatever happened to the channel.
            let _ = self.sender.send((self.tenant_id.clone(), change));
            Ok(())
        }
    }

    // Helper function to simulate business logic that emits events.
    // It is a cloneable closure: each call clones the sender and returns a future that
    // builds a fresh handler + bus for the given tenant and runs the event sequence in it.
    let business_logic = {
        let event_tx = event_tx.clone();
        move |tenant_id: String| {
            let event_tx = event_tx.clone();
            async move {
                // Create a handler specific to this tenant
                let handler = Arc::new(TenantAwareEventHandler {
                    tenant_id: tenant_id.clone(),
                    sender: event_tx,
                });

                // Create event bus for this tenant
                let bus = EventBus::with_handlers(vec![handler]);

                bus.run(|| async {
                    // Main scoped event
                    AlienEvent::TestBuildingStack {
                        stack: format!("tenant-{}-stack", tenant_id),
                    }
                    .in_scope(|_handle| async move {
                        // First nested event
                        AlienEvent::TestBuildImage {
                            image: format!("tenant-{}-api", tenant_id),
                            stage: "compile".to_string(),
                        }
                        .emit()
                        .await
                        // Re-wrap the emit error as a GenericError carrying its message.
                        .map_err(|e| {
                            AlienError::new(ErrorData::GenericError {
                                message: e.to_string(),
                            })
                        })?;

                        // Call another function that emits more events
                        deploy_service(&tenant_id).await?;

                        Ok::<_, AlienError<ErrorData>>(format!(
                            "Success for tenant {}",
                            tenant_id
                        ))
                    })
                    .await
                })
                .await
            }
        }
    };

    // Another function that emits events.
    // It receives no bus or handle argument; it only calls `.emit()` on two events and
    // propagates any emit error with `?`.
    async fn deploy_service(tenant_id: &str) -> Result<()> {
        AlienEvent::TestDeployingStack {
            stack: format!("tenant-{}-deployment", tenant_id),
        }
        .emit()
        .await?;

        AlienEvent::TestCreatingResource {
            resource_type: "LoadBalancer".to_string(),
            resource_name: format!("tenant-{}-lb", tenant_id),
            details: Some("Multi-tenant load balancer".to_string()),
        }
        .emit()
        .await?;

        Ok(())
    }

    // Create HTTP endpoint handler.
    // Reads the tenant from the `x-tenant-id` header (falling back to "default" when the
    // header is missing or not valid UTF-8), runs the business logic, and maps the outcome
    // to a 200 or 500 response with a text body.
    let hello_handler = {
        let business_logic = business_logic.clone();
        move |headers: HeaderMap| {
            let business_logic = business_logic.clone();
            async move {
                let tenant_id = headers
                    .get("x-tenant-id")
                    .and_then(|v| v.to_str().ok())
                    .unwrap_or("default")
                    .to_string();

                match business_logic(tenant_id.clone()).await {
                    Ok(result) => Response::builder()
                        .status(StatusCode::OK)
                        .body(format!(
                            "Hello from tenant {}! Result: {}",
                            tenant_id, result
                        ))
                        .unwrap(),
                    Err(e) => Response::builder()
                        .status(StatusCode::INTERNAL_SERVER_ERROR)
                        .body(format!("Error for tenant {}: {}", tenant_id, e))
                        .unwrap(),
                }
            }
        }
    };

    // Smoke-test the handler without binding a socket (CI environments can forbid it).
    // The response is discarded, but the events it emits still land in the shared channel
    // under "tenant-0000" -- the same ID the loop below produces for i = 0 -- so that
    // tenant's event list contains two runs' worth of changes.
    let mut headers = HeaderMap::new();
    headers.insert("x-tenant-id", "tenant-0000".parse().unwrap());
    let _ = hello_handler(headers).await;

    // Create 10000 concurrent simulated requests with unique tenant IDs
    // Instead of actual HTTP requests, we'll directly call the business logic
    let mut request_futures = Vec::new();

    for i in 0..10000 {
        let tenant_id = format!("tenant-{:04}", i);
        let business_logic = business_logic.clone();

        // Each future resolves to its tenant ID on success and panics (failing the test)
        // if the business logic returned an error.
        let request_future = async move {
            match business_logic(tenant_id.clone()).await {
                Ok(_) => tenant_id,
                Err(e) => panic!("Request failed for tenant {}: {}", tenant_id, e),
            }
        };

        request_futures.push(request_future);
    }

    // Execute all requests concurrently
    let completed_tenants = join_all(request_futures).await;
    assert_eq!(completed_tenants.len(), 10000);

    // Collect all events
    let mut all_events: Vec<(String, EventChange)> = Vec::new();

    // Give some time for all events to be processed
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

    // Drain the event receiver.
    // `try_recv` does not wait: the loop stops as soon as it returns an error, i.e. at
    // the first moment nothing is queued.
    while let Ok(event) = event_rx.try_recv() {
        all_events.push(event);
    }

    // Verify we received events
    assert!(!all_events.is_empty(), "No events were received");

    // Group events by tenant.
    // `all_events` is cloned because it is still needed for the summary printed at the end.
    let mut events_by_tenant: HashMap<String, Vec<EventChange>> = HashMap::new();
    for (tenant_id, event) in all_events.clone() {
        events_by_tenant.entry(tenant_id).or_default().push(event);
    }

    // Verify we have events for all tenants
    assert_eq!(
        events_by_tenant.len(),
        10000,
        "Expected events for 10000 tenants, got {}",
        events_by_tenant.len()
    );

    // Verify each tenant has the expected number of events
    for (tenant_id, events) in &events_by_tenant {
        // Each tenant emits four events, so at least four `Created` changes are expected:
        // 1. BuildingStack (scoped parent)
        // 2. BuildImage
        // 3. DeployingStack
        // 4. CreatingResource
        // The assertion below only enforces this lower bound of 4.
        // NOTE(review): an earlier version of this comment said "6 events minimum",
        // counting extra state-change notifications for the scoped parent. How many
        // changes `in_scope` reports is not visible here -- confirm before tightening
        // the bound.
        assert!(
            events.len() >= 4,
            "Tenant {} has {} events, expected at least 4",
            tenant_id,
            events.len()
        );

        // Verify tenant-specific data in events.
        // Only `Created` changes are inspected; each payload string must contain the ID
        // of the tenant whose handler captured it.
        for event in events {
            if let EventChange::Created {
                event: alien_event, ..
            } = event
            {
                match alien_event {
                    AlienEvent::TestBuildingStack { stack } => {
                        assert!(
                            stack.contains(tenant_id),
                            "Stack name '{}' should contain tenant ID '{}'",
                            stack,
                            tenant_id
                        );
                    }
                    AlienEvent::TestBuildImage { image, .. } => {
                        assert!(
                            image.contains(tenant_id),
                            "Image name '{}' should contain tenant ID '{}'",
                            image,
                            tenant_id
                        );
                    }
                    AlienEvent::TestDeployingStack { stack } => {
                        assert!(
                            stack.contains(tenant_id),
                            "Deployment stack '{}' should contain tenant ID '{}'",
                            stack,
                            tenant_id
                        );
                    }
                    AlienEvent::TestCreatingResource { resource_name, .. } => {
                        assert!(
                            resource_name.contains(tenant_id),
                            "Resource name '{}' should contain tenant ID '{}'",
                            resource_name,
                            tenant_id
                        );
                    }
                    // Any other event kind carries no tenant-specific field checked here.
                    _ => {}
                }
            }
        }
    }

    // Verify event hierarchy for a sample tenant
    let sample_tenant = "tenant-0000";
    let sample_events = events_by_tenant.get(sample_tenant).unwrap();

    // Find the parent BuildingStack event (the first matching `Created` change)
    let parent_event = sample_events
        .iter()
        .find(|e| {
            matches!(
                e,
                EventChange::Created {
                    event: AlienEvent::TestBuildingStack { .. },
                    ..
                }
            )
        })
        .expect("Should have TestBuildingStack event");

    // The `find` above only yields `Created` changes, so this pattern always matches;
    // it is used to pull out the parent's ID.
    if let EventChange::Created { id: parent_id, .. } = parent_event {
        // Verify child events reference the parent
        let child_events: Vec<_> = sample_events.iter().filter(|e| {
            matches!(e, EventChange::Created { parent_id: Some(pid), .. } if pid == parent_id)
        }).collect();

        assert!(
            !child_events.is_empty(),
            "Should have child events for parent {}",
            parent_id
        );
    }

    // Human-readable summary (visible when the test is run with output capture disabled).
    println!("✅ Multi-tenancy test passed!");
    println!(" - Processed 10000 concurrent requests");
    println!(" - Verified {} unique tenants", events_by_tenant.len());
    println!(" - Total events captured: {}", all_events.len());
    println!(
        " - Average events per tenant: {:.1}",
        all_events.len() as f64 / events_by_tenant.len() as f64
    );
}
1463
1464 #[tokio::test]
1465 async fn test_handler_failure() {
1466 // Create a handler that always fails
1467 struct FailingHandler;
1468
1469 #[async_trait]
1470 impl EventHandler for FailingHandler {
1471 async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1472 Err(AlienError::new(ErrorData::GenericError {
1473 message: "Handler intentionally failed".to_string(),
1474 }))
1475 }
1476 }
1477
1478 let failing_handler = Arc::new(FailingHandler);
1479 let bus = EventBus::with_handlers(vec![failing_handler]);
1480
1481 let result = bus
1482 .run(|| async {
1483 // Try to emit an event - this should fail because the handler fails
1484 AlienEvent::TestBuildingStack {
1485 stack: "test-stack".to_string(),
1486 }
1487 .emit()
1488 .await
1489 })
1490 .await;
1491
1492 // Verify that the event emission failed due to handler failure
1493 assert!(result.is_err());
1494 let error = result.unwrap_err();
1495 assert!(error.to_string().contains("Event handler failed"));
1496 assert!(error.to_string().contains("Handler intentionally failed"));
1497 }
1498
1499 #[tokio::test]
1500 async fn test_mixed_handlers_one_fails() {
1501 // Create one successful handler and one failing handler
1502 let successful_handler = TestEventHandler::new();
1503
1504 struct FailingHandler;
1505
1506 #[async_trait]
1507 impl EventHandler for FailingHandler {
1508 async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1509 Err(AlienError::new(ErrorData::GenericError {
1510 message: "Second handler failed".to_string(),
1511 }))
1512 }
1513 }
1514
1515 let failing_handler = Arc::new(FailingHandler);
1516 let bus =
1517 EventBus::with_handlers(vec![Arc::new(successful_handler.clone()), failing_handler]);
1518
1519 let result = bus
1520 .run(|| async {
1521 // Try to emit an event - this should fail because one handler fails
1522 AlienEvent::TestBuildingStack {
1523 stack: "test-stack".to_string(),
1524 }
1525 .emit()
1526 .await
1527 })
1528 .await;
1529
1530 // Verify that the event emission failed
1531 assert!(result.is_err());
1532 let error = result.unwrap_err();
1533 assert!(error.to_string().contains("Event handler failed"));
1534 assert!(error.to_string().contains("Second handler failed"));
1535
1536 // The successful handler should have been called before the failing one
1537 let events = successful_handler.events();
1538 assert_eq!(events.len(), 1);
1539 match &events[0] {
1540 EventChange::Created { event, .. } => {
1541 assert!(
1542 matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack")
1543 );
1544 }
1545 _ => panic!("Expected Created event"),
1546 }
1547 }
1548
1549 #[tokio::test]
1550 async fn test_handler_failure_in_scoped_event() {
1551 struct FailingHandler;
1552
1553 #[async_trait]
1554 impl EventHandler for FailingHandler {
1555 async fn on_event_change(&self, _change: EventChange) -> Result<()> {
1556 Err(AlienError::new(ErrorData::GenericError {
1557 message: "Handler failed during scoped event".to_string(),
1558 }))
1559 }
1560 }
1561
1562 let failing_handler = Arc::new(FailingHandler);
1563 let bus = EventBus::with_handlers(vec![failing_handler]);
1564
1565 let result = bus
1566 .run(|| async {
1567 // Try to use in_scope - the initial event emission will fail, but in_scope
1568 // is designed to be fault-tolerant and will continue with a no-op handle
1569 AlienEvent::TestBuildingStack {
1570 stack: "test-stack".to_string(),
1571 }
1572 .in_scope(|handle| async move {
1573 // This code will be reached because in_scope is fault-tolerant
1574 // The handle should be a no-op handle
1575 assert!(handle.is_noop);
1576 Ok::<_, AlienError<ErrorData>>(42)
1577 })
1578 .await
1579 })
1580 .await;
1581
1582 // Verify that the scoped event succeeded despite handler failure
1583 // This demonstrates the fault-tolerant design of the event system
1584 assert!(result.is_ok());
1585 assert_eq!(result.unwrap(), 42);
1586 }
1587
1588 #[tokio::test]
1589 async fn test_handler_failure_during_update() {
1590 // Create a handler that succeeds on creation but fails on updates
1591 struct UpdateFailingHandler {
1592 call_count: Arc<Mutex<usize>>,
1593 }
1594
1595 #[async_trait]
1596 impl EventHandler for UpdateFailingHandler {
1597 async fn on_event_change(&self, change: EventChange) -> Result<()> {
1598 let mut count = self.call_count.lock().unwrap();
1599 *count += 1;
1600
1601 match change {
1602 EventChange::Created { .. } => Ok(()), // Allow creation
1603 EventChange::Updated { .. } => {
1604 // Fail on updates
1605 Err(AlienError::new(ErrorData::GenericError {
1606 message: "Handler failed during update".to_string(),
1607 }))
1608 }
1609 EventChange::StateChanged { .. } => Ok(()), // Allow state changes
1610 }
1611 }
1612 }
1613
1614 let call_count = Arc::new(Mutex::new(0));
1615 let handler = Arc::new(UpdateFailingHandler {
1616 call_count: call_count.clone(),
1617 });
1618 let bus = EventBus::with_handlers(vec![handler]);
1619
1620 let result = bus
1621 .run(|| async {
1622 // Create an event - this should succeed
1623 let handle = AlienEvent::TestBuildImage {
1624 image: "test-image".to_string(),
1625 stage: "stage1".to_string(),
1626 }
1627 .emit()
1628 .await?;
1629
1630 // Try to update the event - this should fail
1631 handle
1632 .update(AlienEvent::TestBuildImage {
1633 image: "test-image".to_string(),
1634 stage: "stage2".to_string(),
1635 })
1636 .await
1637 })
1638 .await;
1639
1640 // Verify that the update failed
1641 assert!(result.is_err());
1642 let error = result.unwrap_err();
1643 assert!(error.to_string().contains("Event handler failed"));
1644 assert!(error.to_string().contains("Handler failed during update"));
1645
1646 // Verify the handler was called twice (once for create, once for failed update)
1647 assert_eq!(*call_count.lock().unwrap(), 2);
1648 }
1649
1650 #[tokio::test]
1651 async fn test_alien_event_macro() {
1652 use crate::alien_event;
1653
1654 let (result, handler) = with_test_bus(|| async {
1655 // Test the macro with a simple function
1656 #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-test".to_string() })]
1657 async fn test_macro_function() -> Result<String> {
1658 // Emit a child event
1659 AlienEvent::TestBuildImage {
1660 image: "test-image".to_string(),
1661 stage: "compile".to_string(),
1662 }
1663 .emit()
1664 .await?;
1665
1666 Ok("success".to_string())
1667 }
1668
1669 test_macro_function().await
1670 })
1671 .await;
1672
1673 // Verify the function succeeded
1674 assert!(result.is_ok());
1675 assert_eq!(result.unwrap(), "success");
1676
1677 let events = handler.events();
1678
1679 // Should have: parent created (started), child created, parent state changed to success
1680 assert!(events.len() >= 3);
1681
1682 // Verify parent started
1683 match &events[0] {
1684 EventChange::Created { state, event, .. } => {
1685 assert_eq!(state, &EventState::Started);
1686 assert!(
1687 matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "macro-test")
1688 );
1689 }
1690 _ => panic!("Expected Created event"),
1691 }
1692
1693 // Verify child event
1694 let child_event = events.iter().find(|e| {
1695 matches!(
1696 e,
1697 EventChange::Created {
1698 event: AlienEvent::TestBuildImage { .. },
1699 ..
1700 }
1701 )
1702 });
1703 assert!(child_event.is_some());
1704
1705 // Verify parent completed successfully
1706 let success_event = events.iter().find(|e| {
1707 matches!(
1708 e,
1709 EventChange::StateChanged {
1710 new_state: EventState::Success,
1711 ..
1712 }
1713 )
1714 });
1715 assert!(success_event.is_some());
1716 }
1717
1718 #[tokio::test]
1719 async fn test_alien_event_macro_with_failure() {
1720 use crate::alien_event;
1721
1722 let (result, handler) = with_test_bus(|| async {
1723 // Test the macro with a function that fails
1724 #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-fail-test".to_string() })]
1725 async fn test_macro_failure() -> Result<String> {
1726 // Emit a child event
1727 AlienEvent::TestBuildImage {
1728 image: "test-image".to_string(),
1729 stage: "compile".to_string(),
1730 }
1731 .emit()
1732 .await?;
1733
1734 // Then fail
1735 Err(AlienError::new(ErrorData::GenericError {
1736 message: "Macro test failure".to_string(),
1737 }))
1738 }
1739
1740 test_macro_failure().await
1741 })
1742 .await;
1743
1744 // Verify the function failed
1745 assert!(result.is_err());
1746
1747 let events = handler.events();
1748
1749 // Verify parent failed
1750 let failure_event = events.iter().find(|e| {
1751 matches!(
1752 e,
1753 EventChange::StateChanged {
1754 new_state: EventState::Failed { .. },
1755 ..
1756 }
1757 )
1758 });
1759 assert!(failure_event.is_some());
1760 }
1761
1762 #[tokio::test]
1763 async fn test_alien_event_macro_with_dynamic_values() {
1764 use crate::alien_event;
1765
1766 let (result, handler) = with_test_bus(|| async {
1767 // Test the macro with dynamic values
1768 async fn test_with_id(id: u32) -> Result<String> {
1769 #[alien_event(AlienEvent::TestBuildingStack { stack: format!("stack-{}", id) })]
1770 async fn inner_function(id: u32) -> Result<String> {
1771 Ok(format!("processed-{}", id))
1772 }
1773
1774 inner_function(id).await
1775 }
1776
1777 test_with_id(42).await
1778 })
1779 .await;
1780
1781 // Verify the function succeeded
1782 assert!(result.is_ok());
1783 assert_eq!(result.unwrap(), "processed-42");
1784
1785 let events = handler.events();
1786
1787 // Verify the dynamic stack name
1788 match &events[0] {
1789 EventChange::Created { event, .. } => {
1790 assert!(
1791 matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "stack-42")
1792 );
1793 }
1794 _ => panic!("Expected Created event"),
1795 }
1796 }
1797
1798 #[tokio::test]
1799 async fn test_handler_failure_during_state_change() {
1800 // Create a handler that succeeds on creation but fails on state changes
1801 struct StateChangeFailingHandler {
1802 call_count: Arc<Mutex<usize>>,
1803 }
1804
1805 #[async_trait]
1806 impl EventHandler for StateChangeFailingHandler {
1807 async fn on_event_change(&self, change: EventChange) -> Result<()> {
1808 let mut count = self.call_count.lock().unwrap();
1809 *count += 1;
1810
1811 match change {
1812 EventChange::Created { .. } => Ok(()), // Allow creation
1813 EventChange::Updated { .. } => Ok(()), // Allow updates
1814 EventChange::StateChanged { .. } => {
1815 // Fail on state changes
1816 Err(AlienError::new(ErrorData::GenericError {
1817 message: "Handler failed during state change".to_string(),
1818 }))
1819 }
1820 }
1821 }
1822 }
1823
1824 let call_count = Arc::new(Mutex::new(0));
1825 let handler = Arc::new(StateChangeFailingHandler {
1826 call_count: call_count.clone(),
1827 });
1828 let bus = EventBus::with_handlers(vec![handler]);
1829
1830 let result = bus
1831 .run(|| async {
1832 // Create an event - this should succeed
1833 let handle = AlienEvent::TestBuildImage {
1834 image: "test-image".to_string(),
1835 stage: "stage1".to_string(),
1836 }
1837 .emit()
1838 .await?;
1839
1840 // Try to complete the event - this should fail due to state change handler failure
1841 handle.complete().await
1842 })
1843 .await;
1844
1845 // Verify that the state change failed
1846 assert!(result.is_err());
1847 let error = result.unwrap_err();
1848 assert!(error.to_string().contains("Event handler failed"));
1849 assert!(error
1850 .to_string()
1851 .contains("Handler failed during state change"));
1852
1853 // Verify the handler was called twice (once for create, once for failed state change)
1854 assert_eq!(*call_count.lock().unwrap(), 2);
1855 }
1856}