use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::{self, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use switchy_async::sync::RwLock;
use switchy_random::{Rng, rng};
/// Latency applied to links created with default settings.
///
/// Reads `SIMULATOR_DEFAULT_LATENCY_MS` (whole milliseconds). When the
/// variable is unset or does not parse as a `u64`, 50 ms is used.
fn default_latency() -> Duration {
    const FALLBACK_MS: u64 = 50;

    let millis = match std::env::var("SIMULATOR_DEFAULT_LATENCY_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(FALLBACK_MS),
        Err(_) => FALLBACK_MS,
    };
    Duration::from_millis(millis)
}
/// Packet-loss probability applied to links created with default settings.
///
/// Reads `SIMULATOR_DEFAULT_PACKET_LOSS`. When the variable is unset or does
/// not parse as an `f64`, `0.01` is used.
///
/// The value is compared against a random draw in `0.0..1.0`, so it has to be
/// a probability: `NaN` is rejected (falls back to `0.01`) and every other
/// value is clamped into `[0.0, 1.0]`.
fn default_packet_loss() -> f64 {
    const FALLBACK: f64 = 0.01;

    std::env::var("SIMULATOR_DEFAULT_PACKET_LOSS")
        .ok()
        .and_then(|raw| raw.parse::<f64>().ok())
        // NaN compares false against everything and would silently disable loss.
        .filter(|probability| !probability.is_nan())
        .map_or(FALLBACK, |probability| probability.clamp(0.0, 1.0))
}
/// Simulated delay before a name lookup is answered.
///
/// Reads `SIMULATOR_DISCOVERY_DELAY_MS` (whole milliseconds). When the
/// variable is unset or does not parse as a `u64`, 100 ms is used.
#[allow(dead_code)]
fn discovery_delay() -> Duration {
    let configured_ms = std::env::var("SIMULATOR_DISCOVERY_DELAY_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    Duration::from_millis(configured_ms.unwrap_or(100))
}
/// Connection timeout taken from the environment.
///
/// Reads `SIMULATOR_CONNECTION_TIMEOUT_SECS` (whole seconds). When the
/// variable is unset or does not parse as a `u64`, 30 s is used.
// Not referenced by the code visible alongside this function, hence the
// `dead_code` allowance.
#[allow(dead_code)]
fn connection_timeout() -> Duration {
    if let Ok(raw) = std::env::var("SIMULATOR_CONNECTION_TIMEOUT_SECS") {
        if let Ok(secs) = raw.parse::<u64>() {
            return Duration::from_secs(secs);
        }
    }
    Duration::from_secs(30)
}
/// Largest payload, in bytes, that a connection accepts for sending.
///
/// Reads `SIMULATOR_MAX_MESSAGE_SIZE`. When the variable is unset or does not
/// parse as a `usize`, 1 MiB (`1024 * 1024`) is used.
fn max_message_size() -> usize {
    const DEFAULT_MAX_BYTES: usize = 1024 * 1024;

    match std::env::var("SIMULATOR_MAX_MESSAGE_SIZE") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_MAX_BYTES),
        Err(_) => DEFAULT_MAX_BYTES,
    }
}
/// Topology of the simulated network: the known nodes and the links
/// between them.
#[derive(Debug, Clone)]
pub struct NetworkGraph {
    /// Per-node state, keyed by node id.
    nodes: BTreeMap<SimulatorNodeId, NodeInfo>,
    /// Directed links keyed by `(from, to)`. `connect_nodes` stores one entry
    /// per direction.
    links: BTreeMap<(SimulatorNodeId, SimulatorNodeId), LinkInfo>,
}
/// State the simulator keeps for a single node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Id of this node (the same value it is keyed under in the graph).
    #[allow(dead_code)]
    id: SimulatorNodeId,
    /// Set to `true` when the node is added; not read by the visible code.
    #[allow(dead_code)]
    is_online: bool,
    /// Names registered for this node, mapped to the node id rendered as text.
    #[allow(dead_code)]
    registered_names: BTreeMap<String, String>,
    /// Inbound messages waiting to be received, one FIFO queue per sending
    /// peer.
    message_queues: BTreeMap<SimulatorNodeId, VecDeque<Vec<u8>>>,
}
/// Properties of one directed link between two nodes.
#[derive(Debug, Clone)]
pub struct LinkInfo {
    /// Delay this hop adds to a message travelling across it.
    latency: Duration,
    /// Probability that a message is dropped on this hop; compared against a
    /// random draw from `0.0..1.0`.
    packet_loss: f64,
    /// Optional bandwidth cap; not read by the visible code (unit not
    /// established here).
    #[allow(dead_code)]
    bandwidth_limit: Option<u64>,
    /// Inactive links are ignored when searching for a route.
    is_active: bool,
}
impl NetworkGraph {
    /// Creates an empty graph with no nodes and no links.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            nodes: BTreeMap::new(),
            links: BTreeMap::new(),
        }
    }

    /// Registers `node_id` in the graph.
    ///
    /// A new node starts online with no registered names and no message
    /// queues. Adding a node that is already present is a no-op: its
    /// registered names and queued messages are left untouched instead of
    /// being replaced by a blank entry.
    pub fn add_node(&mut self, node_id: SimulatorNodeId) {
        self.nodes.entry(node_id).or_insert_with_key(|id| NodeInfo {
            id: id.clone(),
            is_online: true,
            registered_names: BTreeMap::new(),
            message_queues: BTreeMap::new(),
        });
    }

    /// Links `a` and `b` in both directions using the same `link` settings,
    /// replacing any link already stored for either direction.
    pub fn connect_nodes(&mut self, a: SimulatorNodeId, b: SimulatorNodeId, link: LinkInfo) {
        self.links.insert((a.clone(), b.clone()), link.clone());
        self.links.insert((b, a), link);
    }

    /// Breadth-first search for a route from `from` to `to` over active links.
    ///
    /// Returns the visited nodes in order, including both endpoints, or `None`
    /// when no route exists. When `from == to` the single-element path is
    /// returned without consulting the links at all.
    #[must_use]
    pub fn find_path(
        &self,
        from: SimulatorNodeId,
        to: SimulatorNodeId,
    ) -> Option<Vec<SimulatorNodeId>> {
        if from == to {
            return Some(vec![from]);
        }

        let mut queue = VecDeque::new();
        let mut visited = std::collections::BTreeSet::new();
        // Maps each discovered node to the node it was reached from.
        let mut parent: BTreeMap<SimulatorNodeId, SimulatorNodeId> = BTreeMap::new();
        queue.push_back(from.clone());
        visited.insert(from);

        while let Some(current) = queue.pop_front() {
            // Active outgoing links of `current`, in key order.
            let neighbours = self
                .links
                .iter()
                .filter(|((src, _), link)| *src == current && link.is_active)
                .map(|((_, dst), _)| dst);

            for next in neighbours {
                if *next == to {
                    // Walk the parent chain back to the start, then flip it.
                    let mut path = vec![to, current.clone()];
                    let mut cursor = &current;
                    while let Some(prev) = parent.get(cursor) {
                        path.push(prev.clone());
                        cursor = prev;
                    }
                    path.reverse();
                    return Some(path);
                }
                // `insert` returns true only for nodes not seen before.
                if visited.insert(next.clone()) {
                    parent.insert(next.clone(), current.clone());
                    queue.push_back(next.clone());
                }
            }
        }
        None
    }

    /// Removes every link, in both directions, between a node of `group_a`
    /// and a node of `group_b`.
    pub fn add_partition(&mut self, group_a: &[SimulatorNodeId], group_b: &[SimulatorNodeId]) {
        for a in group_a {
            for b in group_b {
                self.links.remove(&(a.clone(), b.clone()));
                self.links.remove(&(b.clone(), a.clone()));
            }
        }
    }

    /// Connects every node of `group_a` with every node of `group_b` using the
    /// default latency and packet loss.
    ///
    /// Links that already exist between the two groups are overwritten with
    /// those defaults.
    pub fn heal_partition(&mut self, group_a: &[SimulatorNodeId], group_b: &[SimulatorNodeId]) {
        // Built once so the environment is consulted a single time per call.
        let default_link = LinkInfo {
            latency: default_latency(),
            packet_loss: default_packet_loss(),
            bandwidth_limit: None,
            is_active: true,
        };
        for a in group_a {
            for b in group_b {
                self.connect_nodes(a.clone(), b.clone(), default_link.clone());
            }
        }
    }

    /// Mutable access to the state of `node_id`, if the node is known.
    #[must_use]
    pub fn get_node_mut(&mut self, node_id: &SimulatorNodeId) -> Option<&mut NodeInfo> {
        self.nodes.get_mut(node_id)
    }

    /// Shared access to the state of `node_id`, if the node is known.
    #[must_use]
    pub fn get_node(&self, node_id: &SimulatorNodeId) -> Option<&NodeInfo> {
        self.nodes.get(node_id)
    }
}
/// The default graph is the empty graph produced by [`NetworkGraph::new`].
impl Default for NetworkGraph {
fn default() -> Self {
Self::new()
}
}
/// Identifier of a simulated node: 32 raw bytes.
///
/// Ordering and equality are those of the underlying byte array, which lets
/// ids serve as `BTreeMap` keys.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SimulatorNodeId([u8; 32]);
impl SimulatorNodeId {
    /// Derives an id from `seed`: the seed string is hashed to a `u64`, which
    /// seeds an `Rng` that fills the 32 id bytes.
    ///
    /// NOTE(review): the hash comes from `DefaultHasher`, whose algorithm std
    /// does not promise to keep stable across Rust releases — confirm whether
    /// ids must be reproducible across toolchains.
    #[must_use]
    pub fn from_seed(seed: &str) -> Self {
        let seed_u64 = {
            let mut state = DefaultHasher::new();
            seed.hash(&mut state);
            state.finish()
        };

        let mut id = Self([0u8; 32]);
        Rng::from_seed(seed_u64).fill(&mut id.0);
        id
    }

    /// Wraps the given bytes as an id without modification.
    #[must_use]
    pub const fn from_bytes(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    /// Borrows the raw id bytes.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; 32] {
        &self.0
    }

    /// Abbreviated form: the first five bytes as ten lowercase hex digits.
    #[must_use]
    pub fn fmt_short(&self) -> String {
        self.0[..5]
            .iter()
            .map(|byte| format!("{byte:02x}"))
            .collect()
    }

    /// Creates an id whose bytes are drawn from the shared `rng()` source.
    #[must_use]
    pub fn generate() -> Self {
        let mut id = Self([0u8; 32]);
        rng().fill(&mut id.0);
        id
    }
}
/// Renders the full id as 64 lowercase hexadecimal digits.
impl Display for SimulatorNodeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Stops at, and returns, the first write error.
        self.0.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
/// A simulated peer: a local node id plus shared handles to the network
/// graph and the connections this peer has opened.
pub struct SimulatorP2P {
    /// Id of the local node.
    node_id: SimulatorNodeId,
    /// Network topology and per-node message queues; connections created by
    /// this peer hold a clone of this handle.
    network_graph: Arc<RwLock<NetworkGraph>>,
    /// Most recent connection opened to each remote node.
    connections: Arc<RwLock<BTreeMap<SimulatorNodeId, SimulatorConnection>>>,
}
impl SimulatorP2P {
#[must_use]
pub fn new() -> Self {
Self {
node_id: SimulatorNodeId::generate(),
network_graph: Arc::new(RwLock::new(NetworkGraph::new())),
connections: Arc::new(RwLock::new(BTreeMap::new())), }
}
#[must_use]
pub fn with_seed(seed: &str) -> Self {
Self {
node_id: SimulatorNodeId::from_seed(seed),
network_graph: Arc::new(RwLock::new(NetworkGraph::new())),
connections: Arc::new(RwLock::new(BTreeMap::new())), }
}
#[must_use]
pub const fn local_node_id(&self) -> &SimulatorNodeId {
&self.node_id
}
pub async fn connect(&self, remote_id: SimulatorNodeId) -> Result<SimulatorConnection, String> {
let mut graph = self.network_graph.write().await;
if !graph.nodes.contains_key(&self.node_id) {
graph.add_node(self.node_id.clone());
}
if !graph.nodes.contains_key(&remote_id) {
graph.add_node(remote_id.clone());
}
if let Some(local_node) = graph.get_node_mut(&self.node_id) {
local_node
.message_queues
.entry(remote_id.clone())
.or_insert_with(VecDeque::new);
}
if let Some(remote_node) = graph.get_node_mut(&remote_id) {
remote_node
.message_queues
.entry(self.node_id.clone())
.or_insert_with(VecDeque::new);
}
let has_path = graph
.find_path(self.node_id.clone(), remote_id.clone())
.is_some();
if !has_path {
return Err("No route to destination".to_string());
}
drop(graph);
let connection = SimulatorConnection {
local_id: self.node_id.clone(),
remote_id: remote_id.clone(),
network_graph: Arc::clone(&self.network_graph),
is_connected: Arc::new(AtomicBool::new(true)),
};
{
let mut connections = self.connections.write().await;
connections.insert(remote_id, connection.clone());
}
Ok(connection)
}
pub async fn register_peer(&self, name: &str, node_id: SimulatorNodeId) -> Result<(), String> {
let mut graph = self.network_graph.write().await;
if !graph.nodes.contains_key(&node_id) {
graph.add_node(node_id.clone());
}
if let Some(node_info) = graph.nodes.get_mut(&node_id) {
node_info
.registered_names
.insert(name.to_string(), node_id.to_string());
}
drop(graph);
Ok(())
}
pub async fn discover(&self, name: &str) -> Result<SimulatorNodeId, String> {
let delay = discovery_delay();
switchy_async::time::sleep(delay).await;
let graph = self.network_graph.read().await;
for (node_id, node_info) in &graph.nodes {
if node_info.registered_names.contains_key(name) {
return Ok(node_id.clone());
}
}
drop(graph);
Err(format!("Name '{name}' not found"))
}
pub async fn connect_by_name(&self, name: &str) -> Result<SimulatorConnection, String> {
let node_id = self.discover(name).await?;
self.connect(node_id).await
}
}
/// The default peer is the one built by [`SimulatorP2P::new`], i.e. it gets a
/// freshly generated node id.
impl Default for SimulatorP2P {
fn default() -> Self {
Self::new()
}
}
/// Handle for exchanging messages between a local and a remote node.
///
/// Clones share the same connected flag and the same network graph.
#[derive(Debug, Clone)]
pub struct SimulatorConnection {
    /// Node this connection sends from and receives for.
    local_id: SimulatorNodeId,
    /// Node at the other end.
    remote_id: SimulatorNodeId,
    /// Shared topology and message queues.
    network_graph: Arc<RwLock<NetworkGraph>>,
    /// Cleared by `close`; checked before sending.
    is_connected: Arc<AtomicBool>,
}
impl SimulatorConnection {
pub async fn send(&mut self, data: &[u8]) -> Result<(), String> {
if !self.is_connected.load(Ordering::Relaxed) {
return Err("Connection closed".to_string());
}
let max_size = max_message_size();
if data.len() > max_size {
return Err(format!(
"Message too large: {} bytes exceeds max {}",
data.len(),
max_size
));
}
let graph = self.network_graph.read().await;
let path = graph
.find_path(self.local_id.clone(), self.remote_id.clone())
.ok_or_else(|| "No route to destination".to_string())?;
let total_latency = Self::calculate_path_latency(&graph, &path);
if Self::packet_lost(&graph, &path) {
return Ok(()); }
drop(graph); switchy_async::time::sleep(total_latency).await;
{
let mut graph = self.network_graph.write().await;
if let Some(remote_node) = graph.get_node_mut(&self.remote_id)
&& let Some(queue) = remote_node.message_queues.get_mut(&self.local_id)
{
queue.push_back(data.to_vec());
}
}
Ok(())
}
pub async fn recv(&mut self) -> Result<Vec<u8>, String> {
{
let mut graph = self.network_graph.write().await;
if let Some(local_node) = graph.get_node_mut(&self.local_id)
&& let Some(queue) = local_node.message_queues.get_mut(&self.remote_id)
&& let Some(message) = queue.pop_front()
{
return Ok(message);
}
}
Err("No message available".to_string())
}
#[must_use]
pub fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::Relaxed)
}
#[must_use]
pub const fn remote_node_id(&self) -> &SimulatorNodeId {
&self.remote_id
}
pub fn close(&mut self) -> Result<(), String> {
self.is_connected.store(false, Ordering::Relaxed);
Ok(())
}
fn calculate_path_latency(graph: &NetworkGraph, path: &[SimulatorNodeId]) -> Duration {
let mut total = Duration::from_millis(0);
for window in path.windows(2) {
if let Some(link) = graph.links.get(&(window[0].clone(), window[1].clone())) {
total += link.latency;
}
}
total
}
fn packet_lost(graph: &NetworkGraph, path: &[SimulatorNodeId]) -> bool {
for window in path.windows(2) {
if let Some(link) = graph.links.get(&(window[0].clone(), window[1].clone()))
&& rng().gen_range(0.0..1.0) < link.packet_loss
{
return true;
}
}
false
}
}
/// Implements the crate's node-id trait by forwarding to the inherent
/// methods of the same names.
impl crate::traits::P2PNodeId for SimulatorNodeId {
    /// Copies `bytes` into a new id; never fails.
    fn from_bytes(bytes: &[u8; 32]) -> crate::types::P2PResult<Self> {
        let id = Self::from_bytes(*bytes);
        Ok(id)
    }

    // The path form resolves to the inherent `as_bytes`, which takes
    // precedence over this trait method, so this does not recurse.
    fn as_bytes(&self) -> &[u8; 32] {
        Self::as_bytes(self)
    }

    // Same forwarding as above, to the inherent `fmt_short`.
    fn fmt_short(&self) -> String {
        Self::fmt_short(self)
    }
}
/// Builds a node id from `name` via [`SimulatorNodeId::from_seed`], so the
/// same name always yields the same id.
#[must_use]
pub fn test_node_id(name: &str) -> SimulatorNodeId {
SimulatorNodeId::from_seed(name)
}
#[cfg(test)]
impl SimulatorP2P {
    /// Test fixture: builds two peers, links their ids in the first peer's
    /// graph (10 ms latency, no packet loss) and returns the first peer with
    /// both ids.
    ///
    /// The second peer itself is discarded; only its id is kept.
    ///
    /// NOTE(review): this takes the graph lock with `blocking_write`, yet
    /// several async tests call it — confirm the lock permits that from
    /// inside the test runtime.
    #[must_use]
    pub fn test_setup() -> (Self, SimulatorNodeId, SimulatorNodeId) {
        let alice = Self::new();
        let alice_id = alice.local_node_id().clone();
        let bob = Self::new();
        let bob_id = bob.local_node_id().clone();

        let lossless_link = LinkInfo {
            latency: Duration::from_millis(10),
            packet_loss: 0.0,
            bandwidth_limit: None,
            is_active: true,
        };
        // The guard is a temporary, released at the end of this statement.
        alice
            .network_graph
            .blocking_write()
            .connect_nodes(alice_id.clone(), bob_id.clone(), lossless_link);

        (alice, alice_id, bob_id)
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test_log::test]
fn test_node_id_deterministic() {
let id1 = test_node_id("alice");
let id2 = test_node_id("alice");
assert_eq!(id1, id2);
}
#[test_log::test]
fn test_node_id_different() {
let alice = test_node_id("alice");
let bob = test_node_id("bob");
assert_ne!(alice, bob);
}
#[test_log::test]
fn test_fmt_short() {
let id = test_node_id("test");
let short = id.fmt_short();
assert_eq!(short.len(), 10); }
#[test_log::test]
fn test_node_id_from_bytes() {
let bytes = [42u8; 32];
let id = SimulatorNodeId::from_bytes(bytes);
assert_eq!(id.as_bytes(), &bytes);
}
#[test_log::test]
fn test_node_id_display() {
let bytes = [
0xAB, 0xCD, 0xEF, 0x01, 0x23, 0x45, 0x67, 0x89, 0x0A, 0xBC, 0xDE, 0xF0, 0x12, 0x34,
0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
0x99, 0xAA, 0xBB, 0xCC,
];
let id = SimulatorNodeId::from_bytes(bytes);
let display = format!("{id}");
assert_eq!(display.len(), 64); assert!(display.starts_with("abcdef"));
}
#[test_log::test]
fn test_node_id_generate_creates_unique_ids() {
let id1 = SimulatorNodeId::generate();
let id2 = SimulatorNodeId::generate();
assert_ne!(id1, id2);
}
#[test_log::test]
fn test_node_id_ordering() {
let id1 = SimulatorNodeId::from_bytes([1u8; 32]);
let id2 = SimulatorNodeId::from_bytes([2u8; 32]);
assert!(id1 < id2);
assert!(id2 > id1);
}
#[test_log::test]
fn test_network_graph_new() {
let graph = NetworkGraph::new();
assert!(graph.nodes.is_empty());
assert!(graph.links.is_empty());
}
#[test_log::test]
fn test_network_graph_add_node() {
let mut graph = NetworkGraph::new();
let node_id = test_node_id("alice");
graph.add_node(node_id.clone());
assert!(graph.nodes.contains_key(&node_id));
let node = graph.get_node(&node_id).unwrap();
assert_eq!(node.id, node_id);
assert!(node.is_online);
assert!(node.registered_names.is_empty());
assert!(node.message_queues.is_empty());
}
#[test_log::test]
fn test_network_graph_add_node_idempotent() {
let mut graph = NetworkGraph::new();
let node_id = test_node_id("alice");
graph.add_node(node_id.clone());
graph.add_node(node_id);
assert_eq!(graph.nodes.len(), 1);
}
#[test_log::test]
fn test_network_graph_connect_nodes() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
let link = LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.01,
bandwidth_limit: Some(1_000_000),
is_active: true,
};
graph.connect_nodes(alice.clone(), bob.clone(), link);
assert!(graph.links.contains_key(&(alice.clone(), bob.clone())));
assert!(graph.links.contains_key(&(bob.clone(), alice.clone())));
let forward_link = &graph.links[&(alice, bob)];
assert_eq!(forward_link.latency, Duration::from_millis(10));
assert!((forward_link.packet_loss - 0.01).abs() < f64::EPSILON);
assert!(forward_link.is_active);
}
#[test_log::test]
fn test_network_graph_find_path_same_node() {
let graph = NetworkGraph::new();
let alice = test_node_id("alice");
let path = graph.find_path(alice.clone(), alice.clone());
assert_eq!(path, Some(vec![alice]));
}
#[test_log::test]
fn test_network_graph_find_path_direct() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.connect_nodes(
alice.clone(),
bob.clone(),
LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
let path = graph.find_path(alice.clone(), bob.clone());
assert_eq!(path, Some(vec![alice, bob]));
}
#[test_log::test]
fn test_network_graph_find_path_multi_hop() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
let charlie = test_node_id("charlie");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.add_node(charlie.clone());
let link = LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
};
graph.connect_nodes(alice.clone(), bob.clone(), link.clone());
graph.connect_nodes(bob.clone(), charlie.clone(), link);
let path = graph.find_path(alice.clone(), charlie.clone());
assert_eq!(path, Some(vec![alice, bob, charlie]));
}
#[test_log::test]
fn test_network_graph_find_path_no_route() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
let path = graph.find_path(alice, bob);
assert_eq!(path, None);
}
#[test_log::test]
fn test_network_graph_find_path_inactive_link() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.connect_nodes(
alice.clone(),
bob.clone(),
LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: false,
},
);
let path = graph.find_path(alice, bob);
assert_eq!(path, None);
}
#[test_log::test]
fn test_network_graph_add_partition() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
let charlie = test_node_id("charlie");
let dave = test_node_id("dave");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.add_node(charlie.clone());
graph.add_node(dave.clone());
let link = LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
};
graph.connect_nodes(alice.clone(), charlie.clone(), link.clone());
graph.connect_nodes(alice.clone(), dave.clone(), link.clone());
graph.connect_nodes(bob.clone(), charlie.clone(), link.clone());
graph.connect_nodes(bob.clone(), dave.clone(), link);
let group_a = vec![alice.clone(), bob.clone()];
let group_b = vec![charlie.clone(), dave.clone()];
graph.add_partition(&group_a, &group_b);
assert!(!graph.links.contains_key(&(alice.clone(), charlie.clone())));
assert!(!graph.links.contains_key(&(alice.clone(), dave.clone())));
assert!(!graph.links.contains_key(&(bob.clone(), charlie.clone())));
assert!(!graph.links.contains_key(&(bob, dave)));
assert_eq!(graph.find_path(alice, charlie), None);
}
#[test_log::test]
fn test_network_graph_heal_partition() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
let group_a = vec![alice.clone()];
let group_b = vec![bob.clone()];
graph.heal_partition(&group_a, &group_b);
assert!(graph.links.contains_key(&(alice.clone(), bob.clone())));
assert!(graph.links.contains_key(&(bob.clone(), alice.clone())));
assert!(graph.find_path(alice, bob).is_some());
}
#[test_log::test]
fn test_network_graph_heal_partition_multiple_nodes() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("heal_alice");
let bob = test_node_id("heal_bob");
let charlie = test_node_id("heal_charlie");
let dave = test_node_id("heal_dave");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.add_node(charlie.clone());
graph.add_node(dave.clone());
let group_a = vec![alice.clone(), bob.clone()];
let group_b = vec![charlie.clone(), dave.clone()];
graph.heal_partition(&group_a, &group_b);
assert!(graph.links.contains_key(&(alice.clone(), charlie.clone())));
assert!(graph.links.contains_key(&(alice.clone(), dave.clone())));
assert!(graph.links.contains_key(&(bob.clone(), charlie.clone())));
assert!(graph.links.contains_key(&(bob.clone(), dave.clone())));
assert!(graph.links.contains_key(&(charlie.clone(), alice.clone())));
assert!(graph.links.contains_key(&(dave.clone(), alice.clone())));
assert!(graph.links.contains_key(&(charlie.clone(), bob.clone())));
assert!(graph.links.contains_key(&(dave.clone(), bob.clone())));
assert!(graph.find_path(alice.clone(), charlie.clone()).is_some());
assert!(graph.find_path(alice, dave.clone()).is_some());
assert!(graph.find_path(bob.clone(), charlie).is_some());
assert!(graph.find_path(bob, dave).is_some());
}
#[test_log::test]
fn test_network_graph_get_node_mut() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
graph.add_node(alice.clone());
let node = graph.get_node_mut(&alice).unwrap();
node.registered_names
.insert("test".to_string(), "value".to_string());
let node = graph.get_node(&alice).unwrap();
assert!(node.registered_names.contains_key("test"));
}
#[test_log::test]
fn test_network_graph_get_node_nonexistent() {
let graph = NetworkGraph::new();
let alice = test_node_id("alice");
assert!(graph.get_node(&alice).is_none());
}
#[test_log::test]
fn test_network_graph_get_node_mut_nonexistent() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
assert!(graph.get_node_mut(&alice).is_none());
}
#[test_log::test]
fn test_simulator_p2p_new() {
let sim = SimulatorP2P::new();
let id = sim.local_node_id();
assert_eq!(id.as_bytes().len(), 32);
}
#[test_log::test]
fn test_simulator_p2p_with_seed() {
let sim1 = SimulatorP2P::with_seed("alice");
let sim2 = SimulatorP2P::with_seed("alice");
assert_eq!(sim1.local_node_id(), sim2.local_node_id());
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_p2p_connect_creates_queues() {
let alice = SimulatorP2P::with_seed("alice");
let bob_id = test_node_id("bob");
{
let mut graph = alice.network_graph.write().await;
graph.add_node(alice.local_node_id().clone());
graph.add_node(bob_id.clone());
graph.connect_nodes(
alice.local_node_id().clone(),
bob_id.clone(),
LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
}
let conn = alice.connect(bob_id.clone()).await.unwrap();
assert!(conn.is_connected());
assert_eq!(conn.remote_node_id(), &bob_id);
let alice_node = alice
.network_graph
.read()
.await
.get_node(alice.local_node_id())
.unwrap()
.clone();
assert!(alice_node.message_queues.contains_key(&bob_id));
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_p2p_connect_no_route() {
let alice = SimulatorP2P::with_seed("alice");
let bob_id = test_node_id("bob");
{
let mut graph = alice.network_graph.write().await;
graph.add_node(alice.local_node_id().clone());
graph.add_node(bob_id.clone());
}
let result = alice.connect(bob_id).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "No route to destination");
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_p2p_register_peer() {
let sim = SimulatorP2P::with_seed("alice");
let bob_id = test_node_id("bob");
let result = sim.register_peer("bob", bob_id.clone()).await;
assert!(result.is_ok());
let node = sim
.network_graph
.read()
.await
.get_node(&bob_id)
.unwrap()
.clone();
assert!(node.registered_names.contains_key("bob"));
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_connection_recv_empty_queue() {
let (alice, _alice_id, bob_id) = SimulatorP2P::test_setup();
let mut conn = alice.connect(bob_id).await.unwrap();
let result = conn.recv().await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "No message available");
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_connection_send_after_close() {
let (alice, _alice_id, bob_id) = SimulatorP2P::test_setup();
let mut conn = alice.connect(bob_id).await.unwrap();
conn.close().unwrap();
let result = conn.send(b"test").await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Connection closed");
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_connection_message_too_large() {
let (alice, _alice_id, bob_id) = SimulatorP2P::test_setup();
let mut conn = alice.connect(bob_id).await.unwrap();
let max_size = max_message_size();
let large_data = vec![0u8; max_size + 1];
let result = conn.send(&large_data).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("Message too large"));
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_connection_close_idempotent() {
let (alice, _alice_id, bob_id) = SimulatorP2P::test_setup();
let mut conn = alice.connect(bob_id).await.unwrap();
assert!(conn.is_connected());
conn.close().unwrap();
assert!(!conn.is_connected());
conn.close().unwrap();
assert!(!conn.is_connected());
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_reconnect_after_close() {
let (alice, _alice_id, bob_id) = SimulatorP2P::test_setup();
let mut conn1 = alice.connect(bob_id.clone()).await.unwrap();
assert!(conn1.is_connected());
assert_eq!(conn1.remote_node_id(), &bob_id);
conn1.close().unwrap();
assert!(!conn1.is_connected());
let conn2 = alice.connect(bob_id.clone()).await.unwrap();
assert!(conn2.is_connected());
assert_eq!(conn2.remote_node_id(), &bob_id);
assert!(!conn1.is_connected());
}
#[test_log::test]
fn test_simulator_connection_calculate_path_latency() {
let mut graph = NetworkGraph::new();
let alice = test_node_id("alice");
let bob = test_node_id("bob");
let charlie = test_node_id("charlie");
graph.add_node(alice.clone());
graph.add_node(bob.clone());
graph.add_node(charlie.clone());
graph.connect_nodes(
alice.clone(),
bob.clone(),
LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
graph.connect_nodes(
bob.clone(),
charlie.clone(),
LinkInfo {
latency: Duration::from_millis(20),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
let path = vec![alice, bob, charlie];
let latency = SimulatorConnection::calculate_path_latency(&graph, &path);
assert_eq!(latency, Duration::from_millis(30));
}
#[test_log::test]
fn test_simulator_connection_calculate_path_latency_empty_path() {
let graph = NetworkGraph::new();
let path = vec![];
let latency = SimulatorConnection::calculate_path_latency(&graph, &path);
assert_eq!(latency, Duration::from_millis(0));
}
#[test_log::test]
fn test_simulator_connection_calculate_path_latency_single_node() {
let graph = NetworkGraph::new();
let alice = test_node_id("alice");
let path = vec![alice];
let latency = SimulatorConnection::calculate_path_latency(&graph, &path);
assert_eq!(latency, Duration::from_millis(0));
}
#[test_log::test]
fn test_p2p_node_id_trait_from_bytes() {
use crate::traits::P2PNodeId;
let bytes = [123u8; 32];
let id = <SimulatorNodeId as P2PNodeId>::from_bytes(&bytes).unwrap();
assert_eq!(id.as_bytes(), &bytes);
}
#[test_log::test]
fn test_p2p_node_id_trait_as_bytes() {
use crate::traits::P2PNodeId;
let bytes = [42u8; 32];
let id = <SimulatorNodeId as P2PNodeId>::from_bytes(&bytes).unwrap();
assert_eq!(id.as_bytes(), &bytes);
}
#[test_log::test]
fn test_p2p_node_id_trait_fmt_short() {
use crate::traits::P2PNodeId;
let bytes = [0xAB; 32];
let id = <SimulatorNodeId as P2PNodeId>::from_bytes(&bytes).unwrap();
let short = id.fmt_short();
assert_eq!(short.len(), 10);
assert_eq!(short, "ababababab");
}
#[test_log::test(switchy_async::test)]
async fn test_network_partition_blocks_path_finding() {
let alice = SimulatorP2P::with_seed("alice_partition_path");
let bob_id = test_node_id("bob_partition_path");
let charlie_id = test_node_id("charlie_partition_path");
let alice_id = alice.local_node_id().clone();
let shared_graph = alice.network_graph.clone();
{
let mut graph = shared_graph.write().await;
graph.add_node(alice_id.clone());
graph.add_node(bob_id.clone());
graph.add_node(charlie_id.clone());
let link = LinkInfo {
latency: Duration::from_millis(1),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
};
graph.connect_nodes(alice_id.clone(), bob_id.clone(), link.clone());
graph.connect_nodes(bob_id.clone(), charlie_id.clone(), link);
}
assert!(
shared_graph
.read()
.await
.find_path(alice_id.clone(), charlie_id.clone())
.is_some()
);
{
let mut graph = shared_graph.write().await;
graph.add_partition(
&[alice_id.clone(), bob_id.clone()],
std::slice::from_ref(&charlie_id),
);
}
assert!(
shared_graph
.read()
.await
.find_path(alice_id.clone(), charlie_id.clone())
.is_none()
);
{
let mut graph = shared_graph.write().await;
graph.heal_partition(&[bob_id], std::slice::from_ref(&charlie_id));
}
assert!(
shared_graph
.read()
.await
.find_path(alice_id, charlie_id)
.is_some()
);
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_p2p_register_peer_new_node() {
let sim = SimulatorP2P::with_seed("register_new");
let new_id = test_node_id("new_peer");
sim.register_peer("new_peer", new_id.clone()).await.unwrap();
let graph = sim.network_graph.read().await;
assert!(graph.get_node(&new_id).is_some());
drop(graph);
}
#[test_log::test(switchy_async::test)]
async fn test_simulator_p2p_connect_adds_nodes_to_graph() {
let alice = SimulatorP2P::with_seed("alice_auto_add");
let bob_id = test_node_id("bob_auto_add");
assert!(alice.network_graph.read().await.nodes.is_empty());
{
let mut graph = alice.network_graph.write().await;
graph.connect_nodes(
alice.local_node_id().clone(),
bob_id.clone(),
LinkInfo {
latency: Duration::from_millis(1),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
}
let _conn = alice.connect(bob_id.clone()).await.unwrap();
let graph = alice.network_graph.read().await;
assert!(graph.get_node(alice.local_node_id()).is_some());
assert!(graph.get_node(&bob_id).is_some());
drop(graph);
}
#[test_log::test]
fn test_network_graph_find_path_shortest() {
let mut graph = NetworkGraph::new();
let a = test_node_id("node_a");
let b = test_node_id("node_b");
let c = test_node_id("node_c");
let d = test_node_id("node_d");
graph.add_node(a.clone());
graph.add_node(b.clone());
graph.add_node(c.clone());
graph.add_node(d.clone());
let link = LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
};
graph.connect_nodes(a.clone(), b.clone(), link.clone());
graph.connect_nodes(b, c.clone(), link.clone());
graph.connect_nodes(c, d.clone(), link.clone());
graph.connect_nodes(a.clone(), d.clone(), link);
let path = graph.find_path(a.clone(), d.clone()).unwrap();
assert_eq!(path.len(), 2);
assert_eq!(path[0], a);
assert_eq!(path[1], d);
}
#[test_log::test]
fn test_network_graph_find_path_cyclic_graph() {
let mut graph = NetworkGraph::new();
let a = test_node_id("cycle_a");
let b = test_node_id("cycle_b");
let c = test_node_id("cycle_c");
let d = test_node_id("cycle_d");
graph.add_node(a.clone());
graph.add_node(b.clone());
graph.add_node(c.clone());
graph.add_node(d.clone());
let link = LinkInfo {
latency: Duration::from_millis(10),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
};
graph.connect_nodes(a.clone(), b.clone(), link.clone());
graph.connect_nodes(b, c.clone(), link.clone());
graph.connect_nodes(c.clone(), a.clone(), link.clone()); graph.connect_nodes(c, d.clone(), link);
let path = graph.find_path(a.clone(), d.clone()).unwrap();
assert_eq!(path.len(), 3, "BFS should find shortest path a -> c -> d");
assert_eq!(path[0], a);
assert_eq!(path[2], d);
let unique_nodes: std::collections::BTreeSet<_> = path.iter().collect();
assert_eq!(
unique_nodes.len(),
path.len(),
"Path should not contain duplicate nodes"
);
}
#[test_log::test]
fn test_simulator_connection_calculate_path_latency_missing_links() {
let mut graph = NetworkGraph::new();
let a = test_node_id("lat_a");
let b = test_node_id("lat_b");
let c = test_node_id("lat_c");
graph.add_node(a.clone());
graph.add_node(b.clone());
graph.add_node(c.clone());
graph.connect_nodes(
a.clone(),
b.clone(),
LinkInfo {
latency: Duration::from_millis(50),
packet_loss: 0.0,
bandwidth_limit: None,
is_active: true,
},
);
let path = vec![a, b, c];
let latency = SimulatorConnection::calculate_path_latency(&graph, &path);
assert_eq!(latency, Duration::from_millis(50));
}
#[test_log::test(switchy_async::test(real_time))]
async fn test_discover_returns_registered_node_id() {
let sim = SimulatorP2P::with_seed("discover_test");
let peer_id = test_node_id("discoverable_peer");
sim.register_peer("my-peer", peer_id.clone()).await.unwrap();
let discovered_id = sim.discover("my-peer").await.unwrap();
assert_eq!(discovered_id, peer_id);
}
#[test_log::test(switchy_async::test(real_time))]
async fn test_discover_name_not_found_returns_error() {
let sim = SimulatorP2P::with_seed("discover_notfound_test");
let result = sim.discover("nonexistent-peer").await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
err.contains("not found"),
"Expected 'not found' error, got: {err}"
);
}
#[test_log::test(switchy_async::test(real_time))]
async fn test_discover_multiple_registered_names() {
let sim = SimulatorP2P::with_seed("discover_multi_test");
let peer1_id = test_node_id("peer1");
let peer2_id = test_node_id("peer2");
sim.register_peer("first-peer", peer1_id.clone())
.await
.unwrap();
sim.register_peer("second-peer", peer2_id.clone())
.await
.unwrap();
let discovered1 = sim.discover("first-peer").await.unwrap();
let discovered2 = sim.discover("second-peer").await.unwrap();
assert_eq!(discovered1, peer1_id);
assert_eq!(discovered2, peer2_id);
assert_ne!(discovered1, discovered2);
}
// Checks that `connect_by_name` succeeds for a registered name once an active link
// between the local node and that peer has been added to the graph, and that the
// resulting connection points at the registered peer id.
#[test_log::test(switchy_async::test(real_time))]
async fn test_connect_by_name_success() {
    let alice = SimulatorP2P::with_seed("alice_cbn");
    let bob_id = test_node_id("bob_cbn");
    alice.register_peer("bob", bob_id.clone()).await.unwrap();

    let link = LinkInfo {
        latency: Duration::from_millis(1),
        packet_loss: 0.0,
        bandwidth_limit: None,
        is_active: true,
    };
    {
        // Keep the write guard inside this scope so it is released before connecting.
        let mut topology = alice.network_graph.write().await;
        let alice_id = alice.local_node_id().clone();
        topology.add_node(alice_id.clone());
        topology.connect_nodes(alice_id, bob_id.clone(), link);
    }

    let connection = alice.connect_by_name("bob").await.unwrap();
    assert!(connection.is_connected());
    assert_eq!(connection.remote_node_id(), &bob_id);
}
// Checks that `connect_by_name` on an unregistered name fails with an error
// whose text mentions "not found".
#[test_log::test(switchy_async::test(real_time))]
async fn test_connect_by_name_discovery_fails() {
    let alice = SimulatorP2P::with_seed("alice_cbn_fail");

    let attempt = alice.connect_by_name("unknown-peer").await;
    assert!(attempt.is_err());

    let err = attempt.unwrap_err();
    let mentions_not_found = err.contains("not found");
    assert!(
        mentions_not_found,
        "Expected discovery error, got: {err}"
    );
}
// Checks that `connect_by_name` reports a "No route" error when the name is registered
// but the graph only contains the local node and no link to the peer.
#[test_log::test(switchy_async::test(real_time))]
async fn test_connect_by_name_connection_fails_no_route() {
    let alice = SimulatorP2P::with_seed("alice_cbn_noroute");
    let bob_id = test_node_id("bob_cbn_noroute");
    alice
        .register_peer("bob-noroute", bob_id.clone())
        .await
        .unwrap();

    // The write guard is a temporary here, so it is released at the end of the statement.
    alice
        .network_graph
        .write()
        .await
        .add_node(alice.local_node_id().clone());

    let attempt = alice.connect_by_name("bob-noroute").await;
    assert!(attempt.is_err());

    let err = attempt.unwrap_err();
    let mentions_no_route = err.contains("No route");
    assert!(
        mentions_no_route,
        "Expected 'No route' error, got: {err}"
    );
}
// Checks that `packet_lost` returns false on each of 100 consecutive calls for a
// two-node path whose only link has `packet_loss` set to 0.0.
#[test_log::test]
fn test_packet_lost_zero_packet_loss_never_drops() {
    let mut graph = NetworkGraph::new();
    let a = test_node_id("pl_zero_a");
    let b = test_node_id("pl_zero_b");
    graph.add_node(a.clone());
    graph.add_node(b.clone());

    let lossless_link = LinkInfo {
        latency: Duration::from_millis(1),
        packet_loss: 0.0,
        bandwidth_limit: None,
        is_active: true,
    };
    graph.connect_nodes(a.clone(), b.clone(), lossless_link);

    let route = vec![a, b];
    // `all` stops at the first reported loss, just like an assertion inside a loop would.
    let never_dropped = (0..100).all(|_| !SimulatorConnection::packet_lost(&graph, &route));
    assert!(
        never_dropped,
        "Packet should never be lost with 0% packet loss"
    );
}
// Checks that `packet_lost` returns false for an empty path on an empty graph.
#[test_log::test]
fn test_packet_lost_empty_path_never_drops() {
    let graph = NetworkGraph::new();
    let empty_route = Vec::<SimulatorNodeId>::new();

    let dropped = SimulatorConnection::packet_lost(&graph, &empty_route);
    assert!(!dropped);
}
// Checks that `packet_lost` returns false for a path that contains a single node.
#[test_log::test]
fn test_packet_lost_single_node_path_never_drops() {
    let graph = NetworkGraph::new();
    let lone_route = vec![test_node_id("pl_single")];

    let dropped = SimulatorConnection::packet_lost(&graph, &lone_route);
    assert!(!dropped);
}
// Checks that a connection established over an active link starts failing `send`
// with a "No route" error after a partition is added between the two endpoints.
#[test_log::test(switchy_async::test)]
async fn test_send_fails_after_network_partition() {
    let alice = SimulatorP2P::with_seed("alice_partition_send");
    let bob_id = test_node_id("bob_partition_send");
    let alice_id = alice.local_node_id().clone();

    let direct_link = LinkInfo {
        latency: Duration::from_millis(0),
        packet_loss: 0.0,
        bandwidth_limit: None,
        is_active: true,
    };
    {
        // Keep the write guard inside this scope so it is released before connecting.
        let mut topology = alice.network_graph.write().await;
        topology.add_node(alice_id.clone());
        topology.add_node(bob_id.clone());
        topology.connect_nodes(alice_id.clone(), bob_id.clone(), direct_link);
    }

    let mut connection = alice.connect(bob_id.clone()).await.unwrap();
    assert!(connection.is_connected());

    // Split the two endpoints; the temporary write guard is released when the statement ends.
    alice
        .network_graph
        .write()
        .await
        .add_partition(&[alice_id], &[bob_id]);

    let outcome = connection.send(b"hello after partition").await;
    assert!(outcome.is_err());

    let err = outcome.unwrap_err();
    let mentions_no_route = err.contains("No route");
    assert!(
        mentions_no_route,
        "Expected 'No route' error after partition, got: {err}"
    );
}
// Checks that three messages sent from alice to bob are received on bob's side
// in the same order they were sent.
#[test_log::test(switchy_async::test)]
async fn test_message_delivery_fifo_ordering() {
    let alice = SimulatorP2P::with_seed("alice_fifo");
    let bob_id = test_node_id("bob_fifo");
    let alice_id = alice.local_node_id().clone();

    let direct_link = LinkInfo {
        latency: Duration::from_millis(0),
        packet_loss: 0.0,
        bandwidth_limit: None,
        is_active: true,
    };
    {
        // Keep the write guard inside this scope so it is released before connecting.
        let mut topology = alice.network_graph.write().await;
        topology.add_node(alice_id.clone());
        topology.add_node(bob_id.clone());
        topology.connect_nodes(alice_id.clone(), bob_id.clone(), direct_link);
    }

    // Payloads in the order they are sent; the same order is expected on receipt.
    let payloads = [b"message1", b"message2", b"message3"];

    let mut alice_conn = alice.connect(bob_id.clone()).await.unwrap();
    for payload in payloads {
        alice_conn.send(payload).await.unwrap();
    }

    // A second simulator handle for bob that shares alice's network graph.
    let bob = SimulatorP2P {
        node_id: bob_id.clone(),
        network_graph: alice.network_graph.clone(),
        connections: Arc::new(RwLock::new(BTreeMap::new())),
    };
    let mut bob_conn = bob.connect(alice_id).await.unwrap();

    // Drain all three messages first, then compare, so every receive happens before any assertion.
    let mut received = Vec::with_capacity(payloads.len());
    for _ in 0..payloads.len() {
        received.push(bob_conn.recv().await.unwrap());
    }
    for (actual, expected) in received.into_iter().zip(payloads) {
        assert_eq!(actual, expected);
    }
}
}