use ed25519_dalek::SigningKey;
use omnimesh::buffer::PayloadStorage;
use omnimesh::config::OmnimeshMode;
use omnimesh::envelope::{Did, EnvelopeHeader, MessageId, PayloadType, Priority, SignedEnvelope};
use omnimesh::runtime::RoutingTable;
use omnimesh::runtime::delivery::DeliveryLayer;
use omnimesh::runtime::security::SecurityLayer;
use omnimesh::runtime::transport::config::TransportConfig;
use omnimesh::runtime::transport::interface::Transport;
use omnimesh::runtime::transport::tcp::TcpTransport;
use rand_core::OsRng;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
struct Node {
did: Did,
signing_key: SigningKey,
transport: TcpTransport,
security: SecurityLayer,
delivery: DeliveryLayer,
routing: Arc<RoutingTable>,
}
impl Node {
fn new(port: u16, mode: &OmnimeshMode) -> Self {
let signing_key = SigningKey::generate(&mut OsRng);
let did = Did::new(signing_key.verifying_key().to_bytes());
let config = TransportConfig {
tcp_listen_addr: format!("127.0.0.1:{}", port).parse().unwrap(),
tcp_connect_addr: "127.0.0.1:9000".parse().unwrap(),
..Default::default()
};
let routing = Arc::new(RoutingTable::new());
let transport =
TcpTransport::new(config, routing.clone()).expect("Failed to create transport");
let security = SecurityLayer::new(mode, None);
let delivery = DeliveryLayer::new(mode);
Node {
did,
signing_key,
transport,
security,
delivery,
routing,
}
}
fn register_peer(&self, peer_did: Did, peer_addr: &str) {
self.routing
.update_route(peer_did, peer_addr.parse().unwrap());
}
fn send_message(&self, recipient: &Node, seq: u64, payload: &[u8]) -> Result<(), String> {
let header = EnvelopeHeader {
version: 7,
message_id: MessageId([seq as u8; 16]),
sender_did: self.did,
recipient_did: recipient.did,
sequence_number: seq,
timestamp_us: 1234567890 + seq,
priority: Priority::Normal,
payload_type: PayloadType::Raw,
};
let mut payload_buf = PayloadStorage::<1024>::new();
payload_buf
.push_bytes(payload)
.map_err(|e| format!("{:?}", e))?;
let envelope = SignedEnvelope::sign(header, payload_buf, &self.signing_key);
self.transport.send(&envelope)
}
fn receive_and_verify(&self) -> Option<Vec<u8>> {
if let Some(envelope) = self.transport.receive() {
if self.security.verify(&envelope).is_ok() {
if self.delivery.deliver(&envelope).is_ok() {
return Some(envelope.payload.as_slice().to_vec());
}
}
}
None
}
}
#[test]
fn test_three_node_ring() {
let mode = OmnimeshMode::development();
let node_a = Node::new(9100, &mode);
let node_b = Node::new(9101, &mode);
let node_c = Node::new(9102, &mode);
node_a.register_peer(node_b.did, "127.0.0.1:9101");
node_b.register_peer(node_c.did, "127.0.0.1:9102");
node_c.register_peer(node_a.did, "127.0.0.1:9100");
thread::sleep(Duration::from_millis(100));
assert!(node_a.send_message(&node_b, 1, b"A to B").is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(payload) = node_b.receive_and_verify() {
assert_eq!(&payload, b"A to B");
} else {
panic!("Node B did not receive message from A");
}
assert!(node_b.send_message(&node_c, 2, b"B to C").is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(payload) = node_c.receive_and_verify() {
assert_eq!(&payload, b"B to C");
} else {
panic!("Node C did not receive message from B");
}
assert!(node_c.send_message(&node_a, 3, b"C to A").is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(payload) = node_a.receive_and_verify() {
assert_eq!(&payload, b"C to A");
} else {
panic!("Node A did not receive message from C");
}
}
#[test]
fn test_broadcast_to_multiple_nodes() {
let mode = OmnimeshMode::development();
let sender = Node::new(9200, &mode);
let receiver1 = Node::new(9201, &mode);
let receiver2 = Node::new(9202, &mode);
let receiver3 = Node::new(9203, &mode);
thread::sleep(Duration::from_millis(100));
sender.register_peer(receiver1.did, "127.0.0.1:9201");
sender.register_peer(receiver2.did, "127.0.0.1:9202");
sender.register_peer(receiver3.did, "127.0.0.1:9203");
let message = b"Broadcast message";
assert!(sender.send_message(&receiver1, 1, message).is_ok());
assert!(sender.send_message(&receiver2, 2, message).is_ok());
assert!(sender.send_message(&receiver3, 3, message).is_ok());
thread::sleep(Duration::from_millis(100));
let mut received_count = 0;
if let Some(payload) = receiver1.receive_and_verify() {
assert_eq!(&payload, message);
received_count += 1;
}
if let Some(payload) = receiver2.receive_and_verify() {
assert_eq!(&payload, message);
received_count += 1;
}
if let Some(payload) = receiver3.receive_and_verify() {
assert_eq!(&payload, message);
received_count += 1;
}
assert_eq!(received_count, 3, "All receivers should get the broadcast");
}
#[test]
fn test_bidirectional_communication() {
let mode = OmnimeshMode::development();
let node_a = Node::new(9300, &mode);
let node_b = Node::new(9301, &mode);
thread::sleep(Duration::from_millis(100));
node_a.register_peer(node_b.did, "127.0.0.1:9301");
node_b.register_peer(node_a.did, "127.0.0.1:9300");
assert!(node_a.send_message(&node_b, 1, b"Request").is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(payload) = node_b.receive_and_verify() {
assert_eq!(&payload, b"Request");
assert!(node_b.send_message(&node_a, 2, b"Response").is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(response) = node_a.receive_and_verify() {
assert_eq!(&response, b"Response");
} else {
panic!("Node A did not receive response");
}
} else {
panic!("Node B did not receive request");
}
}
#[test]
fn test_message_ordering() {
let mode = OmnimeshMode::development();
let sender = Node::new(9400, &mode);
let receiver = Node::new(9401, &mode);
thread::sleep(Duration::from_millis(100));
sender.register_peer(receiver.did, "127.0.0.1:9401");
for i in 0..5 {
let msg = format!("Message {}", i);
assert!(sender.send_message(&receiver, i, msg.as_bytes()).is_ok());
thread::sleep(Duration::from_millis(20));
}
thread::sleep(Duration::from_millis(100));
let mut received = Vec::new();
for _ in 0..5 {
if let Some(payload) = receiver.receive_and_verify() {
received.push(String::from_utf8_lossy(&payload).to_string());
}
}
assert!(!received.is_empty(), "Should receive at least one message");
}
#[test]
fn test_duplicate_detection_across_nodes() {
let mode = OmnimeshMode::production();
let sender = Node::new(9500, &mode);
let receiver = Node::new(9501, &mode);
thread::sleep(Duration::from_millis(100));
sender.register_peer(receiver.did, "127.0.0.1:9501");
let message = b"Duplicate test";
assert!(sender.send_message(&receiver, 1, message).is_ok());
thread::sleep(Duration::from_millis(50));
if let Some(payload) = receiver.receive_and_verify() {
assert_eq!(&payload, message);
} else {
panic!("First message not received");
}
assert!(sender.send_message(&receiver, 1, message).is_ok());
thread::sleep(Duration::from_millis(50));
}
#[test]
fn test_high_throughput() {
let mode = OmnimeshMode::development();
let sender = Node::new(9600, &mode);
let receiver = Node::new(9601, &mode);
thread::sleep(Duration::from_millis(100));
sender.register_peer(receiver.did, "127.0.0.1:9601");
let message_count = 100;
let mut sent = 0;
for i in 0..message_count {
match sender.send_message(&receiver, i, b"High throughput test") {
Ok(_) => sent += 1,
Err(_) => break, }
}
thread::sleep(Duration::from_millis(200));
let mut received = 0;
for _ in 0..message_count {
if receiver.receive_and_verify().is_some() {
received += 1;
} else {
break;
}
}
println!("High throughput test: sent={}, received={}", sent, received);
assert!(sent > 0, "Should send at least some messages");
assert!(received > 0, "Should receive at least some messages");
}