use super::{TopologyBuilder, TopologyConfig, TopologyEvent};
use crate::routing::DataPacket;
use crate::transport::{MeshConnection, MeshTransport, NodeId};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, info, warn};
#[derive(Debug, Clone)]
struct RetryState {
attempts: u32,
next_retry: Instant,
}
fn calculate_backoff(
initial_backoff: Duration,
max_backoff: Duration,
backoff_multiplier: f64,
attempts: u32,
) -> Duration {
let multiplier = backoff_multiplier.powi(attempts as i32);
let backoff_secs = initial_backoff.as_secs_f64() * multiplier;
let capped_secs = backoff_secs.min(max_backoff.as_secs_f64());
Duration::from_secs_f64(capped_secs)
}
fn spawn_peer_connection_retry(
peer_id: String,
transport: Arc<dyn MeshTransport>,
peer_connection: Arc<RwLock<Option<Box<dyn MeshConnection>>>>,
selected_peer_id: Arc<RwLock<Option<NodeId>>>,
peer_retry_state: Arc<RwLock<Option<RetryState>>>,
telemetry_buffer: Arc<RwLock<Vec<DataPacket>>>,
config: TopologyConfig,
) {
tokio::spawn(async move {
let node_id = NodeId::new(peer_id.clone());
loop {
let (attempts, sleep_duration) = {
let retry_state = peer_retry_state.read().unwrap_or_else(|e| e.into_inner());
match retry_state.as_ref() {
None => {
return;
}
Some(state) => {
if state.attempts >= config.max_retries {
warn!(
"Max retries ({}) reached for peer {}, giving up",
config.max_retries, peer_id
);
peer_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.take();
return;
}
let now = Instant::now();
let sleep_duration = if now < state.next_retry {
state.next_retry.duration_since(now)
} else {
Duration::from_secs(0)
};
(state.attempts, sleep_duration)
}
}
};
if sleep_duration > Duration::from_secs(0) {
sleep(sleep_duration).await;
}
info!(
"Retrying connection to peer {} (attempt {}/{})",
peer_id,
attempts + 1,
config.max_retries
);
match transport.connect(&node_id).await {
Ok(conn) => {
*peer_connection.write().unwrap_or_else(|e| e.into_inner()) = Some(conn);
*selected_peer_id.write().unwrap_or_else(|e| e.into_inner()) =
Some(node_id.clone());
peer_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.take();
info!(
"Successfully connected to peer {} after {} retries",
peer_id, attempts
);
TopologyManager::flush_buffer(&telemetry_buffer, &transport, &node_id).await;
return;
}
Err(e) => {
warn!(
"Failed to connect to peer {} (attempt {}/{}): {}",
peer_id,
attempts + 1,
config.max_retries,
e
);
let new_attempts = attempts + 1;
if new_attempts >= config.max_retries {
warn!(
"Max retries ({}) reached for peer {}, giving up",
config.max_retries, peer_id
);
peer_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.take();
return;
}
let backoff = calculate_backoff(
config.initial_backoff,
config.max_backoff,
config.backoff_multiplier,
new_attempts,
);
let next_retry = Instant::now() + backoff;
*peer_retry_state.write().unwrap_or_else(|e| e.into_inner()) =
Some(RetryState {
attempts: new_attempts,
next_retry,
});
debug!("Next retry for peer {} in {:?}", peer_id, backoff);
}
}
}
});
}
fn spawn_lateral_connection_retry(
peer_id: String,
transport: Arc<dyn MeshTransport>,
lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnection>>>>,
lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>>,
config: TopologyConfig,
) {
tokio::spawn(async move {
let node_id = NodeId::new(peer_id.clone());
loop {
let (attempts, sleep_duration) = {
let retry_states = lateral_retry_state
.read()
.unwrap_or_else(|e| e.into_inner());
match retry_states.get(&peer_id) {
None => {
return;
}
Some(state) => {
if state.attempts >= config.max_retries {
warn!(
"Max retries ({}) reached for lateral peer {}, giving up",
config.max_retries, peer_id
);
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id);
return;
}
let now = Instant::now();
let sleep_duration = if now < state.next_retry {
state.next_retry.duration_since(now)
} else {
Duration::from_secs(0)
};
(state.attempts, sleep_duration)
}
}
};
if sleep_duration > Duration::from_secs(0) {
sleep(sleep_duration).await;
}
info!(
"Retrying connection to lateral peer {} (attempt {}/{})",
peer_id,
attempts + 1,
config.max_retries
);
match transport.connect(&node_id).await {
Ok(conn) => {
lateral_connections
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(peer_id.clone(), conn);
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id);
info!(
"Successfully connected to lateral peer {} after {} retries",
peer_id, attempts
);
return;
}
Err(e) => {
warn!(
"Failed to connect to lateral peer {} (attempt {}/{}): {}",
peer_id,
attempts + 1,
config.max_retries,
e
);
let new_attempts = attempts + 1;
if new_attempts >= config.max_retries {
warn!(
"Max retries ({}) reached for lateral peer {}, giving up",
config.max_retries, peer_id
);
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id);
return;
}
let backoff = calculate_backoff(
config.initial_backoff,
config.max_backoff,
config.backoff_multiplier,
new_attempts,
);
let next_retry = Instant::now() + backoff;
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(
peer_id.clone(),
RetryState {
attempts: new_attempts,
next_retry,
},
);
debug!("Next retry for lateral peer {} in {:?}", peer_id, backoff);
}
}
}
});
}
pub struct TopologyManager {
builder: TopologyBuilder,
transport: Arc<dyn MeshTransport>,
peer_connection: Arc<RwLock<Option<Box<dyn MeshConnection>>>>,
selected_peer_id: Arc<RwLock<Option<NodeId>>>,
lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnection>>>>,
peer_retry_state: Arc<RwLock<Option<RetryState>>>,
lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>>,
telemetry_buffer: Arc<RwLock<Vec<DataPacket>>>,
task_handle: RwLock<Option<JoinHandle<()>>>,
}
impl TopologyManager {
pub fn new(builder: TopologyBuilder, transport: Arc<dyn MeshTransport>) -> Self {
Self {
builder,
transport,
peer_connection: Arc::new(RwLock::new(None)),
selected_peer_id: Arc::new(RwLock::new(None)),
lateral_connections: Arc::new(RwLock::new(HashMap::new())),
peer_retry_state: Arc::new(RwLock::new(None)),
lateral_retry_state: Arc::new(RwLock::new(HashMap::new())),
telemetry_buffer: Arc::new(RwLock::new(Vec::new())),
task_handle: RwLock::new(None),
}
}
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
self.transport.start().await?;
self.builder.start().await;
if let Some(rx) = self.builder.subscribe() {
let transport = self.transport.clone();
let peer_connection = self.peer_connection.clone();
let selected_peer_id = self.selected_peer_id.clone();
let lateral_connections = self.lateral_connections.clone();
let peer_retry_state = self.peer_retry_state.clone();
let lateral_retry_state = self.lateral_retry_state.clone();
let telemetry_buffer = self.telemetry_buffer.clone();
let builder = self.builder.clone();
let config = self.builder.config().clone();
let handle = tokio::spawn(async move {
Self::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
)
.await;
});
*self.task_handle.write().unwrap_or_else(|e| e.into_inner()) = Some(handle);
}
Ok(())
}
pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
if let Some(handle) = self
.task_handle
.write()
.unwrap_or_else(|e| e.into_inner())
.take()
{
handle.abort();
}
self.builder.stop().await;
let current_selected_peer_id = self
.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner())
.take();
if let Some(selected_peer_id) = current_selected_peer_id {
if let Err(e) = self.transport.disconnect(&selected_peer_id).await {
warn!("Failed to disconnect from selected peer during stop: {}", e);
}
}
let lateral_peer_ids: Vec<String> = self
.lateral_connections
.read()
.unwrap_or_else(|e| e.into_inner())
.keys()
.cloned()
.collect();
for peer_id in lateral_peer_ids {
let node_id = NodeId::new(peer_id.clone());
if let Err(e) = self.transport.disconnect(&node_id).await {
warn!(
"Failed to disconnect from lateral peer {} during stop: {}",
peer_id, e
);
}
}
self.lateral_connections
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
self.transport.stop().await?;
Ok(())
}
pub fn get_selected_peer_id(&self) -> Option<NodeId> {
self.selected_peer_id
.read()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
pub fn is_connected_to_peer(&self, node_id: &NodeId) -> bool {
self.selected_peer_id
.read()
.unwrap_or_else(|e| e.into_inner())
.as_ref()
.map(|id| id == node_id)
.unwrap_or(false)
}
pub fn builder(&self) -> &TopologyBuilder {
&self.builder
}
pub fn get_lateral_peer_ids(&self) -> Vec<String> {
self.lateral_connections
.read()
.unwrap_or_else(|e| e.into_inner())
.keys()
.cloned()
.collect()
}
pub fn lateral_peer_count(&self) -> usize {
self.lateral_connections
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn send_telemetry(&self, packet: DataPacket) -> Result<bool, String> {
let max_buffer_size = self.builder.config().max_telemetry_buffer_size;
if max_buffer_size == 0 {
return Err(
"Telemetry buffering is disabled (max_telemetry_buffer_size = 0)".to_string(),
);
}
let mut buffer = self
.telemetry_buffer
.write()
.unwrap_or_else(|e| e.into_inner());
if buffer.len() >= max_buffer_size {
buffer.remove(0);
warn!(
"Telemetry buffer full ({}), dropping oldest packet",
max_buffer_size
);
}
info!(
"Queuing telemetry packet {} for parent (buffer size: {}/{})",
packet.packet_id,
buffer.len() + 1,
max_buffer_size
);
buffer.push(packet);
Ok(false)
}
pub fn telemetry_buffer_size(&self) -> usize {
self.telemetry_buffer
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
fn drain_buffer(telemetry_buffer: &Arc<RwLock<Vec<DataPacket>>>) -> Vec<DataPacket> {
let mut buffer = telemetry_buffer.write().unwrap_or_else(|e| e.into_inner());
buffer.drain(..).collect()
}
async fn flush_buffer(
telemetry_buffer: &Arc<RwLock<Vec<DataPacket>>>,
transport: &Arc<dyn MeshTransport>,
peer_id: &NodeId,
) {
let packets = Self::drain_buffer(telemetry_buffer);
if packets.is_empty() {
return;
}
let count = packets.len();
info!(
"Flushing {} buffered telemetry packets to parent {}",
count, peer_id
);
for packet in &packets {
let data = serde_json::to_vec(packet).unwrap_or_default();
if let Err(e) = transport.send_to(peer_id, &data).await {
warn!(
"Failed to send telemetry packet {}: {}",
packet.packet_id, e
);
}
}
info!("Flushed {} telemetry packets", count);
}
#[allow(clippy::too_many_arguments)]
async fn event_loop(
mut rx: mpsc::UnboundedReceiver<TopologyEvent>,
transport: Arc<dyn MeshTransport>,
peer_connection: Arc<RwLock<Option<Box<dyn MeshConnection>>>>,
selected_peer_id: Arc<RwLock<Option<NodeId>>>,
lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnection>>>>,
peer_retry_state: Arc<RwLock<Option<RetryState>>>,
lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>>,
telemetry_buffer: Arc<RwLock<Vec<DataPacket>>>,
builder: TopologyBuilder,
config: TopologyConfig,
) {
while let Some(event) = rx.recv().await {
match event {
TopologyEvent::PeerSelected {
selected_peer_id: new_peer_id,
..
} => {
info!("Peer selected: {}", new_peer_id);
let node_id = NodeId::new(new_peer_id.clone());
match transport.connect(&node_id).await {
Ok(conn) => {
*peer_connection.write().unwrap_or_else(|e| e.into_inner()) =
Some(conn);
*selected_peer_id.write().unwrap_or_else(|e| e.into_inner()) =
Some(node_id.clone());
peer_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.take(); info!("Successfully connected to peer: {}", new_peer_id);
Self::flush_buffer(&telemetry_buffer, &transport, &node_id).await;
}
Err(e) => {
warn!("Failed to connect to peer {}: {}", new_peer_id, e);
if config.max_retries > 0 {
let backoff = calculate_backoff(
config.initial_backoff,
config.max_backoff,
config.backoff_multiplier,
0, );
*peer_retry_state.write().unwrap_or_else(|e| e.into_inner()) =
Some(RetryState {
attempts: 0,
next_retry: Instant::now() + backoff,
});
debug!("Scheduled retry for peer {} in {:?}", new_peer_id, backoff);
spawn_peer_connection_retry(
new_peer_id,
transport.clone(),
peer_connection.clone(),
selected_peer_id.clone(),
peer_retry_state.clone(),
telemetry_buffer.clone(),
config.clone(),
);
}
}
}
}
TopologyEvent::PeerChanged {
old_peer_id,
new_peer_id,
..
} => {
info!("Selected peer changed: {} -> {}", old_peer_id, new_peer_id);
peer_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.take();
let old_id = NodeId::new(old_peer_id.clone());
if let Err(e) = transport.disconnect(&old_id).await {
warn!("Failed to disconnect from old peer {}: {}", old_peer_id, e);
}
let new_id = NodeId::new(new_peer_id.clone());
match transport.connect(&new_id).await {
Ok(conn) => {
*peer_connection.write().unwrap_or_else(|e| e.into_inner()) =
Some(conn);
*selected_peer_id.write().unwrap_or_else(|e| e.into_inner()) =
Some(new_id.clone());
info!("Successfully changed to peer: {}", new_peer_id);
Self::flush_buffer(&telemetry_buffer, &transport, &new_id).await;
}
Err(e) => {
warn!("Failed to connect to new peer {}: {}", new_peer_id, e);
if config.max_retries > 0 {
let backoff = calculate_backoff(
config.initial_backoff,
config.max_backoff,
config.backoff_multiplier,
0, );
*peer_retry_state.write().unwrap_or_else(|e| e.into_inner()) =
Some(RetryState {
attempts: 0,
next_retry: Instant::now() + backoff,
});
debug!(
"Scheduled retry for new peer {} in {:?}",
new_peer_id, backoff
);
spawn_peer_connection_retry(
new_peer_id,
transport.clone(),
peer_connection.clone(),
selected_peer_id.clone(),
peer_retry_state.clone(),
telemetry_buffer.clone(),
config.clone(),
);
}
}
}
}
TopologyEvent::PeerLost { lost_peer_id } => {
info!("Selected peer lost: {}", lost_peer_id);
*peer_connection.write().unwrap_or_else(|e| e.into_inner()) = None;
*selected_peer_id.write().unwrap_or_else(|e| e.into_inner()) = None;
let node_id = NodeId::new(lost_peer_id.clone());
if let Err(e) = transport.disconnect(&node_id).await {
warn!(
"Failed to disconnect from lost peer {}: {}",
lost_peer_id, e
);
}
info!("Telemetry will be buffered until new parent connection is established");
info!("Triggering immediate parent re-selection after peer loss");
builder.reevaluate_peer().await;
debug!("Cleared connection to lost peer: {}", lost_peer_id);
}
TopologyEvent::PeerAdded { linked_peer_id } => {
info!("Linked peer added: {}", linked_peer_id);
}
TopologyEvent::PeerRemoved { linked_peer_id } => {
info!("Linked peer removed (beacon expired): {}", linked_peer_id);
let node_id = NodeId::new(linked_peer_id.clone());
if transport.is_connected(&node_id) {
if let Err(e) = transport.disconnect(&node_id).await {
warn!(
"Failed to disconnect from stale linked peer {}: {}",
linked_peer_id, e
);
} else {
debug!("Disconnected from stale linked peer: {}", linked_peer_id);
}
}
}
TopologyEvent::LateralPeerDiscovered { peer_id, .. } => {
info!("Lateral peer discovered: {}", peer_id);
let max_lateral = builder.config().max_lateral_connections;
let current_count = lateral_connections
.read()
.unwrap_or_else(|e| e.into_inner())
.len();
if let Some(max) = max_lateral {
if current_count >= max {
debug!(
"Skipping lateral peer {} - at connection limit ({}/{})",
peer_id, current_count, max
);
return;
}
}
let node_id = NodeId::new(peer_id.clone());
match transport.connect(&node_id).await {
Ok(conn) => {
lateral_connections
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(peer_id.clone(), conn);
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id); info!(
"Connected to lateral peer: {} ({}/{})",
peer_id,
current_count + 1,
max_lateral
.map(|m| m.to_string())
.unwrap_or_else(|| "unlimited".to_string())
);
}
Err(e) => {
warn!("Failed to connect to lateral peer {}: {}", peer_id, e);
if config.max_retries > 0 {
let backoff = calculate_backoff(
config.initial_backoff,
config.max_backoff,
config.backoff_multiplier,
0, );
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(
peer_id.clone(),
RetryState {
attempts: 0,
next_retry: Instant::now() + backoff,
},
);
debug!(
"Scheduled retry for lateral peer {} in {:?}",
peer_id, backoff
);
spawn_lateral_connection_retry(
peer_id,
transport.clone(),
lateral_connections.clone(),
lateral_retry_state.clone(),
config.clone(),
);
}
}
}
}
TopologyEvent::LateralPeerLost { peer_id } => {
info!("Lateral peer lost: {}", peer_id);
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id);
if lateral_connections
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key(&peer_id)
{
lateral_connections
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&peer_id);
let node_id = NodeId::new(peer_id.clone());
if let Err(e) = transport.disconnect(&node_id).await {
warn!("Failed to disconnect from lateral peer {}: {}", peer_id, e);
} else {
debug!("Disconnected from lateral peer: {}", peer_id);
}
}
}
TopologyEvent::RoleChanged { old_role, new_role } => {
info!("Role changed: {:?} -> {:?}", old_role, new_role);
}
TopologyEvent::LevelChanged {
old_level,
new_level,
} => {
info!(
"Hierarchy level changed: {:?} -> {:?}",
old_level, new_level
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::{MeshConnection as MeshConnectionTrait, MeshTransport, NodeId, Result};
use std::sync::Arc;
struct MockTransport {
started: Arc<RwLock<bool>>,
stopped: Arc<RwLock<bool>>,
connections: Arc<RwLock<Vec<NodeId>>>,
}
impl MockTransport {
fn new() -> Self {
Self {
started: Arc::new(RwLock::new(false)),
stopped: Arc::new(RwLock::new(false)),
connections: Arc::new(RwLock::new(Vec::new())),
}
}
fn is_started(&self) -> bool {
*self.started.read().unwrap_or_else(|e| e.into_inner())
}
fn is_stopped(&self) -> bool {
*self.stopped.read().unwrap_or_else(|e| e.into_inner())
}
fn has_connection(&self, node_id: &NodeId) -> bool {
self.connections
.read()
.unwrap()
.iter()
.any(|id| id == node_id)
}
}
struct MockConnection {
peer_id: NodeId,
connected_at: std::time::Instant,
}
impl MeshConnectionTrait for MockConnection {
fn peer_id(&self) -> &NodeId {
&self.peer_id
}
fn is_alive(&self) -> bool {
true
}
fn connected_at(&self) -> std::time::Instant {
self.connected_at
}
}
#[async_trait::async_trait]
impl MeshTransport for MockTransport {
async fn start(&self) -> Result<()> {
*self.started.write().unwrap_or_else(|e| e.into_inner()) = true;
Ok(())
}
async fn stop(&self) -> Result<()> {
*self.stopped.write().unwrap_or_else(|e| e.into_inner()) = true;
Ok(())
}
async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnectionTrait>> {
self.connections
.write()
.unwrap_or_else(|e| e.into_inner())
.push(peer_id.clone());
Ok(Box::new(MockConnection {
peer_id: peer_id.clone(),
connected_at: std::time::Instant::now(),
}))
}
async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
self.connections
.write()
.unwrap_or_else(|e| e.into_inner())
.retain(|id| id != peer_id);
Ok(())
}
fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn MeshConnectionTrait>> {
if self.has_connection(peer_id) {
Some(Box::new(MockConnection {
peer_id: peer_id.clone(),
connected_at: std::time::Instant::now(),
}))
} else {
None
}
}
fn peer_count(&self) -> usize {
self.connections
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
fn connected_peers(&self) -> Vec<NodeId> {
self.connections
.read()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
fn subscribe_peer_events(&self) -> crate::transport::PeerEventReceiver {
let (_tx, rx) = tokio::sync::mpsc::channel(256);
rx
}
}
#[test]
fn test_node_id_api() {
let node_id1 = NodeId::new("test-node".to_string());
let node_id2 = NodeId::new("test-node".to_string());
let node_id3 = NodeId::new("other-node".to_string());
assert_eq!(node_id1, node_id2);
assert_ne!(node_id1, node_id3);
}
#[test]
fn test_mock_transport_creation() {
let transport = MockTransport::new();
assert!(!transport.is_started());
assert!(!transport.is_stopped());
assert_eq!(transport.peer_count(), 0);
}
#[test]
fn test_calculate_backoff_first_attempt() {
let initial = Duration::from_secs(1);
let max = Duration::from_secs(60);
let multiplier = 2.0;
let backoff = calculate_backoff(initial, max, multiplier, 0);
assert_eq!(backoff, initial);
}
#[test]
fn test_calculate_backoff_exponential_growth() {
let initial = Duration::from_secs(1);
let max = Duration::from_secs(60);
let multiplier = 2.0;
let backoff1 = calculate_backoff(initial, max, multiplier, 1);
assert_eq!(backoff1, Duration::from_secs(2));
let backoff2 = calculate_backoff(initial, max, multiplier, 2);
assert_eq!(backoff2, Duration::from_secs(4));
let backoff3 = calculate_backoff(initial, max, multiplier, 3);
assert_eq!(backoff3, Duration::from_secs(8));
}
#[test]
fn test_calculate_backoff_max_cap() {
let initial = Duration::from_secs(1);
let max = Duration::from_secs(10);
let multiplier = 2.0;
let backoff = calculate_backoff(initial, max, multiplier, 10);
assert_eq!(backoff, max);
}
#[test]
fn test_calculate_backoff_custom_multiplier() {
let initial = Duration::from_secs(1);
let max = Duration::from_secs(100);
let multiplier = 3.0;
let backoff1 = calculate_backoff(initial, max, multiplier, 1);
assert_eq!(backoff1, Duration::from_secs(3));
let backoff2 = calculate_backoff(initial, max, multiplier, 2);
assert_eq!(backoff2, Duration::from_secs(9));
}
#[test]
fn test_topology_config_defaults() {
let config = TopologyConfig::default();
assert_eq!(config.max_telemetry_buffer_size, 100);
assert_eq!(config.max_retries, 3);
assert_eq!(config.initial_backoff, Duration::from_secs(1));
assert_eq!(config.max_backoff, Duration::from_secs(60));
assert_eq!(config.backoff_multiplier, 2.0);
assert_eq!(config.max_lateral_connections, Some(10));
}
fn make_test_manager() -> TopologyManager {
make_test_manager_with_config(TopologyConfig::default())
}
fn make_test_manager_with_config(config: TopologyConfig) -> TopologyManager {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let builder = TopologyBuilder::new(
config,
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
TopologyManager::new(builder, transport)
}
#[test]
fn test_new_has_no_selected_peer() {
let mgr = make_test_manager();
assert!(mgr.get_selected_peer_id().is_none());
}
#[test]
fn test_new_has_empty_lateral_connections() {
let mgr = make_test_manager();
assert!(mgr.get_lateral_peer_ids().is_empty());
assert_eq!(mgr.lateral_peer_count(), 0);
}
#[test]
fn test_new_has_empty_telemetry_buffer() {
let mgr = make_test_manager();
assert_eq!(mgr.telemetry_buffer_size(), 0);
}
#[test]
fn test_new_has_no_peer_connection() {
let mgr = make_test_manager();
assert!(mgr
.peer_connection
.read()
.unwrap_or_else(|e| e.into_inner())
.is_none());
}
#[test]
fn test_new_has_no_retry_state() {
let mgr = make_test_manager();
assert!(mgr
.peer_retry_state
.read()
.unwrap_or_else(|e| e.into_inner())
.is_none());
assert!(mgr
.lateral_retry_state
.read()
.unwrap_or_else(|e| e.into_inner())
.is_empty());
}
#[test]
fn test_new_has_no_task_handle() {
let mgr = make_test_manager();
assert!(mgr
.task_handle
.read()
.unwrap_or_else(|e| e.into_inner())
.is_none());
}
#[test]
fn test_get_selected_peer_id_returns_none_initially() {
let mgr = make_test_manager();
assert!(mgr.get_selected_peer_id().is_none());
}
#[test]
fn test_get_selected_peer_id_returns_value_after_manual_set() {
let mgr = make_test_manager();
let node = NodeId::new("peer-alpha".to_string());
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(node.clone());
assert_eq!(mgr.get_selected_peer_id(), Some(node));
}
#[test]
fn test_is_connected_to_peer_false_initially() {
let mgr = make_test_manager();
let node = NodeId::new("peer-alpha".to_string());
assert!(!mgr.is_connected_to_peer(&node));
}
#[test]
fn test_is_connected_to_peer_true_when_selected() {
let mgr = make_test_manager();
let node = NodeId::new("peer-alpha".to_string());
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(node.clone());
assert!(mgr.is_connected_to_peer(&node));
}
#[test]
fn test_is_connected_to_peer_false_for_different_node() {
let mgr = make_test_manager();
let connected = NodeId::new("peer-alpha".to_string());
let other = NodeId::new("peer-beta".to_string());
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(connected);
assert!(!mgr.is_connected_to_peer(&other));
}
#[test]
fn test_get_lateral_peer_ids_empty_initially() {
let mgr = make_test_manager();
assert!(mgr.get_lateral_peer_ids().is_empty());
}
#[test]
fn test_lateral_peer_count_zero_initially() {
let mgr = make_test_manager();
assert_eq!(mgr.lateral_peer_count(), 0);
}
#[test]
fn test_lateral_peers_after_manual_insertion() {
let mgr = make_test_manager();
let conn = Box::new(MockConnection {
peer_id: NodeId::new("lat-1".to_string()),
connected_at: std::time::Instant::now(),
});
mgr.lateral_connections
.write()
.unwrap()
.insert("lat-1".to_string(), conn);
assert_eq!(mgr.lateral_peer_count(), 1);
assert_eq!(mgr.get_lateral_peer_ids(), vec!["lat-1".to_string()]);
}
#[test]
fn test_multiple_lateral_peers() {
let mgr = make_test_manager();
for name in &["lat-a", "lat-b", "lat-c"] {
let conn = Box::new(MockConnection {
peer_id: NodeId::new(name.to_string()),
connected_at: std::time::Instant::now(),
});
mgr.lateral_connections
.write()
.unwrap()
.insert(name.to_string(), conn);
}
assert_eq!(mgr.lateral_peer_count(), 3);
let mut ids = mgr.get_lateral_peer_ids();
ids.sort();
assert_eq!(ids, vec!["lat-a", "lat-b", "lat-c"]);
}
#[test]
fn test_send_telemetry_buffers_when_no_parent() {
let mgr = make_test_manager();
let packet = DataPacket::telemetry("node-1", vec![1, 2, 3]);
let result = mgr.send_telemetry(packet);
assert_eq!(result, Ok(false)); assert_eq!(mgr.telemetry_buffer_size(), 1);
}
#[test]
fn test_send_telemetry_multiple_buffers() {
let mgr = make_test_manager();
for i in 0..5 {
let packet = DataPacket::telemetry("node-1", vec![i]);
assert_eq!(mgr.send_telemetry(packet), Ok(false));
}
assert_eq!(mgr.telemetry_buffer_size(), 5);
}
#[test]
fn test_send_telemetry_drops_oldest_when_buffer_full() {
let config = TopologyConfig {
max_telemetry_buffer_size: 3,
..Default::default()
};
let mgr = make_test_manager_with_config(config);
for i in 0..3 {
let packet = DataPacket::telemetry("node-1", vec![i]);
assert_eq!(mgr.send_telemetry(packet), Ok(false));
}
assert_eq!(mgr.telemetry_buffer_size(), 3);
let packet = DataPacket::telemetry("node-1", vec![99]);
assert_eq!(mgr.send_telemetry(packet), Ok(false));
assert_eq!(mgr.telemetry_buffer_size(), 3);
let buffer = mgr
.telemetry_buffer
.read()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(buffer[0].payload, vec![1]);
assert_eq!(buffer[1].payload, vec![2]);
assert_eq!(buffer[2].payload, vec![99]);
}
#[test]
fn test_send_telemetry_error_when_buffering_disabled() {
let config = TopologyConfig {
max_telemetry_buffer_size: 0,
..Default::default()
};
let mgr = make_test_manager_with_config(config);
let packet = DataPacket::telemetry("node-1", vec![1]);
let result = mgr.send_telemetry(packet);
assert!(result.is_err());
assert!(result
.unwrap_err()
.contains("Telemetry buffering is disabled"));
}
#[test]
fn test_send_telemetry_buffers_even_when_parent_present() {
let mgr = make_test_manager();
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(NodeId::new("parent-node".to_string()));
let packet = DataPacket::telemetry("node-1", vec![1, 2, 3]);
let result = mgr.send_telemetry(packet);
assert_eq!(result, Ok(false)); assert_eq!(mgr.telemetry_buffer_size(), 1);
}
#[test]
fn test_send_telemetry_buffers_multiple_when_parent_present() {
let mgr = make_test_manager();
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(NodeId::new("parent-node".to_string()));
for i in 0..10 {
let packet = DataPacket::telemetry("node-1", vec![i]);
assert_eq!(mgr.send_telemetry(packet), Ok(false));
}
assert_eq!(mgr.telemetry_buffer_size(), 10);
}
#[test]
fn test_telemetry_buffer_size_zero_initially() {
let mgr = make_test_manager();
assert_eq!(mgr.telemetry_buffer_size(), 0);
}
#[test]
fn test_telemetry_buffer_size_tracks_manual_pushes() {
let mgr = make_test_manager();
mgr.telemetry_buffer
.write()
.unwrap()
.push(DataPacket::telemetry("n", vec![]));
assert_eq!(mgr.telemetry_buffer_size(), 1);
mgr.telemetry_buffer
.write()
.unwrap()
.push(DataPacket::telemetry("n", vec![]));
assert_eq!(mgr.telemetry_buffer_size(), 2);
}
#[test]
fn test_drain_buffer_returns_all_packets() {
let buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
for i in 0..5 {
buffer
.write()
.unwrap()
.push(DataPacket::telemetry("n", vec![i]));
}
assert_eq!(buffer.read().unwrap_or_else(|e| e.into_inner()).len(), 5);
let packets = TopologyManager::drain_buffer(&buffer);
assert_eq!(packets.len(), 5);
assert_eq!(buffer.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
}
#[test]
fn test_drain_buffer_empty_returns_empty_vec() {
let buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let packets = TopologyManager::drain_buffer(&buffer);
assert!(packets.is_empty());
assert_eq!(buffer.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
}
#[tokio::test]
async fn test_flush_buffer_sends_via_transport() {
let buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
for i in 0..3 {
buffer
.write()
.unwrap()
.push(DataPacket::telemetry("n", vec![i]));
}
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let peer = NodeId::new("parent".into());
TopologyManager::flush_buffer(&buffer, &transport, &peer).await;
assert_eq!(buffer.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
}
#[tokio::test]
async fn test_flush_buffer_noop_on_empty() {
let buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let peer = NodeId::new("parent".into());
TopologyManager::flush_buffer(&buffer, &transport, &peer).await;
assert_eq!(buffer.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
}
#[test]
fn test_builder_accessor_returns_correct_config() {
let config = TopologyConfig {
max_retries: 42,
..Default::default()
};
let mgr = make_test_manager_with_config(config);
assert_eq!(mgr.builder().config().max_retries, 42);
}
#[tokio::test]
async fn test_start_calls_transport_start() {
let mgr = make_test_manager();
let result = mgr.start().await;
assert!(result.is_ok());
assert!(mgr
.task_handle
.read()
.unwrap_or_else(|e| e.into_inner())
.is_some());
}
#[tokio::test]
async fn test_stop_after_start() {
let mgr = make_test_manager();
mgr.start().await.unwrap();
let result = mgr.stop().await;
assert!(result.is_ok());
assert!(mgr
.task_handle
.read()
.unwrap_or_else(|e| e.into_inner())
.is_none());
assert!(mgr.get_selected_peer_id().is_none());
}
#[tokio::test]
async fn test_stop_without_start() {
let mgr = make_test_manager();
let result = mgr.stop().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_stop_disconnects_selected_peer() {
let mgr = make_test_manager();
mgr.start().await.unwrap();
let node = NodeId::new("peer-1".to_string());
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(node);
let result = mgr.stop().await;
assert!(result.is_ok());
assert!(mgr.get_selected_peer_id().is_none());
}
#[tokio::test]
async fn test_stop_clears_lateral_connections() {
let mgr = make_test_manager();
mgr.start().await.unwrap();
let conn = Box::new(MockConnection {
peer_id: NodeId::new("lat-1".to_string()),
connected_at: std::time::Instant::now(),
});
mgr.lateral_connections
.write()
.unwrap()
.insert("lat-1".to_string(), conn);
let result = mgr.stop().await;
assert!(result.is_ok());
assert_eq!(mgr.lateral_peer_count(), 0);
}
#[tokio::test]
async fn test_event_loop_peer_selected_success() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let sel_clone = selected_peer_id.clone();
let pc_clone = peer_connection.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"selected-peer".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerSelected {
selected_peer_id: "selected-peer".to_string(),
peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let peer = sel_clone.read().unwrap_or_else(|e| e.into_inner()).clone();
assert!(peer.is_some());
assert_eq!(peer.unwrap().as_str(), "selected-peer");
assert!(pc_clone.read().unwrap_or_else(|e| e.into_inner()).is_some());
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_lost() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> =
Arc::new(RwLock::new(Some(NodeId::new("old-peer".to_string()))));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let sel_clone = selected_peer_id.clone();
let pc_clone = peer_connection.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::PeerLost {
lost_peer_id: "old-peer".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(sel_clone
.read()
.unwrap_or_else(|e| e.into_inner())
.is_none());
assert!(pc_clone.read().unwrap_or_else(|e| e.into_inner()).is_none());
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_changed() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> =
Arc::new(RwLock::new(Some(NodeId::new("old-peer".to_string()))));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let sel_clone = selected_peer_id.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"new-peer".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerChanged {
old_peer_id: "old-peer".to_string(),
new_peer_id: "new-peer".to_string(),
new_peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let peer = sel_clone.read().unwrap_or_else(|e| e.into_inner()).clone();
assert!(peer.is_some());
assert_eq!(peer.unwrap().as_str(), "new-peer");
handle.abort();
}
#[tokio::test]
async fn test_event_loop_lateral_peer_discovered() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let lat_clone = lateral_connections.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"lateral-1".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Squad,
);
tx.send(TopologyEvent::LateralPeerDiscovered {
peer_id: "lateral-1".to_string(),
peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(lat_clone
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key("lateral-1"));
handle.abort();
}
#[tokio::test]
async fn test_event_loop_lateral_peer_lost() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let conn = Box::new(MockConnection {
peer_id: NodeId::new("lateral-1".to_string()),
connected_at: std::time::Instant::now(),
});
lateral_connections
.write()
.unwrap()
.insert("lateral-1".to_string(), conn);
let lat_clone = lateral_connections.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::LateralPeerLost {
peer_id: "lateral-1".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(!lat_clone
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key("lateral-1"));
handle.abort();
}
#[tokio::test]
async fn test_event_loop_flushes_buffer_on_peer_selected() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
for i in 0..3 {
telemetry_buffer
.write()
.unwrap()
.push(DataPacket::telemetry("node-1", vec![i]));
}
assert_eq!(
telemetry_buffer
.read()
.unwrap_or_else(|e| e.into_inner())
.len(),
3
);
let buf_clone = telemetry_buffer.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"new-parent".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerSelected {
selected_peer_id: "new-parent".to_string(),
peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(buf_clone.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
handle.abort();
}
#[tokio::test]
async fn test_event_loop_role_changed_handled() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
use crate::hierarchy::NodeRole;
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::RoleChanged {
old_role: NodeRole::Member,
new_role: NodeRole::Leader,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
}
#[tokio::test]
async fn test_event_loop_level_changed_handled() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::LevelChanged {
old_level: HierarchyLevel::Squad,
new_level: HierarchyLevel::Platoon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_added_handled() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::PeerAdded {
linked_peer_id: "linked-1".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_removed_handled() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::PeerRemoved {
linked_peer_id: "gone-peer".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
}
#[tokio::test]
async fn test_send_telemetry_buffer_then_parent_arrives() {
let mgr = make_test_manager();
for i in 0..3 {
mgr.send_telemetry(DataPacket::telemetry("n", vec![i]))
.unwrap();
}
assert_eq!(mgr.telemetry_buffer_size(), 3);
let peer = NodeId::new("parent".into());
TopologyManager::flush_buffer(&mgr.telemetry_buffer, &mgr.transport, &peer).await;
assert_eq!(mgr.telemetry_buffer_size(), 0);
*mgr.selected_peer_id
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(NodeId::new("parent".to_string()));
let result = mgr.send_telemetry(DataPacket::telemetry("n", vec![42]));
assert_eq!(result, Ok(false)); assert_eq!(mgr.telemetry_buffer_size(), 1);
}
#[test]
fn test_retry_state_creation() {
let state = RetryState {
attempts: 0,
next_retry: Instant::now() + Duration::from_secs(1),
};
assert_eq!(state.attempts, 0);
assert!(state.next_retry > Instant::now() - Duration::from_millis(100));
}
#[test]
fn test_retry_state_clone() {
let state = RetryState {
attempts: 5,
next_retry: Instant::now(),
};
let cloned = state.clone();
assert_eq!(cloned.attempts, 5);
}
#[tokio::test]
async fn test_mock_transport_start_stop() {
let transport = MockTransport::new();
transport.start().await.unwrap();
assert!(transport.is_started());
transport.stop().await.unwrap();
assert!(transport.is_stopped());
}
#[tokio::test]
async fn test_mock_transport_connect_disconnect() {
let transport = MockTransport::new();
let node = NodeId::new("p1".to_string());
let conn = transport.connect(&node).await.unwrap();
assert_eq!(conn.peer_id(), &node);
assert!(conn.is_alive());
assert!(transport.has_connection(&node));
assert_eq!(transport.peer_count(), 1);
assert_eq!(transport.connected_peers(), vec![node.clone()]);
transport.disconnect(&node).await.unwrap();
assert!(!transport.has_connection(&node));
assert_eq!(transport.peer_count(), 0);
}
#[tokio::test]
async fn test_mock_transport_get_connection() {
let transport = MockTransport::new();
let node = NodeId::new("p1".to_string());
assert!(transport.get_connection(&node).is_none());
transport.connect(&node).await.unwrap();
assert!(transport.get_connection(&node).is_some());
}
#[test]
fn test_mock_transport_is_connected() {
let transport = MockTransport::new();
let node = NodeId::new("p1".to_string());
assert!(!transport.is_connected(&node));
}
#[test]
fn test_mock_transport_subscribe_peer_events() {
let transport = MockTransport::new();
let _rx = transport.subscribe_peer_events();
}
struct FailingTransport {
started: Arc<RwLock<bool>>,
stopped: Arc<RwLock<bool>>,
fail_count: Arc<RwLock<u32>>,
connections: Arc<RwLock<Vec<NodeId>>>,
}
impl FailingTransport {
fn new(fail_n_times: u32) -> Self {
Self {
started: Arc::new(RwLock::new(false)),
stopped: Arc::new(RwLock::new(false)),
fail_count: Arc::new(RwLock::new(fail_n_times)),
connections: Arc::new(RwLock::new(Vec::new())),
}
}
}
#[async_trait::async_trait]
impl MeshTransport for FailingTransport {
async fn start(&self) -> Result<()> {
*self.started.write().unwrap_or_else(|e| e.into_inner()) = true;
Ok(())
}
async fn stop(&self) -> Result<()> {
*self.stopped.write().unwrap_or_else(|e| e.into_inner()) = true;
Ok(())
}
async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn MeshConnectionTrait>> {
let mut fail_count = self.fail_count.write().unwrap_or_else(|e| e.into_inner());
if *fail_count > 0 {
*fail_count -= 1;
return Err(crate::transport::TransportError::ConnectionFailed(format!(
"Simulated failure connecting to {}",
peer_id
)));
}
self.connections
.write()
.unwrap_or_else(|e| e.into_inner())
.push(peer_id.clone());
Ok(Box::new(MockConnection {
peer_id: peer_id.clone(),
connected_at: std::time::Instant::now(),
}))
}
async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
self.connections
.write()
.unwrap_or_else(|e| e.into_inner())
.retain(|id| id != peer_id);
Ok(())
}
fn get_connection(&self, _peer_id: &NodeId) -> Option<Box<dyn MeshConnectionTrait>> {
None
}
fn peer_count(&self) -> usize {
self.connections
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
fn connected_peers(&self) -> Vec<NodeId> {
self.connections
.read()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
fn subscribe_peer_events(&self) -> crate::transport::PeerEventReceiver {
let (_tx, rx) = tokio::sync::mpsc::channel(256);
rx
}
}
#[tokio::test]
async fn test_event_loop_peer_selected_with_retry() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig {
max_retries: 3,
initial_backoff: Duration::from_millis(10),
max_backoff: Duration::from_millis(50),
..Default::default()
};
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(FailingTransport::new(2));
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let sel_clone = selected_peer_id.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"retry-peer".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerSelected {
selected_peer_id: "retry-peer".to_string(),
peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
let peer = sel_clone.read().unwrap_or_else(|e| e.into_inner()).clone();
assert!(peer.is_some(), "Peer should connect after retries");
assert_eq!(peer.unwrap().as_str(), "retry-peer");
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_changed_with_retry() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig {
max_retries: 3,
initial_backoff: Duration::from_millis(10),
max_backoff: Duration::from_millis(50),
..Default::default()
};
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(FailingTransport::new(1));
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> =
Arc::new(RwLock::new(Some(NodeId::new("old-peer".to_string()))));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let sel_clone = selected_peer_id.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"new-peer".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerChanged {
old_peer_id: "old-peer".to_string(),
new_peer_id: "new-peer".to_string(),
new_peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
let peer = sel_clone.read().unwrap_or_else(|e| e.into_inner()).clone();
assert!(peer.is_some());
assert_eq!(peer.unwrap().as_str(), "new-peer");
handle.abort();
}
#[tokio::test]
async fn test_event_loop_lateral_peer_with_retry() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig {
max_retries: 3,
initial_backoff: Duration::from_millis(10),
max_backoff: Duration::from_millis(50),
..Default::default()
};
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(FailingTransport::new(1));
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let lat_clone = lateral_connections.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"lat-retry".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Squad,
);
tx.send(TopologyEvent::LateralPeerDiscovered {
peer_id: "lat-retry".to_string(),
peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
assert!(lat_clone
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key("lat-retry"));
handle.abort();
}
#[tokio::test]
async fn test_event_loop_lateral_limit_reached() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig {
max_lateral_connections: Some(1),
..Default::default()
};
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let lat_clone = lateral_connections.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon1 = crate::beacon::GeographicBeacon::new(
"lat-1".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Squad,
);
tx.send(TopologyEvent::LateralPeerDiscovered {
peer_id: "lat-1".to_string(),
peer_beacon: beacon1,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(lat_clone.read().unwrap_or_else(|e| e.into_inner()).len(), 1);
let beacon2 = crate::beacon::GeographicBeacon::new(
"lat-2".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Squad,
);
tx.send(TopologyEvent::LateralPeerDiscovered {
peer_id: "lat-2".to_string(),
peer_beacon: beacon2,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
}
#[tokio::test]
async fn test_event_loop_peer_removed_with_connected_transport() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport = Arc::new(MockTransport::new());
let node_id = NodeId::new("stale-peer".to_string());
transport
.connections
.write()
.unwrap_or_else(|e| e.into_inner())
.push(node_id.clone());
let transport: Arc<dyn MeshTransport> = transport;
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport.clone(),
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::PeerRemoved {
linked_peer_id: "stale-peer".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(transport.peer_count(), 0);
handle.abort();
}
#[tokio::test]
async fn test_event_loop_buffer_flush_on_peer_changed() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> =
Arc::new(RwLock::new(Some(NodeId::new("old-peer".to_string()))));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
for i in 0..3 {
telemetry_buffer
.write()
.unwrap()
.push(DataPacket::telemetry("n", vec![i]));
}
let buf_clone = telemetry_buffer.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
let beacon = crate::beacon::GeographicBeacon::new(
"new-parent".to_string(),
GeoPosition::new(37.78, -122.42),
HierarchyLevel::Platoon,
);
tx.send(TopologyEvent::PeerChanged {
old_peer_id: "old-peer".to_string(),
new_peer_id: "new-parent".to_string(),
new_peer_beacon: beacon,
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(buf_clone.read().unwrap_or_else(|e| e.into_inner()).len(), 0);
handle.abort();
}
#[tokio::test]
async fn test_event_loop_lateral_peer_lost_with_retry_state() {
use crate::beacon::{BeaconObserver, GeoPosition, HierarchyLevel, MockBeaconStorage};
let storage = Arc::new(MockBeaconStorage::new());
let observer = Arc::new(BeaconObserver::new(storage, "9q8yy".to_string()));
let config = TopologyConfig::default();
let builder = TopologyBuilder::new(
config.clone(),
"test-node".to_string(),
GeoPosition::new(37.7749, -122.4194),
HierarchyLevel::Squad,
None,
observer,
);
let transport: Arc<dyn MeshTransport> = Arc::new(MockTransport::new());
let (tx, rx) = mpsc::unbounded_channel();
let peer_connection: Arc<RwLock<Option<Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(None));
let selected_peer_id: Arc<RwLock<Option<NodeId>>> = Arc::new(RwLock::new(None));
let lateral_connections: Arc<RwLock<HashMap<String, Box<dyn MeshConnectionTrait>>>> =
Arc::new(RwLock::new(HashMap::new()));
let peer_retry_state: Arc<RwLock<Option<RetryState>>> = Arc::new(RwLock::new(None));
let lateral_retry_state: Arc<RwLock<HashMap<String, RetryState>>> =
Arc::new(RwLock::new(HashMap::new()));
let telemetry_buffer: Arc<RwLock<Vec<DataPacket>>> = Arc::new(RwLock::new(Vec::new()));
lateral_retry_state
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(
"lat-1".to_string(),
RetryState {
attempts: 1,
next_retry: Instant::now() + Duration::from_secs(10),
},
);
let retry_clone = lateral_retry_state.clone();
let handle = tokio::spawn(TopologyManager::event_loop(
rx,
transport,
peer_connection,
selected_peer_id,
lateral_connections,
peer_retry_state,
lateral_retry_state,
telemetry_buffer,
builder,
config,
));
tx.send(TopologyEvent::LateralPeerLost {
peer_id: "lat-1".to_string(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(!retry_clone
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key("lat-1"));
handle.abort();
}
}