mecha10-messaging 0.1.46

Redis Streams messaging layer for Mecha10
Documentation
// Tests for mecha10_messaging

use mecha10_messaging::*;

use serde::{Deserialize, Serialize};

/// Minimal serializable payload that the tests in this file publish and compare.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestMessage {
    /// Integer field; the tests use distinct values to tell messages apart.
    value: i32,
    /// Free-form text field.
    text: String,
}

/// Round-trips one message between two bus connections on the `/test` topic.
///
/// `node-1` publishes, `node-2` subscribes in consumer group `group1`, and the
/// received payload and publisher id are checked before the message is acked.
/// Ignored by default because it needs a Redis server at `localhost:6379`.
#[tokio::test]
#[ignore] // Requires Redis running
async fn test_pub_sub() {
    use tokio::time::{timeout, Duration};

    let mut publisher = MessageBus::connect("redis://localhost:6379", "node-1").await.unwrap();
    let mut subscriber = MessageBus::connect("redis://localhost:6379", "node-2").await.unwrap();

    // The subscription is set up before anything is published.
    let mut rx = subscriber.subscribe::<TestMessage>("/test", "group1").await.unwrap();

    let sent = TestMessage {
        value: 42,
        text: String::from("hello"),
    };
    publisher.publish("/test", &sent).await.unwrap();

    // Receiving, checking and acking all share the same two-second budget.
    let receive_and_ack = async {
        let received = rx.recv().await.expect("No message received");
        assert_eq!(received.payload, sent);
        assert_eq!(received.publisher, "node-1");
        received.ack().await.unwrap();
    };
    timeout(Duration::from_secs(2), receive_and_ack).await.unwrap();

    publisher.close().await.unwrap();
    subscriber.close().await.unwrap();
}

/// Checks that two subscribers in different consumer groups both get the same message.
///
/// One connection publishes a single message to `/test2`; two other
/// connections, subscribed under `group-a` and `group-b`, must each receive
/// and acknowledge it within two seconds.
/// Ignored by default because it needs a Redis server at `localhost:6379`.
#[tokio::test]
#[ignore] // Requires Redis running
async fn test_multiple_subscribers() {
    use tokio::time::{timeout, Duration};

    const REDIS_URL: &str = "redis://localhost:6379";
    const TOPIC: &str = "/test2";

    let mut publisher = MessageBus::connect(REDIS_URL, "publisher").await.unwrap();
    let mut consumer_a = MessageBus::connect(REDIS_URL, "consumer-1").await.unwrap();
    let mut consumer_b = MessageBus::connect(REDIS_URL, "consumer-2").await.unwrap();

    // Each consumer uses its own group, so both are expected to see the message.
    let mut rx_a = consumer_a.subscribe::<TestMessage>(TOPIC, "group-a").await.unwrap();
    let mut rx_b = consumer_b.subscribe::<TestMessage>(TOPIC, "group-b").await.unwrap();

    let sent = TestMessage {
        value: 100,
        text: String::from("broadcast"),
    };
    publisher.publish(TOPIC, &sent).await.unwrap();

    // Receive on both subscriptions, compare payloads, then acknowledge.
    let receive_both = async {
        let from_a = rx_a.recv().await.expect("Consumer 1 should receive");
        let from_b = rx_b.recv().await.expect("Consumer 2 should receive");

        assert_eq!(from_a.payload, sent);
        assert_eq!(from_b.payload, sent);

        from_a.ack().await.unwrap();
        from_b.ack().await.unwrap();
    };
    timeout(Duration::from_secs(2), receive_both).await.unwrap();

    publisher.close().await.unwrap();
    consumer_a.close().await.unwrap();
    consumer_b.close().await.unwrap();
}

/// Checks that one pattern subscription receives messages from several publishers.
///
/// Three "robot" connections each publish once to their own
/// `robot-N/camera/rgb` topic so the topics exist. An aggregator then
/// subscribes to `*/camera/rgb` in group `vision-group`, two further messages
/// (values 4 and 5) are published, and the test waits up to five seconds for
/// the aggregator to receive and acknowledge two messages.
///
/// Ignored by default because it needs a Redis server at `localhost:6379`.
#[tokio::test]
#[ignore] // Requires Redis running
async fn test_wildcard_subscription() {
    // Create multiple publishers (simulating different robots)
    let mut bus_robot1 = MessageBus::connect("redis://localhost:6379", "robot-1").await.unwrap();
    let mut bus_robot2 = MessageBus::connect("redis://localhost:6379", "robot-2").await.unwrap();
    let mut bus_robot3 = MessageBus::connect("redis://localhost:6379", "robot-3").await.unwrap();

    // Create aggregator that subscribes to all robots' camera feeds
    let mut bus_aggregator = MessageBus::connect("redis://localhost:6379", "aggregator")
        .await
        .unwrap();

    // Publish initial messages to create topics
    let msg1 = TestMessage {
        value: 1,
        text: "robot-1-frame".to_string(),
    };
    let msg2 = TestMessage {
        value: 2,
        text: "robot-2-frame".to_string(),
    };
    let msg3 = TestMessage {
        value: 3,
        text: "robot-3-frame".to_string(),
    };

    bus_robot1.publish("robot-1/camera/rgb", &msg1).await.unwrap();
    bus_robot2.publish("robot-2/camera/rgb", &msg2).await.unwrap();
    bus_robot3.publish("robot-3/camera/rgb", &msg3).await.unwrap();

    // Wait for topics to be created
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    // Subscribe with wildcard pattern
    let mut rx = bus_aggregator
        .subscribe_pattern::<TestMessage>("*/camera/rgb", "vision-group")
        .await
        .unwrap();

    // Publish more messages
    let msg4 = TestMessage {
        value: 4,
        text: "robot-1-frame-2".to_string(),
    };
    let msg5 = TestMessage {
        value: 5,
        text: "robot-2-frame-2".to_string(),
    };

    bus_robot1.publish("robot-1/camera/rgb", &msg4).await.unwrap();
    bus_robot2.publish("robot-2/camera/rgb", &msg5).await.unwrap();

    // Receive messages from multiple robots through single subscriber
    let mut received_count = 0;
    let mut received_values = std::collections::HashSet::new();

    tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
        while received_count < 2 {
            // Fail fast on `None` rather than silently retrying: a `None`
            // presumably means the subscription has ended (TODO confirm), in
            // which case looping would only spin until the outer timeout and
            // then report a misleading "timed out" failure.
            let msg = rx
                .recv()
                .await
                .expect("Subscriber returned None before two messages were received");
            received_values.insert(msg.payload.value);
            received_count += 1;
            msg.ack().await.unwrap();
        }
    })
    .await
    .expect("Should receive messages from multiple robots");

    // Verify that at least one of the messages published after subscribing arrived.
    // NOTE(review): `||` does not prove both robots were seen; tightening this to
    // `&&` is only valid if the consumer group skips the three initial messages.
    assert!(received_values.contains(&4) || received_values.contains(&5));
    assert_eq!(received_count, 2);

    bus_robot1.close().await.unwrap();
    bus_robot2.close().await.unwrap();
    bus_robot3.close().await.unwrap();
    bus_aggregator.close().await.unwrap();
}

/// Checks that a pattern subscription matching no topics yields no messages.
///
/// Subscribing to `nonexistent-*/topic` must succeed, and a subsequent
/// `recv` must still be pending when a 500 ms timeout elapses.
/// Ignored by default because it needs a Redis server at `localhost:6379`.
#[tokio::test]
#[ignore] // Requires Redis running
async fn test_wildcard_subscription_empty_topics() {
    let mut bus = MessageBus::connect("redis://localhost:6379", "test-node")
        .await
        .unwrap();

    // The pattern is chosen so that nothing published by the other tests matches it.
    let mut rx = bus
        .subscribe_pattern::<TestMessage>("nonexistent-*/topic", "test-group")
        .await
        .unwrap();

    // The subscriber exists, but the wait for a message is expected to time out.
    let wait = tokio::time::Duration::from_millis(500);
    let outcome = tokio::time::timeout(wait, async { rx.recv().await }).await;
    outcome.expect_err("Should timeout when no topics exist");

    bus.close().await.unwrap();
}

/// Checks that `discover_topics` finds topics created by publishing.
///
/// Publishes the same message to three `robot-*/sensor/*` topics, pauses
/// briefly, then queries with the pattern `mecha10:*/sensor/*` and expects at
/// least three results, one or more of which contains `sensor`.
/// Ignored by default because it needs a Redis server at `localhost:6379`.
#[tokio::test]
#[ignore] // Requires Redis running
async fn test_discover_topics() {
    use tokio::time::{sleep, Duration};

    let mut bus1 = MessageBus::connect("redis://localhost:6379", "discovery-test")
        .await
        .unwrap();

    let msg = TestMessage {
        value: 1,
        text: String::from("test"),
    };

    // Publishing is what creates each topic; the payload itself is irrelevant here.
    for topic in [
        "robot-a/sensor/lidar",
        "robot-a/sensor/camera",
        "robot-b/sensor/lidar",
    ] {
        bus1.publish(topic, &msg).await.unwrap();
    }

    // Give the topics a moment to be created before querying.
    sleep(Duration::from_millis(100)).await;

    let topics = bus1.discover_topics("mecha10:*/sensor/*").await.unwrap();

    // All three sensor topics should be found (possibly alongside earlier ones).
    assert!(topics.len() >= 3);
    assert!(topics.iter().any(|t| t.contains("sensor")));

    bus1.close().await.unwrap();
}