use mecha10_messaging::*;
use serde::{Deserialize, Serialize};
/// Payload type published and received by the tests in this file.
///
/// Derives `Serialize`/`Deserialize` so it can be passed to the bus's `publish` and
/// `subscribe` calls, and `PartialEq`/`Debug` so received payloads can be compared
/// against the sent value with `assert_eq!`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestMessage {
// Integer marker; the tests use it to tell messages apart.
value: i32,
// Free-form text carried alongside `value`.
text: String,
}
/// Round-trips a single message between two bus connections on the `/test` topic.
///
/// `node-1` publishes and `node-2` subscribes as `group1`. Within two seconds the
/// subscriber must receive a delivery carrying the original payload and the publisher
/// id `node-1`, and acknowledging it must succeed.
///
/// Marked `#[ignore]` (it connects to `redis://localhost:6379`); run it explicitly with
/// `cargo test -- --ignored`.
#[tokio::test]
#[ignore]
async fn test_pub_sub() {
    let redis_url = "redis://localhost:6379";
    let mut publisher_bus = MessageBus::connect(redis_url, "node-1").await.unwrap();
    let mut subscriber_bus = MessageBus::connect(redis_url, "node-2").await.unwrap();

    // Subscribe before publishing so the delivery is not missed.
    let mut inbox = subscriber_bus
        .subscribe::<TestMessage>("/test", "group1")
        .await
        .unwrap();

    let sent = TestMessage {
        value: 42,
        text: "hello".to_string(),
    };
    publisher_bus.publish("/test", &sent).await.unwrap();

    // Receive, verify and ack all happen under the same two-second deadline.
    let deadline = tokio::time::Duration::from_secs(2);
    let exchange = async {
        match inbox.recv().await {
            Some(delivery) => {
                assert_eq!(delivery.payload, sent);
                assert_eq!(delivery.publisher, "node-1");
                delivery.ack().await.unwrap();
            }
            None => panic!("No message received"),
        }
    };
    tokio::time::timeout(deadline, exchange).await.unwrap();

    publisher_bus.close().await.unwrap();
    subscriber_bus.close().await.unwrap();
}
/// Checks that two subscribers in different groups each receive the same message.
///
/// One connection publishes to `/test2`; two other connections subscribe to that topic
/// as `group-a` and `group-b`. Both subscriptions must yield a delivery whose payload
/// equals the published message, and both acks must succeed, all within two seconds.
///
/// Marked `#[ignore]` (it connects to `redis://localhost:6379`); run it explicitly with
/// `cargo test -- --ignored`.
#[tokio::test]
#[ignore]
async fn test_multiple_subscribers() {
    let redis_url = "redis://localhost:6379";
    let mut publisher = MessageBus::connect(redis_url, "publisher").await.unwrap();
    let mut first_consumer = MessageBus::connect(redis_url, "consumer-1").await.unwrap();
    let mut second_consumer = MessageBus::connect(redis_url, "consumer-2").await.unwrap();

    let mut inbox_a = first_consumer
        .subscribe::<TestMessage>("/test2", "group-a")
        .await
        .unwrap();
    let mut inbox_b = second_consumer
        .subscribe::<TestMessage>("/test2", "group-b")
        .await
        .unwrap();

    let broadcast = TestMessage {
        value: 100,
        text: "broadcast".to_string(),
    };
    publisher.publish("/test2", &broadcast).await.unwrap();

    // The two receives run one after the other and share a single two-second deadline.
    let both_delivered = async {
        let from_a = inbox_a.recv().await.expect("Consumer 1 should receive");
        let from_b = inbox_b.recv().await.expect("Consumer 2 should receive");
        assert_eq!(from_a.payload, broadcast);
        assert_eq!(from_b.payload, broadcast);
        from_a.ack().await.unwrap();
        from_b.ack().await.unwrap();
    };
    tokio::time::timeout(tokio::time::Duration::from_secs(2), both_delivered)
        .await
        .unwrap();

    publisher.close().await.unwrap();
    first_consumer.close().await.unwrap();
    second_consumer.close().await.unwrap();
}
/// Checks that a pattern subscription (`*/camera/rgb`) receives messages published by
/// several robots on their own `<robot>/camera/rgb` topics.
///
/// Three robot connections first publish one frame each, then the aggregator subscribes
/// with the pattern, then two robots publish a second frame. The test waits up to five
/// seconds for two deliveries, acks each, and requires that at least one of the
/// post-subscription frames (values 4 or 5) was among them.
///
/// Marked `#[ignore]` (it connects to `redis://localhost:6379`); run it explicitly with
/// `cargo test -- --ignored`.
#[tokio::test]
#[ignore]
async fn test_wildcard_subscription() {
    let mut bus_robot1 = MessageBus::connect("redis://localhost:6379", "robot-1").await.unwrap();
    let mut bus_robot2 = MessageBus::connect("redis://localhost:6379", "robot-2").await.unwrap();
    let mut bus_robot3 = MessageBus::connect("redis://localhost:6379", "robot-3").await.unwrap();
    let mut bus_aggregator = MessageBus::connect("redis://localhost:6379", "aggregator")
        .await
        .unwrap();

    // First round: publish before the pattern subscription exists.
    // Presumably this creates the topics so the pattern has something to match — TODO confirm.
    let msg1 = TestMessage {
        value: 1,
        text: "robot-1-frame".to_string(),
    };
    let msg2 = TestMessage {
        value: 2,
        text: "robot-2-frame".to_string(),
    };
    let msg3 = TestMessage {
        value: 3,
        text: "robot-3-frame".to_string(),
    };
    bus_robot1.publish("robot-1/camera/rgb", &msg1).await.unwrap();
    bus_robot2.publish("robot-2/camera/rgb", &msg2).await.unwrap();
    bus_robot3.publish("robot-3/camera/rgb", &msg3).await.unwrap();

    // Short pause between the first round of publishes and subscribing.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    let mut rx = bus_aggregator
        .subscribe_pattern::<TestMessage>("*/camera/rgb", "vision-group")
        .await
        .unwrap();

    // Second round: published after the subscription is in place.
    let msg4 = TestMessage {
        value: 4,
        text: "robot-1-frame-2".to_string(),
    };
    let msg5 = TestMessage {
        value: 5,
        text: "robot-2-frame-2".to_string(),
    };
    bus_robot1.publish("robot-1/camera/rgb", &msg4).await.unwrap();
    bus_robot2.publish("robot-2/camera/rgb", &msg5).await.unwrap();

    let mut received_count = 0;
    let mut received_values = std::collections::HashSet::new();
    tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
        while received_count < 2 {
            match rx.recv().await {
                Some(msg) => {
                    received_values.insert(msg.payload.value);
                    received_count += 1;
                    msg.ack().await.unwrap();
                }
                // A closed subscription can never deliver the remaining messages. Fail
                // right away instead of re-polling a dead channel until the timeout.
                None => panic!(
                    "Subscription channel closed after {} of 2 expected messages",
                    received_count
                ),
            }
        }
    })
    .await
    .expect("Should receive messages from multiple robots");

    // NOTE(review): only one of the post-subscription values is required. Whether the
    // pre-subscription frames (values 1-3) can also be delivered depends on
    // `subscribe_pattern` semantics that are not visible in this file.
    assert!(received_values.contains(&4) || received_values.contains(&5));
    assert_eq!(received_count, 2);

    bus_robot1.close().await.unwrap();
    bus_robot2.close().await.unwrap();
    bus_robot3.close().await.unwrap();
    bus_aggregator.close().await.unwrap();
}
/// Checks that a pattern subscription matching no topics delivers nothing.
///
/// Subscribes with the pattern `nonexistent-*/topic` and expects a receive attempt to
/// still be pending when a 500 ms timeout elapses.
///
/// Marked `#[ignore]` (it connects to `redis://localhost:6379`); run it explicitly with
/// `cargo test -- --ignored`.
#[tokio::test]
#[ignore]
async fn test_wildcard_subscription_empty_topics() {
    let mut bus = MessageBus::connect("redis://localhost:6379", "test-node")
        .await
        .unwrap();

    let mut inbox = bus
        .subscribe_pattern::<TestMessage>("nonexistent-*/topic", "test-group")
        .await
        .unwrap();

    // Success here means the timeout fired: any completed receive fails the test.
    let quiet_window = tokio::time::Duration::from_millis(500);
    let outcome = tokio::time::timeout(quiet_window, inbox.recv()).await;
    outcome.expect_err("Should timeout when no topics exist");

    bus.close().await.unwrap();
}
/// Checks that `discover_topics` finds topics that have been published to.
///
/// Publishes the same message to three `robot-*/sensor/*` topics, pauses briefly, then
/// queries with the pattern `mecha10:*/sensor/*`. The result must contain at least three
/// entries, and at least one entry must contain the substring `sensor`.
///
/// Marked `#[ignore]` (it connects to `redis://localhost:6379`); run it explicitly with
/// `cargo test -- --ignored`.
#[tokio::test]
#[ignore]
async fn test_discover_topics() {
    let mut bus = MessageBus::connect("redis://localhost:6379", "discovery-test")
        .await
        .unwrap();

    let probe = TestMessage {
        value: 1,
        text: "test".to_string(),
    };
    bus.publish("robot-a/sensor/lidar", &probe).await.unwrap();
    bus.publish("robot-a/sensor/camera", &probe).await.unwrap();
    bus.publish("robot-b/sensor/lidar", &probe).await.unwrap();

    // Short pause between publishing and discovery.
    let settle = tokio::time::Duration::from_millis(100);
    tokio::time::sleep(settle).await;

    let topics = bus.discover_topics("mecha10:*/sensor/*").await.unwrap();
    // Lower bound only: the check tolerates additional matching topics.
    assert!(topics.len() >= 3);
    assert!(topics.iter().any(|t| t.contains("sensor")));

    bus.close().await.unwrap();
}