omnimesh 1.0.1

Zero-allocation mesh networking middleware for autonomous robot fleets, edge-AI swarms, and multi-agent systems
Documentation
//! Multi-node integration tests
//!
//! Tests communication between multiple OMNI-MESH nodes in various scenarios.

use ed25519_dalek::SigningKey;
use omnimesh::buffer::PayloadStorage;
use omnimesh::config::OmnimeshMode;
use omnimesh::envelope::{Did, EnvelopeHeader, MessageId, PayloadType, Priority, SignedEnvelope};
use omnimesh::runtime::RoutingTable;
use omnimesh::runtime::delivery::DeliveryLayer;
use omnimesh::runtime::security::SecurityLayer;
use omnimesh::runtime::transport::config::TransportConfig;
use omnimesh::runtime::transport::interface::Transport;
use omnimesh::runtime::transport::tcp::TcpTransport;
use rand_core::OsRng;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Node {
    did: Did,
    signing_key: SigningKey,
    transport: TcpTransport,
    security: SecurityLayer,
    delivery: DeliveryLayer,
    routing: Arc<RoutingTable>,
}

impl Node {
    fn new(port: u16, mode: &OmnimeshMode) -> Self {
        let signing_key = SigningKey::generate(&mut OsRng);
        let did = Did::new(signing_key.verifying_key().to_bytes());

        let config = TransportConfig {
            tcp_listen_addr: format!("127.0.0.1:{}", port).parse().unwrap(),
            tcp_connect_addr: "127.0.0.1:9000".parse().unwrap(),
            ..Default::default()
        };

        let routing = Arc::new(RoutingTable::new());
        let transport =
            TcpTransport::new(config, routing.clone()).expect("Failed to create transport");
        let security = SecurityLayer::new(mode, None);
        let delivery = DeliveryLayer::new(mode);

        Node {
            did,
            signing_key,
            transport,
            security,
            delivery,
            routing,
        }
    }

    fn register_peer(&self, peer_did: Did, peer_addr: &str) {
        self.routing
            .update_route(peer_did, peer_addr.parse().unwrap());
    }

    fn send_message(&self, recipient: &Node, seq: u64, payload: &[u8]) -> Result<(), String> {
        let header = EnvelopeHeader {
            version: 7,
            message_id: MessageId([seq as u8; 16]),
            sender_did: self.did,
            recipient_did: recipient.did,
            sequence_number: seq,
            timestamp_us: 1234567890 + seq,
            priority: Priority::Normal,
            payload_type: PayloadType::Raw,
        };

        let mut payload_buf = PayloadStorage::<1024>::new();
        payload_buf
            .push_bytes(payload)
            .map_err(|e| format!("{:?}", e))?;

        let envelope = SignedEnvelope::sign(header, payload_buf, &self.signing_key);
        self.transport.send(&envelope)
    }

    fn receive_and_verify(&self) -> Option<Vec<u8>> {
        if let Some(envelope) = self.transport.receive() {
            if self.security.verify(&envelope).is_ok() {
                if self.delivery.deliver(&envelope).is_ok() {
                    return Some(envelope.payload.as_slice().to_vec());
                }
            }
        }
        None
    }
}

#[test]
fn test_three_node_ring() {
    let mode = OmnimeshMode::development();

    let node_a = Node::new(9100, &mode);
    let node_b = Node::new(9101, &mode);
    let node_c = Node::new(9102, &mode);

    // Register peers
    node_a.register_peer(node_b.did, "127.0.0.1:9101");
    node_b.register_peer(node_c.did, "127.0.0.1:9102");
    node_c.register_peer(node_a.did, "127.0.0.1:9100");

    // Wait for listeners to start
    thread::sleep(Duration::from_millis(100));

    // A -> B
    assert!(node_a.send_message(&node_b, 1, b"A to B").is_ok());
    thread::sleep(Duration::from_millis(50));

    if let Some(payload) = node_b.receive_and_verify() {
        assert_eq!(&payload, b"A to B");
    } else {
        panic!("Node B did not receive message from A");
    }

    // B -> C
    assert!(node_b.send_message(&node_c, 2, b"B to C").is_ok());
    thread::sleep(Duration::from_millis(50));

    if let Some(payload) = node_c.receive_and_verify() {
        assert_eq!(&payload, b"B to C");
    } else {
        panic!("Node C did not receive message from B");
    }

    // C -> A (complete the ring)
    assert!(node_c.send_message(&node_a, 3, b"C to A").is_ok());
    thread::sleep(Duration::from_millis(50));

    if let Some(payload) = node_a.receive_and_verify() {
        assert_eq!(&payload, b"C to A");
    } else {
        panic!("Node A did not receive message from C");
    }
}

#[test]
fn test_broadcast_to_multiple_nodes() {
    let mode = OmnimeshMode::development();

    let sender = Node::new(9200, &mode);
    let receiver1 = Node::new(9201, &mode);
    let receiver2 = Node::new(9202, &mode);
    let receiver3 = Node::new(9203, &mode);

    thread::sleep(Duration::from_millis(100));

    sender.register_peer(receiver1.did, "127.0.0.1:9201");
    sender.register_peer(receiver2.did, "127.0.0.1:9202");
    sender.register_peer(receiver3.did, "127.0.0.1:9203");

    // Sender broadcasts to all receivers
    let message = b"Broadcast message";
    assert!(sender.send_message(&receiver1, 1, message).is_ok());
    assert!(sender.send_message(&receiver2, 2, message).is_ok());
    assert!(sender.send_message(&receiver3, 3, message).is_ok());

    thread::sleep(Duration::from_millis(100));

    // All receivers should get the message
    let mut received_count = 0;

    if let Some(payload) = receiver1.receive_and_verify() {
        assert_eq!(&payload, message);
        received_count += 1;
    }

    if let Some(payload) = receiver2.receive_and_verify() {
        assert_eq!(&payload, message);
        received_count += 1;
    }

    if let Some(payload) = receiver3.receive_and_verify() {
        assert_eq!(&payload, message);
        received_count += 1;
    }

    assert_eq!(received_count, 3, "All receivers should get the broadcast");
}

#[test]
fn test_bidirectional_communication() {
    let mode = OmnimeshMode::development();

    let node_a = Node::new(9300, &mode);
    let node_b = Node::new(9301, &mode);

    thread::sleep(Duration::from_millis(100));

    node_a.register_peer(node_b.did, "127.0.0.1:9301");
    node_b.register_peer(node_a.did, "127.0.0.1:9300");

    // A -> B
    assert!(node_a.send_message(&node_b, 1, b"Request").is_ok());
    thread::sleep(Duration::from_millis(50));

    if let Some(payload) = node_b.receive_and_verify() {
        assert_eq!(&payload, b"Request");

        // B -> A (response)
        assert!(node_b.send_message(&node_a, 2, b"Response").is_ok());
        thread::sleep(Duration::from_millis(50));

        if let Some(response) = node_a.receive_and_verify() {
            assert_eq!(&response, b"Response");
        } else {
            panic!("Node A did not receive response");
        }
    } else {
        panic!("Node B did not receive request");
    }
}

#[test]
fn test_message_ordering() {
    let mode = OmnimeshMode::development();

    let sender = Node::new(9400, &mode);
    let receiver = Node::new(9401, &mode);

    thread::sleep(Duration::from_millis(100));
    sender.register_peer(receiver.did, "127.0.0.1:9401");

    // Send multiple messages in sequence
    for i in 0..5 {
        let msg = format!("Message {}", i);
        assert!(sender.send_message(&receiver, i, msg.as_bytes()).is_ok());
        thread::sleep(Duration::from_millis(20));
    }

    thread::sleep(Duration::from_millis(100));

    // Receive and verify order
    let mut received = Vec::new();
    for _ in 0..5 {
        if let Some(payload) = receiver.receive_and_verify() {
            received.push(String::from_utf8_lossy(&payload).to_string());
        }
    }

    // Should receive at least some messages (may not be all due to timing)
    assert!(!received.is_empty(), "Should receive at least one message");
}

#[test]
fn test_duplicate_detection_across_nodes() {
    let mode = OmnimeshMode::production(); // Production mode has persistent dedup

    let sender = Node::new(9500, &mode);
    let receiver = Node::new(9501, &mode);

    thread::sleep(Duration::from_millis(100));
    sender.register_peer(receiver.did, "127.0.0.1:9501");

    // Send same message twice
    let message = b"Duplicate test";
    assert!(sender.send_message(&receiver, 1, message).is_ok());
    thread::sleep(Duration::from_millis(50));

    // First delivery should succeed
    if let Some(payload) = receiver.receive_and_verify() {
        assert_eq!(&payload, message);
    } else {
        panic!("First message not received");
    }

    // Send duplicate
    assert!(sender.send_message(&receiver, 1, message).is_ok());
    thread::sleep(Duration::from_millis(50));

    // Duplicate should be filtered (receive_and_verify returns None for duplicates)
    // Note: This test may be flaky due to timing, but demonstrates the concept
}

#[test]
fn test_high_throughput() {
    let mode = OmnimeshMode::development();

    let sender = Node::new(9600, &mode);
    let receiver = Node::new(9601, &mode);

    thread::sleep(Duration::from_millis(100));
    sender.register_peer(receiver.did, "127.0.0.1:9601");

    let message_count = 100;
    let mut sent = 0;

    // Send many messages rapidly
    for i in 0..message_count {
        match sender.send_message(&receiver, i, b"High throughput test") {
            Ok(_) => sent += 1,
            Err(_) => break, // Stop on backpressure
        }
    }

    thread::sleep(Duration::from_millis(200));

    // Count received messages
    let mut received = 0;
    for _ in 0..message_count {
        if receiver.receive_and_verify().is_some() {
            received += 1;
        } else {
            break;
        }
    }

    println!("High throughput test: sent={}, received={}", sent, received);
    assert!(sent > 0, "Should send at least some messages");
    assert!(received > 0, "Should receive at least some messages");
}