mod event;
pub use event::*;
mod handler;
pub use handler::*;
mod state;
pub use state::*;
mod handle;
pub use handle::*;
mod bus;
pub use bus::*;
#[cfg(test)]
mod tests {
use crate::events::{AlienEvent, EventBus, EventChange, EventHandler, EventState};
use crate::{ErrorData, Result};
use alien_error::{AlienError, GenericError};
use async_trait::async_trait;
use insta::assert_debug_snapshot;
use rstest::*;
use std::sync::{Arc, Mutex};
/// Test double that records every `EventChange` it is handed.
///
/// The buffer sits behind an `Arc<Mutex<..>>`, so a clone of the handler shares the
/// same recording as the value it was cloned from.
#[derive(Debug, Clone)]
struct TestEventHandler {
/// Recorded changes, appended in the order `on_event_change` receives them.
events: Arc<Mutex<Vec<EventChange>>>,
}
impl TestEventHandler {
    /// Builds a handler whose recording starts out empty.
    fn new() -> Self {
        let events = Arc::new(Mutex::new(Vec::new()));
        Self { events }
    }

    /// Returns a copy of everything recorded so far.
    ///
    /// # Panics
    /// Panics if the mutex guarding the recording is poisoned.
    fn events(&self) -> Vec<EventChange> {
        let recorded = self.events.lock().unwrap();
        recorded.clone()
    }

    /// Empties the recording.
    ///
    /// # Panics
    /// Panics if the mutex guarding the recording is poisoned.
    // No caller is visible in this module; the allowance keeps the helper around
    // without a dead-code warning.
    #[allow(dead_code)]
    fn clear(&self) {
        let mut recorded = self.events.lock().unwrap();
        recorded.clear();
    }
}
// Recording implementation: stores each change and always reports success.
#[async_trait]
impl EventHandler for TestEventHandler {
    /// Appends `change` to the shared recording and returns `Ok(())`.
    async fn on_event_change(&self, change: EventChange) -> Result<()> {
        {
            // Keep the guard in its own scope so it is released before returning.
            let mut recorded = self.events.lock().unwrap();
            recorded.push(change);
        }
        Ok(())
    }
}
/// Runs `f` inside a fresh `EventBus` that has a single recording handler attached.
///
/// Returns whatever the future produced together with the handler, so the caller can
/// inspect the changes captured while `f` ran.
async fn with_test_bus<F, Fut, R>(f: F) -> (R, TestEventHandler)
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = R>,
{
    let recorder = TestEventHandler::new();
    // The bus receives a clone; both copies share the same underlying buffer.
    let output = EventBus::with_handlers(vec![Arc::new(recorder.clone())])
        .run(f)
        .await;
    (output, recorder)
}
// A bare `emit()` must notify the handler exactly once, with a parentless `Created`
// change in `EventState::None` that carries the emitted event and the handle's id.
#[tokio::test]
async fn test_simple_event_emission() {
    let (emitted, recorder) = with_test_bus(|| async {
        let handle = AlienEvent::TestBuildingStack {
            stack: "test-stack".to_string(),
        }
        .emit()
        .await
        .unwrap();
        assert!(!handle.id.is_empty());
        handle
    })
    .await;

    let changes = recorder.events();
    assert_eq!(changes.len(), 1);

    let EventChange::Created {
        id,
        parent_id,
        event,
        state,
        ..
    } = &changes[0]
    else {
        panic!("Expected Created event");
    };
    assert_eq!(id, &emitted.id);
    assert_eq!(parent_id, &None);
    assert!(matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack"));
    assert_eq!(state, &EventState::None);
}
// Two `update` calls on one handle must reach the handler as two `Updated` changes,
// in call order, after the initial change produced by `emit`.
#[tokio::test]
async fn test_event_update() {
    let (_, recorder) = with_test_bus(|| async {
        // Every event in this test differs only in its stage label.
        let build_image = |stage: &str| AlienEvent::TestBuildImage {
            image: "api:latest".to_string(),
            stage: stage.to_string(),
        };

        let handle = build_image("stage 1").emit().await.unwrap();
        handle.update(build_image("stage 2")).await.unwrap();
        handle.update(build_image("stage 3")).await.unwrap();
    })
    .await;

    let changes = recorder.events();
    assert_eq!(changes.len(), 3);

    // Index 0 is the emission itself; 1 and 2 are the updates.
    for (index, expected_stage) in [(1, "stage 2"), (2, "stage 3")] {
        let EventChange::Updated { event, .. } = &changes[index] else {
            panic!("Expected Updated event");
        };
        assert!(
            matches!(event, AlienEvent::TestBuildImage { stage, .. } if stage == expected_stage)
        );
    }
}
// A scope whose body returns `Ok` must be created in `Started`, pass the body's value
// through, and end with a `StateChanged` to `Success` as the last recorded change.
#[tokio::test]
async fn test_scoped_event_success() {
    let (answer, recorder) = with_test_bus(|| async {
        AlienEvent::TestBuildingImage {
            image: "api:latest".to_string(),
        }
        .in_scope(|_handle| async move {
            // Two child events emitted while the scope is open.
            for stage in ["compile", "link"] {
                AlienEvent::TestBuildImage {
                    image: "api:latest".to_string(),
                    stage: stage.to_string(),
                }
                .emit()
                .await
                .unwrap();
            }
            Ok::<_, AlienError<ErrorData>>(42)
        })
        .await
        .unwrap()
    })
    .await;
    assert_eq!(answer, 42);

    let changes = recorder.events();
    // Scope creation + two children + the final state change.
    assert!(changes.len() >= 4);

    let EventChange::Created { state, .. } = &changes[0] else {
        panic!("Expected Created event");
    };
    assert_eq!(state, &EventState::Started);

    let EventChange::StateChanged { new_state, .. } = changes.last().unwrap() else {
        panic!("Expected StateChanged event");
    };
    assert_eq!(new_state, &EventState::Success);
}
// A scope whose body returns `Err` must hand that error back to the caller and finish
// with a `StateChanged` to `Failed` as the last recorded change.
#[tokio::test]
async fn test_scoped_event_failure() {
    let (outcome, recorder) = with_test_bus(|| async {
        AlienEvent::TestBuildingImage {
            image: "api:latest".to_string(),
        }
        .in_scope(|_handle| async move {
            AlienEvent::TestBuildImage {
                image: "api:latest".to_string(),
                stage: "compile".to_string(),
            }
            .emit()
            .await
            .unwrap();

            let failure = AlienError::new(ErrorData::GenericError {
                message: "Test error".to_string(),
            });
            Err::<(), _>(failure)
        })
        .await
    })
    .await;
    assert!(outcome.is_err());

    let changes = recorder.events();
    let EventChange::StateChanged { new_state, .. } = changes.last().unwrap() else {
        panic!("Expected StateChanged event");
    };
    assert!(matches!(new_state, EventState::Failed { .. }));
}
// Builds a four-level chain of nested scopes plus one sibling under the root, then
// snapshots the (id, parent id, event) triple of every `Created` change.
#[tokio::test]
async fn test_deep_hierarchy() {
    let (_, recorder) = with_test_bus(|| async {
        AlienEvent::TestBuildingStack {
            stack: "root-stack".to_string(),
        }
        .in_scope(|_| async {
            AlienEvent::TestBuildingImage {
                image: "app1".to_string(),
            }
            .in_scope(|_| async {
                AlienEvent::TestBuildImage {
                    image: "app1".to_string(),
                    stage: "compile".to_string(),
                }
                .in_scope(|_| async {
                    AlienEvent::TestBuildImage {
                        image: "app1".to_string(),
                        stage: "optimize".to_string(),
                    }
                    .emit()
                    .await
                    .unwrap();
                    Ok::<_, AlienError<ErrorData>>(())
                })
                .await
                .unwrap();
                Ok::<_, AlienError<ErrorData>>(())
            })
            .await
            .unwrap();

            // Sibling of the first image scope, emitted after that scope has closed.
            AlienEvent::TestBuildingImage {
                image: "app2".to_string(),
            }
            .emit()
            .await
            .unwrap();
            Ok::<_, AlienError<ErrorData>>(())
        })
        .await
        .unwrap()
    })
    .await;

    // Real ids are replaced by "event-N" labels numbered in first-seen order, so the
    // snapshot does not depend on the actual id values.
    let changes = recorder.events();
    let mut labels = std::collections::HashMap::new();
    let mut next_label = 0;
    let mut hierarchy = Vec::new();
    for change in &changes {
        if let EventChange::Created {
            id,
            parent_id,
            event,
            ..
        } = change
        {
            let stable_id = labels
                .entry(id.clone())
                .or_insert_with(|| {
                    next_label += 1;
                    format!("event-{}", next_label)
                })
                .clone();
            let stable_parent_id = parent_id.as_ref().map(|parent| {
                labels
                    .entry(parent.clone())
                    .or_insert_with(|| {
                        next_label += 1;
                        format!("event-{}", next_label)
                    })
                    .clone()
            });
            hierarchy.push((stable_id, stable_parent_id, format!("{:?}", event)));
        }
    }
    assert_debug_snapshot!(hierarchy);
}
// Ten events emitted inside `as_parent` must all be recorded as `Created` changes
// that carry a parent id; the parent itself must not count.
#[tokio::test]
async fn test_wide_hierarchy() {
    let (_, recorder) = with_test_bus(|| async {
        let parent = AlienEvent::TestBuildingStack {
            stack: "wide-stack".to_string(),
        }
        .emit()
        .await
        .unwrap();

        parent
            .as_parent(|_| async {
                for i in 0..10 {
                    AlienEvent::TestBuildImage {
                        image: format!("image-{}", i),
                        stage: "build".to_string(),
                    }
                    .emit()
                    .await
                    .unwrap();
                }
                Ok::<_, ErrorData>(())
            })
            .await
            .unwrap();

        parent.complete().await.unwrap();
    })
    .await;

    let mut children_count = 0;
    for change in &recorder.events() {
        if let EventChange::Created {
            parent_id: Some(_), ..
        } = change
        {
            children_count += 1;
        }
    }
    assert_eq!(children_count, 10);
}
// Simulates a workflow that re-enters the same parent in separate steps: the parent is
// emitted once in `Started`, each step emits one child inside `as_parent`, and the
// parent is completed at the end.
//
// Besides counting the `Created` changes, the test checks the linkage itself: the first
// one is the parentless root, and every later one names the root as its parent.
#[tokio::test]
async fn test_durable_execution_simulation() {
    let (parent_id, handler) = with_test_bus(|| async {
        let parent_handle = AlienEvent::TestBuildingStack {
            stack: "durable-stack".to_string(),
        }
        .emit_with_state(EventState::Started)
        .await
        .unwrap();
        let parent_id = parent_handle.id.clone();

        // First step.
        parent_handle
            .as_parent(|_| async {
                AlienEvent::TestBuildImage {
                    image: "api:latest".to_string(),
                    stage: "compile".to_string(),
                }
                .emit()
                .await
                .unwrap();
                Ok::<_, ErrorData>(())
            })
            .await
            .unwrap();

        // Second step, re-entering the same parent.
        parent_handle
            .as_parent(|_| async {
                AlienEvent::TestPushImage {
                    image: "api:latest".to_string(),
                }
                .emit()
                .await
                .unwrap();
                Ok::<_, ErrorData>(())
            })
            .await
            .unwrap();

        parent_handle.complete().await.unwrap();
        parent_id
    })
    .await;

    let events = handler.events();
    // Three creations plus the completion state change.
    assert!(events.len() >= 4);

    // Borrow the ids instead of cloning them; they are only compared.
    let created_events: Vec<_> = events
        .iter()
        .filter_map(|e| match e {
            EventChange::Created { id, parent_id, .. } => Some((id, parent_id)),
            _ => None,
        })
        .collect();
    assert_eq!(created_events.len(), 3);

    let (root_id, root_parent) = created_events[0];
    assert_eq!(root_id, &parent_id);
    assert_eq!(root_parent, &None);

    let expected_parent = Some(parent_id.clone());
    for &(child_id, child_parent) in &created_events[1..] {
        assert_eq!(
            child_parent, &expected_parent,
            "Child event {:?} should be parented to {:?}",
            child_id, parent_id
        );
    }
}
#[tokio::test]
async fn test_manual_state_management() {
let (_, handler) = with_test_bus(|| async {
let handle = AlienEvent::TestBuildingStack {
stack: "manual-stack".to_string(),
}
.emit_with_state(EventState::Started)
.await
.unwrap();
handle
.fail(AlienError::new(GenericError {
message: "Something went wrong".to_string(),
}))
.await
.unwrap();
})
.await;
let events = handler.events();
match &events[0] {
EventChange::Created { state, .. } => {
assert_eq!(state, &EventState::Started);
}
_ => panic!("Expected Created event"),
}
match &events[1] {
EventChange::StateChanged { new_state, .. } => match new_state {
EventState::Failed { error } => {
let error = error.as_ref().expect("Expected error to be present");
assert_eq!(error.message, "Something went wrong");
}
_ => panic!("Expected Failed state"),
},
_ => panic!("Expected StateChanged event"),
}
}
// Three emissions driven concurrently with `tokio::join!` must all succeed and each
// reach the handler once.
#[tokio::test]
async fn test_concurrent_events() {
    let (_, recorder) = with_test_bus(|| async {
        let (first, second, third) = tokio::join!(
            AlienEvent::TestBuildImage {
                image: "image1".to_string(),
                stage: "build".to_string(),
            }
            .emit(),
            AlienEvent::TestBuildImage {
                image: "image2".to_string(),
                stage: "build".to_string(),
            }
            .emit(),
            AlienEvent::TestBuildImage {
                image: "image3".to_string(),
                stage: "build".to_string(),
            }
            .emit(),
        );
        assert!(first.is_ok());
        assert!(second.is_ok());
        assert!(third.is_ok());
    })
    .await;

    let recorded = recorder.events().len();
    assert_eq!(recorded, 3);
}
// An inner scope that fails must not fail the outer scope when the outer body handles
// the error: the recording must contain both a `Failed` and a `Success` state change.
#[tokio::test]
async fn test_nested_scopes_with_errors() {
    let (outcome, recorder) = with_test_bus(|| async {
        AlienEvent::TestBuildingStack {
            stack: "nested-error-stack".to_string(),
        }
        .in_scope(|_| async {
            AlienEvent::TestBuildImage {
                image: "image1".to_string(),
                stage: "stage1".to_string(),
            }
            .emit()
            .await
            .unwrap();

            // The inner scope fails; the outer body observes the error and carries on.
            let inner_result = AlienEvent::TestBuildImage {
                image: "image2".to_string(),
                stage: "stage2".to_string(),
            }
            .in_scope(|_| async {
                Err::<(), _>(AlienError::new(ErrorData::GenericError {
                    message: "Inner error".to_string(),
                }))
            })
            .await;
            assert!(inner_result.is_err());

            Ok::<_, AlienError<ErrorData>>("Outer succeeded")
        })
        .await
    })
    .await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), "Outer succeeded");

    let mut state_changes = Vec::new();
    for change in recorder.events().iter() {
        if let EventChange::StateChanged { id, new_state, .. } = change {
            state_changes.push((id.clone(), new_state.clone()));
        }
    }

    let saw_failure = state_changes
        .iter()
        .any(|(_, state)| matches!(state, EventState::Failed { .. }));
    assert!(saw_failure);

    let saw_success = state_changes
        .iter()
        .any(|(_, state)| matches!(state, EventState::Success));
    assert!(saw_success);
}
// Emitting outside any `EventBus::run` must still succeed and yield a no-op handle.
#[tokio::test]
async fn test_no_event_bus_context() {
    let event = AlienEvent::TestBuildingStack {
        stack: "no-context".to_string(),
    };

    let emitted = event.emit().await;
    assert!(emitted.is_ok());

    let handle = emitted.unwrap();
    assert!(handle.is_noop);
}
// Runs the same scope-plus-child scenario for several name triples and expects at
// least three recorded changes each time (scope creation, child creation, scope
// completion).
#[rstest]
#[case("stack1", "image1", "stage1")]
#[case("stack2", "image2", "stage2")]
#[case("stack3", "image3", "stage3")]
#[tokio::test]
async fn test_parameterized_events(
    #[case] stack: &str,
    #[case] image: &str,
    #[case] stage: &str,
) {
    let (_, recorder) = with_test_bus(|| async {
        AlienEvent::TestBuildingStack {
            stack: stack.to_string(),
        }
        .in_scope(|_| async move {
            AlienEvent::TestBuildImage {
                image: image.to_string(),
                stage: stage.to_string(),
            }
            .emit()
            .await
            .unwrap();
            Ok::<_, AlienError<ErrorData>>(())
        })
        .await
        .unwrap()
    })
    .await;

    let recorded = recorder.events().len();
    assert!(recorded >= 3);
}
// Drives a build / push / deploy workflow made of nested scopes and snapshots one
// formatted line per recorded change.
#[tokio::test]
async fn test_complex_workflow_snapshot() {
    let (_, handler) = with_test_bus(|| async {
        AlienEvent::TestBuildingStack {
            stack: "production-stack".to_string(),
        }
        .in_scope(|_| async {
            // Build phase: four stage events inside their own scope.
            AlienEvent::TestBuildingImage {
                image: "api-service".to_string(),
            }
            .in_scope(|_| async {
                for stage in ["download-deps", "compile", "optimize", "build"] {
                    AlienEvent::TestBuildImage {
                        image: "api-service".to_string(),
                        stage: stage.to_string(),
                    }
                    .emit()
                    .await
                    .unwrap();
                }
                Ok::<_, AlienError<ErrorData>>(())
            })
            .await
            .unwrap();

            // Push phase: a single event directly under the root scope.
            AlienEvent::TestPushImage {
                image: "api-service".to_string(),
            }
            .emit()
            .await
            .unwrap();

            // Deploy phase: resource creation followed by a health check.
            AlienEvent::TestDeployingStack {
                stack: "api-service".to_string(),
            }
            .in_scope(|_| async {
                AlienEvent::TestCreatingResource {
                    resource_type: "LoadBalancer".to_string(),
                    resource_name: "api-lb".to_string(),
                    details: Some("Updating target groups".to_string()),
                }
                .emit()
                .await
                .unwrap();
                AlienEvent::TestPerformingHealthCheck {
                    target: "https://api.example.com/health".to_string(),
                    check_type: "HTTP".to_string(),
                }
                .emit()
                .await
                .unwrap();
                Ok::<_, AlienError<ErrorData>>(())
            })
            .await
            .unwrap();
            Ok::<_, AlienError<ErrorData>>(())
        })
        .await
        .unwrap()
    })
    .await;

    // Real ids are replaced by "event-N" labels numbered in first-seen order, so the
    // snapshot does not depend on the actual id values.
    let changes = handler.events();
    let mut labels = std::collections::HashMap::new();
    let mut next_label = 0;
    let mut snapshot_data = Vec::with_capacity(changes.len());
    for change in &changes {
        let line = match change {
            EventChange::Created {
                id,
                parent_id,
                event,
                state,
                ..
            } => {
                let stable_id = labels
                    .entry(id.clone())
                    .or_insert_with(|| {
                        next_label += 1;
                        format!("event-{}", next_label)
                    })
                    .clone();
                let stable_parent_id = parent_id.as_ref().map(|parent| {
                    labels
                        .entry(parent.clone())
                        .or_insert_with(|| {
                            next_label += 1;
                            format!("event-{}", next_label)
                        })
                        .clone()
                });
                format!(
                    "Created: id={}, parent={:?}, event={:?}, state={:?}",
                    stable_id, stable_parent_id, event, state
                )
            }
            EventChange::Updated { id, event, .. } => {
                let stable_id = labels
                    .entry(id.clone())
                    .or_insert_with(|| {
                        next_label += 1;
                        format!("event-{}", next_label)
                    })
                    .clone();
                format!("Updated: id={}, event={:?}", stable_id, event)
            }
            EventChange::StateChanged { id, new_state, .. } => {
                let stable_id = labels
                    .entry(id.clone())
                    .or_insert_with(|| {
                        next_label += 1;
                        format!("event-{}", next_label)
                    })
                    .clone();
                format!("StateChanged: id={}, new_state={:?}", stable_id, new_state)
            }
        };
        snapshot_data.push(line);
    }
    assert_debug_snapshot!(snapshot_data);
}
// Checks that concurrently running `EventBus` instances stay isolated from each other.
//
// Each simulated request builds its own bus whose handler tags every change with the
// tenant id and forwards it over one shared channel. After all requests finish, every
// captured event must mention the tenant whose handler reported it, and the sample
// tenant's scope must have at least one child event.
//
// No HTTP server is bound: the handler closure is called directly, once with a header
// map, and the remaining requests go straight to the business logic.
#[tokio::test]
async fn test_multi_tenancy_with_http_server() {
    use axum::{http::header::HeaderMap, http::StatusCode, response::Response};
    use futures::future::join_all;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    // Number of concurrent simulated requests, one distinct tenant each.
    const TENANT_COUNT: usize = 10000;

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<(String, EventChange)>();

    // Forwards every change to the shared channel, tagged with the owning tenant.
    #[derive(Clone)]
    struct TenantAwareEventHandler {
        tenant_id: String,
        sender: mpsc::UnboundedSender<(String, EventChange)>,
    }

    #[async_trait]
    impl EventHandler for TenantAwareEventHandler {
        async fn on_event_change(&self, change: EventChange) -> Result<()> {
            // Best effort: a closed receiver must not fail the emitting code.
            let _ = self.sender.send((self.tenant_id.clone(), change));
            Ok(())
        }
    }

    // One "request": a fresh bus per tenant, a scope with one direct child, and a
    // nested helper that emits two more events.
    let business_logic = {
        let event_tx = event_tx.clone();
        move |tenant_id: String| {
            let event_tx = event_tx.clone();
            async move {
                let handler = Arc::new(TenantAwareEventHandler {
                    tenant_id: tenant_id.clone(),
                    sender: event_tx,
                });
                let bus = EventBus::with_handlers(vec![handler]);
                bus.run(|| async {
                    AlienEvent::TestBuildingStack {
                        stack: format!("tenant-{}-stack", tenant_id),
                    }
                    .in_scope(|_handle| async move {
                        AlienEvent::TestBuildImage {
                            image: format!("tenant-{}-api", tenant_id),
                            stage: "compile".to_string(),
                        }
                        .emit()
                        .await
                        .map_err(|e| {
                            AlienError::new(ErrorData::GenericError {
                                message: e.to_string(),
                            })
                        })?;
                        deploy_service(&tenant_id).await?;
                        Ok::<_, AlienError<ErrorData>>(format!(
                            "Success for tenant {}",
                            tenant_id
                        ))
                    })
                    .await
                })
                .await
            }
        }
    };

    // Emits the deployment events from a separate function, i.e. without being handed
    // the bus or the scope handle explicitly.
    async fn deploy_service(tenant_id: &str) -> Result<()> {
        AlienEvent::TestDeployingStack {
            stack: format!("tenant-{}-deployment", tenant_id),
        }
        .emit()
        .await?;
        AlienEvent::TestCreatingResource {
            resource_type: "LoadBalancer".to_string(),
            resource_name: format!("tenant-{}-lb", tenant_id),
            details: Some("Multi-tenant load balancer".to_string()),
        }
        .emit()
        .await?;
        Ok(())
    }

    // Handler-shaped wrapper: reads the tenant from a header and maps the outcome to a
    // response with a status code.
    let hello_handler = {
        let business_logic = business_logic.clone();
        move |headers: HeaderMap| {
            let business_logic = business_logic.clone();
            async move {
                let tenant_id = headers
                    .get("x-tenant-id")
                    .and_then(|v| v.to_str().ok())
                    .unwrap_or("default")
                    .to_string();
                match business_logic(tenant_id.clone()).await {
                    Ok(result) => Response::builder()
                        .status(StatusCode::OK)
                        .body(format!(
                            "Hello from tenant {}! Result: {}",
                            tenant_id, result
                        ))
                        .unwrap(),
                    Err(e) => Response::builder()
                        .status(StatusCode::INTERNAL_SERVER_ERROR)
                        .body(format!("Error for tenant {}: {}", tenant_id, e))
                        .unwrap(),
                }
            }
        }
    };

    // Exercise the handler path once and check its outcome instead of discarding it.
    let mut headers = HeaderMap::new();
    headers.insert("x-tenant-id", "tenant-0000".parse().unwrap());
    let response = hello_handler(headers).await;
    assert_eq!(response.status(), StatusCode::OK);

    // Drive every tenant's request concurrently on this task.
    let completed_tenants = join_all((0..TENANT_COUNT).map(|i| {
        let tenant_id = format!("tenant-{:04}", i);
        let business_logic = business_logic.clone();
        async move {
            match business_logic(tenant_id.clone()).await {
                Ok(_) => tenant_id,
                Err(e) => panic!("Request failed for tenant {}: {}", tenant_id, e),
            }
        }
    }))
    .await;
    assert_eq!(completed_tenants.len(), TENANT_COUNT);

    // No waiting is needed before draining: the other tests in this module read
    // handler output immediately after the emitting future resolves, and
    // `UnboundedSender::send` queues synchronously, so every event is already here.
    let mut all_events: Vec<(String, EventChange)> = Vec::new();
    while let Ok(event) = event_rx.try_recv() {
        all_events.push(event);
    }
    assert!(!all_events.is_empty(), "No events were received");
    let total_events = all_events.len();

    // Group by reporting tenant, consuming the flat list rather than cloning it.
    let mut events_by_tenant: HashMap<String, Vec<EventChange>> = HashMap::new();
    for (tenant_id, event) in all_events {
        events_by_tenant.entry(tenant_id).or_default().push(event);
    }
    assert_eq!(
        events_by_tenant.len(),
        TENANT_COUNT,
        "Expected events for {} tenants, got {}",
        TENANT_COUNT,
        events_by_tenant.len()
    );

    // Every event reported by a tenant's handler must mention that tenant's id; a
    // mismatch would mean one bus delivered an event to another bus's handler.
    for (tenant_id, events) in &events_by_tenant {
        assert!(
            events.len() >= 4,
            "Tenant {} has {} events, expected at least 4",
            tenant_id,
            events.len()
        );
        for event in events {
            if let EventChange::Created {
                event: alien_event, ..
            } = event
            {
                match alien_event {
                    AlienEvent::TestBuildingStack { stack } => {
                        assert!(
                            stack.contains(tenant_id),
                            "Stack name '{}' should contain tenant ID '{}'",
                            stack,
                            tenant_id
                        );
                    }
                    AlienEvent::TestBuildImage { image, .. } => {
                        assert!(
                            image.contains(tenant_id),
                            "Image name '{}' should contain tenant ID '{}'",
                            image,
                            tenant_id
                        );
                    }
                    AlienEvent::TestDeployingStack { stack } => {
                        assert!(
                            stack.contains(tenant_id),
                            "Deployment stack '{}' should contain tenant ID '{}'",
                            stack,
                            tenant_id
                        );
                    }
                    AlienEvent::TestCreatingResource { resource_name, .. } => {
                        assert!(
                            resource_name.contains(tenant_id),
                            "Resource name '{}' should contain tenant ID '{}'",
                            resource_name,
                            tenant_id
                        );
                    }
                    _ => {}
                }
            }
        }
    }

    // Spot-check the parent/child linkage for one tenant.
    let sample_tenant = "tenant-0000";
    let sample_events = events_by_tenant.get(sample_tenant).unwrap();
    let parent_event = sample_events
        .iter()
        .find(|e| {
            matches!(
                e,
                EventChange::Created {
                    event: AlienEvent::TestBuildingStack { .. },
                    ..
                }
            )
        })
        .expect("Should have TestBuildingStack event");
    let EventChange::Created { id: parent_id, .. } = parent_event else {
        unreachable!("the search above only matches `Created` changes");
    };
    let has_children = sample_events.iter().any(|e| {
        matches!(e, EventChange::Created { parent_id: Some(pid), .. } if pid == parent_id)
    });
    assert!(
        has_children,
        "Should have child events for parent {}",
        parent_id
    );

    println!("✅ Multi-tenancy test passed!");
    println!(" - Processed {} concurrent requests", TENANT_COUNT);
    println!(" - Verified {} unique tenants", events_by_tenant.len());
    println!(" - Total events captured: {}", total_events);
    println!(
        " - Average events per tenant: {:.1}",
        total_events as f64 / events_by_tenant.len() as f64
    );
}
// When the only handler rejects a change, `emit` must return an error whose text names
// both the bus-level failure and the handler's own message.
#[tokio::test]
async fn test_handler_failure() {
    // Handler that rejects every change.
    struct FailingHandler;

    #[async_trait]
    impl EventHandler for FailingHandler {
        async fn on_event_change(&self, _change: EventChange) -> Result<()> {
            Err(AlienError::new(ErrorData::GenericError {
                message: "Handler intentionally failed".to_string(),
            }))
        }
    }

    let bus = EventBus::with_handlers(vec![Arc::new(FailingHandler)]);
    let outcome = bus
        .run(|| async {
            AlienEvent::TestBuildingStack {
                stack: "test-stack".to_string(),
            }
            .emit()
            .await
        })
        .await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Event handler failed"));
    assert!(message.contains("Handler intentionally failed"));
}
// With a recording handler registered ahead of a failing one, `emit` must still report
// the failure, and the recording handler must have received the change.
#[tokio::test]
async fn test_mixed_handlers_one_fails() {
    // Handler that rejects every change.
    struct FailingHandler;

    #[async_trait]
    impl EventHandler for FailingHandler {
        async fn on_event_change(&self, _change: EventChange) -> Result<()> {
            Err(AlienError::new(ErrorData::GenericError {
                message: "Second handler failed".to_string(),
            }))
        }
    }

    let recorder = TestEventHandler::new();
    let bus = EventBus::with_handlers(vec![
        Arc::new(recorder.clone()),
        Arc::new(FailingHandler),
    ]);
    let outcome = bus
        .run(|| async {
            AlienEvent::TestBuildingStack {
                stack: "test-stack".to_string(),
            }
            .emit()
            .await
        })
        .await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Event handler failed"));
    assert!(message.contains("Second handler failed"));

    let changes = recorder.events();
    assert_eq!(changes.len(), 1);
    let EventChange::Created { event, .. } = &changes[0] else {
        panic!("Expected Created event");
    };
    assert!(matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "test-stack"));
}
// With a handler that rejects every change, `in_scope` is expected to still run its
// body, hand it a no-op handle, and return the body's value.
#[tokio::test]
async fn test_handler_failure_in_scoped_event() {
    // Handler that rejects every change.
    struct FailingHandler;

    #[async_trait]
    impl EventHandler for FailingHandler {
        async fn on_event_change(&self, _change: EventChange) -> Result<()> {
            Err(AlienError::new(ErrorData::GenericError {
                message: "Handler failed during scoped event".to_string(),
            }))
        }
    }

    let bus = EventBus::with_handlers(vec![Arc::new(FailingHandler)]);
    let outcome = bus
        .run(|| async {
            AlienEvent::TestBuildingStack {
                stack: "test-stack".to_string(),
            }
            .in_scope(|handle| async move {
                assert!(handle.is_noop);
                Ok::<_, AlienError<ErrorData>>(42)
            })
            .await
        })
        .await;

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), 42);
}
// A handler that accepts creation but rejects updates must let `emit` succeed and make
// the following `update` fail; the handler is invoked exactly twice.
#[tokio::test]
async fn test_handler_failure_during_update() {
    // Counts invocations and fails only on `Updated` changes.
    struct UpdateFailingHandler {
        call_count: Arc<Mutex<usize>>,
    }

    #[async_trait]
    impl EventHandler for UpdateFailingHandler {
        async fn on_event_change(&self, change: EventChange) -> Result<()> {
            *self.call_count.lock().unwrap() += 1;
            match change {
                EventChange::Updated { .. } => Err(AlienError::new(ErrorData::GenericError {
                    message: "Handler failed during update".to_string(),
                })),
                EventChange::Created { .. } | EventChange::StateChanged { .. } => Ok(()),
            }
        }
    }

    let call_count = Arc::new(Mutex::new(0));
    let bus = EventBus::with_handlers(vec![Arc::new(UpdateFailingHandler {
        call_count: call_count.clone(),
    })]);
    let outcome = bus
        .run(|| async {
            let handle = AlienEvent::TestBuildImage {
                image: "test-image".to_string(),
                stage: "stage1".to_string(),
            }
            .emit()
            .await?;
            handle
                .update(AlienEvent::TestBuildImage {
                    image: "test-image".to_string(),
                    stage: "stage2".to_string(),
                })
                .await
        })
        .await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Event handler failed"));
    assert!(message.contains("Handler failed during update"));

    // One call for the creation, one for the rejected update.
    let calls = *call_count.lock().unwrap();
    assert_eq!(calls, 2);
}
// A function annotated with `#[alien_event(..)]` must behave like a scope: the
// annotated event is created first in `Started`, events emitted in the body are
// recorded, and a `Success` state change appears once the function returns `Ok`.
#[tokio::test]
async fn test_alien_event_macro() {
    use crate::alien_event;

    let (outcome, recorder) = with_test_bus(|| async {
        #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-test".to_string() })]
        async fn test_macro_function() -> Result<String> {
            AlienEvent::TestBuildImage {
                image: "test-image".to_string(),
                stage: "compile".to_string(),
            }
            .emit()
            .await?;
            Ok("success".to_string())
        }
        test_macro_function().await
    })
    .await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), "success");

    let changes = recorder.events();
    assert!(changes.len() >= 3);

    let EventChange::Created { state, event, .. } = &changes[0] else {
        panic!("Expected Created event");
    };
    assert_eq!(state, &EventState::Started);
    assert!(matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "macro-test"));

    let saw_child = changes.iter().any(|change| {
        matches!(
            change,
            EventChange::Created {
                event: AlienEvent::TestBuildImage { .. },
                ..
            }
        )
    });
    assert!(saw_child);

    let saw_success = changes.iter().any(|change| {
        matches!(
            change,
            EventChange::StateChanged {
                new_state: EventState::Success,
                ..
            }
        )
    });
    assert!(saw_success);
}
// When a function annotated with `#[alien_event(..)]` returns `Err`, the error must
// reach the caller and a `Failed` state change must be recorded.
#[tokio::test]
async fn test_alien_event_macro_with_failure() {
    use crate::alien_event;

    let (outcome, recorder) = with_test_bus(|| async {
        #[alien_event(AlienEvent::TestBuildingStack { stack: "macro-fail-test".to_string() })]
        async fn test_macro_failure() -> Result<String> {
            AlienEvent::TestBuildImage {
                image: "test-image".to_string(),
                stage: "compile".to_string(),
            }
            .emit()
            .await?;
            Err(AlienError::new(ErrorData::GenericError {
                message: "Macro test failure".to_string(),
            }))
        }
        test_macro_failure().await
    })
    .await;
    assert!(outcome.is_err());

    let saw_failure = recorder.events().iter().any(|change| {
        matches!(
            change,
            EventChange::StateChanged {
                new_state: EventState::Failed { .. },
                ..
            }
        )
    });
    assert!(saw_failure);
}
// The event expression given to `#[alien_event(..)]` may refer to the annotated
// function's arguments: the recorded stack name must contain the `id` passed in.
#[tokio::test]
async fn test_alien_event_macro_with_dynamic_values() {
    use crate::alien_event;

    let (outcome, recorder) = with_test_bus(|| async {
        async fn test_with_id(id: u32) -> Result<String> {
            #[alien_event(AlienEvent::TestBuildingStack { stack: format!("stack-{}", id) })]
            async fn inner_function(id: u32) -> Result<String> {
                Ok(format!("processed-{}", id))
            }
            inner_function(id).await
        }
        test_with_id(42).await
    })
    .await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), "processed-42");

    let changes = recorder.events();
    let EventChange::Created { event, .. } = &changes[0] else {
        panic!("Expected Created event");
    };
    assert!(matches!(event, AlienEvent::TestBuildingStack { stack } if stack == "stack-42"));
}
// A handler that accepts creation but rejects state changes must let `emit` succeed
// and make the following `complete` fail; the handler is invoked exactly twice.
#[tokio::test]
async fn test_handler_failure_during_state_change() {
    // Counts invocations and fails only on `StateChanged` changes.
    struct StateChangeFailingHandler {
        call_count: Arc<Mutex<usize>>,
    }

    #[async_trait]
    impl EventHandler for StateChangeFailingHandler {
        async fn on_event_change(&self, change: EventChange) -> Result<()> {
            *self.call_count.lock().unwrap() += 1;
            match change {
                EventChange::StateChanged { .. } => {
                    Err(AlienError::new(ErrorData::GenericError {
                        message: "Handler failed during state change".to_string(),
                    }))
                }
                EventChange::Created { .. } | EventChange::Updated { .. } => Ok(()),
            }
        }
    }

    let call_count = Arc::new(Mutex::new(0));
    let bus = EventBus::with_handlers(vec![Arc::new(StateChangeFailingHandler {
        call_count: call_count.clone(),
    })]);
    let outcome = bus
        .run(|| async {
            let handle = AlienEvent::TestBuildImage {
                image: "test-image".to_string(),
                stage: "stage1".to_string(),
            }
            .emit()
            .await?;
            handle.complete().await
        })
        .await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Event handler failed"));
    assert!(message.contains("Handler failed during state change"));

    // One call for the creation, one for the rejected state change.
    let calls = *call_count.lock().unwrap();
    assert_eq!(calls, 2);
}
}