use blvm_node::module::api::events::EventManager;
use blvm_node::module::ipc::protocol::{EventMessage, EventPayload, ModuleMessage};
use blvm_node::module::traits::EventType;
use tokio::sync::mpsc;
use tokio::time::Duration;
#[tokio::test]
async fn test_event_manager_new() {
let manager = EventManager::new();
assert!(true);
}
#[tokio::test]
async fn test_event_manager_default() {
let manager = EventManager::default();
assert!(true);
}
#[tokio::test]
async fn test_event_manager_subscribe_module() {
let manager = EventManager::new();
let (tx, _rx) = mpsc::channel(10);
let event_types = vec![EventType::NewBlock, EventType::NewTransaction];
let result = manager
.subscribe_module("test-module".to_string(), event_types, tx)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_event_manager_subscribe_multiple_event_types() {
let manager = EventManager::new();
let (tx, _rx) = mpsc::channel(10);
let event_types = vec![EventType::NewBlock, EventType::NewTransaction];
let result = manager
.subscribe_module("test-module".to_string(), event_types, tx)
.await;
assert!(result.is_ok());
let block_subscribers = manager.get_subscribers(EventType::NewBlock).await;
assert!(block_subscribers.contains(&"test-module".to_string()));
let tx_subscribers = manager.get_subscribers(EventType::NewTransaction).await;
assert!(tx_subscribers.contains(&"test-module".to_string()));
}
#[tokio::test]
async fn test_event_manager_unsubscribe_module() {
let manager = EventManager::new();
let (tx, _rx) = mpsc::channel(10);
let event_types = vec![EventType::NewBlock];
manager
.subscribe_module("test-module".to_string(), event_types, tx)
.await
.unwrap();
let result = manager.unsubscribe_module("test-module").await;
assert!(result.is_ok());
let subscribers = manager.get_subscribers(EventType::NewBlock).await;
assert!(!subscribers.contains(&"test-module".to_string()));
}
#[tokio::test]
async fn test_event_manager_publish_event_no_subscribers() {
let manager = EventManager::new();
use blvm_protocol::Hash;
let result = manager
.publish_event(
EventType::NewBlock,
EventPayload::NewBlock {
block_hash: Hash::default(),
height: 0,
},
)
.await;
assert!(result.is_ok()); }
#[tokio::test]
async fn test_event_manager_publish_event_with_subscribers() {
let manager = EventManager::new();
let (tx, mut rx) = mpsc::channel(10);
let event_types = vec![EventType::NewBlock];
manager
.subscribe_module("test-module".to_string(), event_types, tx)
.await
.unwrap();
use blvm_protocol::Hash;
let result = manager
.publish_event(
EventType::NewBlock,
EventPayload::NewBlock {
block_hash: Hash::default(),
height: 100,
},
)
.await;
assert!(result.is_ok());
tokio::time::sleep(Duration::from_millis(10)).await;
let received = tokio::time::timeout(Duration::from_secs(1), rx.recv()).await;
assert!(received.is_ok());
let msg_opt = received.unwrap();
assert!(msg_opt.is_some());
if let Some(ModuleMessage::Event(EventMessage {
event_type,
payload,
})) = msg_opt
{
assert_eq!(event_type, EventType::NewBlock);
match payload {
EventPayload::NewBlock {
block_hash: _,
height,
} => {
assert_eq!(height, 100);
}
_ => panic!("Expected NewBlock payload"),
}
} else {
panic!("Expected Event message");
}
}
#[tokio::test]
async fn test_event_manager_publish_to_multiple_subscribers() {
let manager = EventManager::new();
let (tx1, mut rx1) = mpsc::channel(10);
let (tx2, mut rx2) = mpsc::channel(10);
manager
.subscribe_module("module1".to_string(), vec![EventType::NewBlock], tx1)
.await
.unwrap();
manager
.subscribe_module("module2".to_string(), vec![EventType::NewBlock], tx2)
.await
.unwrap();
use blvm_protocol::Hash;
let result = manager
.publish_event(
EventType::NewBlock,
EventPayload::NewBlock {
block_hash: Hash::default(),
height: 200,
},
)
.await;
assert!(result.is_ok());
let received1 = tokio::time::timeout(Duration::from_secs(1), rx1.recv()).await;
let received2 = tokio::time::timeout(Duration::from_secs(1), rx2.recv()).await;
assert!(received1.is_ok());
assert!(received1.unwrap().is_some());
assert!(received2.is_ok());
assert!(received2.unwrap().is_some());
}
#[tokio::test]
async fn test_event_manager_publish_different_event_types() {
let manager = EventManager::new();
let (tx, mut rx) = mpsc::channel(10);
manager
.subscribe_module("test-module".to_string(), vec![EventType::NewBlock], tx)
.await
.unwrap();
use blvm_protocol::Hash;
manager
.publish_event(
EventType::NewTransaction,
EventPayload::NewTransaction {
tx_hash: Hash::default(),
},
)
.await
.unwrap();
manager
.publish_event(
EventType::NewBlock,
EventPayload::NewBlock {
block_hash: Hash::default(),
height: 400,
},
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let received = tokio::time::timeout(Duration::from_secs(1), rx.recv()).await;
assert!(received.is_ok());
let msg_opt = received.unwrap();
assert!(msg_opt.is_some());
if let Some(ModuleMessage::Event(EventMessage { event_type, .. })) = msg_opt {
assert_eq!(event_type, EventType::NewBlock);
} else {
panic!("Expected Event message");
}
let timeout_result = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
assert!(timeout_result.is_err()); }
#[tokio::test]
async fn test_event_manager_cleanup_failed_channels() {
let manager = EventManager::new();
let (tx, rx) = mpsc::channel(1); let event_types = vec![EventType::NewBlock];
manager
.subscribe_module("test-module".to_string(), event_types, tx)
.await
.unwrap();
drop(rx);
use blvm_protocol::Hash;
let result = manager
.publish_event(
EventType::NewBlock,
EventPayload::NewBlock {
block_hash: Hash::default(),
height: 500,
},
)
.await;
assert!(result.is_ok());
let subscribers = manager.get_subscribers(EventType::NewBlock).await;
assert!(!subscribers.contains(&"test-module".to_string()));
}
#[tokio::test]
async fn test_event_manager_get_subscribers() {
let manager = EventManager::new();
let (tx1, _rx1) = mpsc::channel(10);
let (tx2, _rx2) = mpsc::channel(10);
manager
.subscribe_module("module1".to_string(), vec![EventType::NewBlock], tx1)
.await
.unwrap();
manager
.subscribe_module("module2".to_string(), vec![EventType::NewBlock], tx2)
.await
.unwrap();
let subscribers = manager.get_subscribers(EventType::NewBlock).await;
assert_eq!(subscribers.len(), 2);
assert!(subscribers.contains(&"module1".to_string()));
assert!(subscribers.contains(&"module2".to_string()));
let tx_subscribers = manager.get_subscribers(EventType::NewTransaction).await;
assert!(tx_subscribers.is_empty());
}
#[tokio::test]
async fn test_event_manager_unsubscribe_nonexistent() {
let manager = EventManager::new();
let result = manager.unsubscribe_module("nonexistent").await;
assert!(result.is_ok()); }