use crate::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TestEvent {
pub message: String,
pub value: i32,
}
impl Event for TestEvent {
fn topic(&self) -> String {
"test/event".to_string()
}
}
#[derive(Debug)]
pub struct TestSubscriber {
pub received: Arc<Mutex<Vec<TestEvent>>>,
}
impl TestSubscriber {
pub fn new() -> Self {
Self {
received: Arc::new(Mutex::new(Vec::new())),
}
}
}
#[async_trait::async_trait]
impl Subscriber for TestSubscriber {
fn topic_pattern(&self) -> &str {
"test/#"
}
async fn handle(&mut self, _topic: &str, payload: Vec<u8>) -> Result<()> {
if let Ok(event) = bincode::deserialize::<TestEvent>(&payload) {
self.received.lock().await.push(event);
}
Ok(())
}
}
pub struct Calculator;
#[async_trait::async_trait]
impl Service for Calculator {
fn name(&self) -> &'static str {
"CalculatorService"
}
fn methods(&self) -> Vec<&'static str> {
vec!["add", "multiply"]
}
async fn handle(&self, method: &str, payload: Vec<u8>) -> Result<Vec<u8>> {
match method {
"add" => {
let (a, b): (i32, i32) = bincode::deserialize(&payload)?;
let result = a + b;
Ok(bincode::serialize(&result)?)
}
"multiply" => {
let (a, b): (i32, i32) = bincode::deserialize(&payload)?;
let result = a * b;
Ok(bincode::serialize(&result)?)
}
_ => Err(Error::method_not_found("TestService", method)),
}
}
}
#[cfg(test)]
mod integration_tests {
use super::*;
use tokio::time::timeout;
#[tokio::test]
async fn test_pubsub_pattern_with_events() {
println!("Starting pub/sub test");
let hub = ProcessHub::new("test_hub").await.unwrap();
let subscriber = TestSubscriber::new();
let received_events = subscriber.received.clone();
let _subscription = hub.subscribe(subscriber).await.unwrap();
println!("Subscriber registered");
tokio::time::sleep(Duration::from_millis(200)).await;
hub.publish(
"test/event",
TestEvent {
message: "Hello World".to_string(),
value: 42,
},
)
.await
.unwrap();
hub.publish(
"test/event",
TestEvent {
message: "Test Event".to_string(),
value: 100,
},
)
.await
.unwrap();
println!("Events published");
tokio::time::sleep(Duration::from_millis(500)).await;
let received = received_events.lock().await;
println!("Received {} events", received.len());
if !received.is_empty() {
println!("First event: {:?}", received[0]);
}
assert!(!received.is_empty(), "Should receive at least 1 event");
hub.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_request_response_pattern_with_service() {
println!("Starting req/resp test");
let hub = ProcessHub::new("test_hub").await.unwrap();
let calculator = Calculator;
hub.register_service(calculator).await.unwrap();
println!("Calculator service registered");
tokio::time::sleep(Duration::from_millis(200)).await;
println!("Testing add operation");
let add_result: i32 = timeout(
Duration::from_secs(5),
hub.call("CalculatorService.add", (10, 5)),
)
.await
.unwrap()
.unwrap();
assert_eq!(add_result, 15);
println!("Add test passed: 10 + 5 = {add_result}");
println!("Testing multiply operation");
let multiply_result: i32 = timeout(
Duration::from_secs(5),
hub.call("CalculatorService.multiply", (6, 7)),
)
.await
.unwrap()
.unwrap();
assert_eq!(multiply_result, 42);
println!("Multiply test passed: 6 * 7 = {multiply_result}");
hub.shutdown().await.unwrap();
}
}