use crate::beacon::{BeaconObserver, GeoPosition, GeographicBeacon, HierarchyLevel, NodeProfile};
use crate::hierarchy::{HierarchyStrategy, NodeRole};
use crate::topology::partition::PartitionDetector;
use crate::topology::selection::{PeerSelector, SelectionConfig};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Notifications sent on the builder's event channel whenever the locally
/// tracked topology changes.
#[derive(Debug, Clone)]
// Several variants deliberately share the `Peer` prefix.
#[allow(clippy::enum_variant_names)]
pub enum TopologyEvent {
    /// A peer was selected while no peer was selected before.
    PeerSelected {
        /// Node id of the newly selected peer.
        selected_peer_id: String,
        /// Beacon the selection was based on.
        peer_beacon: GeographicBeacon,
    },
    /// The selected peer was replaced by another one.
    PeerChanged {
        /// Node id of the peer that was selected until now.
        old_peer_id: String,
        /// Node id of the replacement.
        new_peer_id: String,
        /// Beacon of the replacement.
        new_peer_beacon: GeographicBeacon,
    },
    /// The selected peer was dropped after not being visible past the timeout.
    PeerLost { lost_peer_id: String },
    /// A beacon this node's level can parent started being tracked.
    PeerAdded { linked_peer_id: String },
    /// A tracked linked peer expired.
    PeerRemoved { linked_peer_id: String },
    /// A beacon at this node's own hierarchy level started being tracked.
    LateralPeerDiscovered {
        /// Node id of the same-level peer.
        peer_id: String,
        /// Beacon that announced the peer.
        peer_beacon: GeographicBeacon,
    },
    /// A tracked lateral peer expired.
    LateralPeerLost { peer_id: String },
    /// The hierarchy strategy reported a role different from the stored one.
    RoleChanged {
        old_role: NodeRole,
        new_role: NodeRole,
    },
    /// The hierarchy strategy reported a level different from the stored one.
    LevelChanged {
        old_level: HierarchyLevel,
        new_level: HierarchyLevel,
    },
}
/// Mutable view of the topology shared between the builder and its
/// background task.
#[derive(Debug, Clone)]
pub struct TopologyState {
    /// The currently selected peer, if any.
    pub selected_peer: Option<SelectedPeer>,
    /// Node id -> last-seen instant for beacons this node's level can parent.
    pub linked_peers: HashMap<String, Instant>,
    /// Node id -> last-seen instant for beacons at this node's own level.
    pub lateral_peers: HashMap<String, Instant>,
    /// Role last stored for this node; rewritten when the hierarchy strategy
    /// reports a different one.
    pub role: NodeRole,
    /// Level last stored for this node; rewritten when the hierarchy strategy
    /// reports a different one.
    pub hierarchy_level: HierarchyLevel,
}
impl Default for TopologyState {
    /// Empty state: no selected peer, no tracked peers, the default role and
    /// the `Squad` level.
    fn default() -> Self {
        let linked_peers = HashMap::default();
        let lateral_peers = HashMap::default();
        Self {
            selected_peer: Option::None,
            linked_peers,
            lateral_peers,
            role: NodeRole::default(),
            hierarchy_level: HierarchyLevel::Squad,
        }
    }
}
/// Record of the peer this node is currently attached to.
#[derive(Debug, Clone)]
pub struct SelectedPeer {
    /// Node id of the selected peer.
    pub node_id: String,
    /// Beacon captured at the moment of selection.
    pub beacon: GeographicBeacon,
    /// When the selection was made; drives both the change cooldown and the
    /// lost-peer timeout.
    pub selected_at: Instant,
}
/// Tunables for [`TopologyBuilder`].
#[derive(Debug, Clone)]
pub struct TopologyConfig {
    /// Settings handed to every `PeerSelector` the builder creates.
    pub selection: SelectionConfig,
    /// Period of the background evaluation loop. With `None` the loop falls
    /// back to a fixed 60 second sleep between passes.
    pub reevaluation_interval: Option<Duration>,
    /// Minimum age of the current selection before `reevaluate_peer` will
    /// consider switching to another peer.
    pub peer_change_cooldown: Duration,
    /// Age after which an unseen selected, linked or lateral peer is dropped.
    pub peer_timeout: Duration,
    /// Optional strategy used (together with a node profile) to recompute
    /// this node's level and role on every pass.
    pub hierarchy_strategy: Option<Arc<dyn HierarchyStrategy>>,
    /// NOTE(review): not read anywhere in this file; presumably enforced by
    /// whichever component opens lateral connections - confirm.
    pub max_lateral_connections: Option<usize>,
    /// NOTE(review): not read in this file; presumably a retry budget used
    /// elsewhere - confirm.
    pub max_retries: u32,
    /// NOTE(review): not read in this file; presumably the first retry delay.
    pub initial_backoff: Duration,
    /// NOTE(review): not read in this file; presumably the retry delay cap.
    pub max_backoff: Duration,
    /// NOTE(review): not read in this file; presumably the factor applied to
    /// the delay between retries.
    pub backoff_multiplier: f64,
    /// NOTE(review): not read in this file; presumably a cap on buffered
    /// telemetry items.
    pub max_telemetry_buffer_size: usize,
    /// NOTE(review): not read in this file; optional metrics sink.
    pub metrics_collector: Option<Arc<dyn crate::topology::metrics::MetricsCollector>>,
    /// Optional detector whose `check_partition` is invoked with the nearby
    /// beacons at the end of every background pass.
    pub partition_detector: Option<Arc<Mutex<PartitionDetector>>>,
}
impl Default for TopologyConfig {
    /// Defaults: 30 s evaluation period, 60 s change cooldown, 180 s peer
    /// timeout, at most 10 lateral connections, 3 retries backing off from
    /// 1 s to 60 s with a factor of 2, a telemetry buffer of 100, and no
    /// strategy, metrics collector or partition detector.
    fn default() -> Self {
        Self {
            selection: SelectionConfig::default(),
            reevaluation_interval: Some(Duration::from_secs(30)),
            peer_change_cooldown: Duration::from_secs(60),
            peer_timeout: Duration::from_secs(180),
            hierarchy_strategy: None,
            max_lateral_connections: Some(10),
            max_retries: 3,
            initial_backoff: Duration::from_secs(1),
            max_backoff: Duration::from_secs(60),
            backoff_multiplier: 2.0,
            max_telemetry_buffer_size: 100,
            metrics_collector: None,
            partition_detector: None,
        }
    }
}
/// Maintains this node's view of the surrounding topology and publishes
/// [`TopologyEvent`]s as it changes.
pub struct TopologyBuilder {
    /// Settings supplied at construction.
    config: TopologyConfig,
    /// Identifier of the local node (stored but not read in this file).
    #[allow(dead_code)]
    node_id: String,
    /// Latest known position of the local node, shared with the background task.
    position: Arc<Mutex<GeoPosition>>,
    /// Level supplied at construction; passed to every `PeerSelector`.
    hierarchy_level: HierarchyLevel,
    /// Optional profile fed to the hierarchy strategy.
    #[allow(dead_code)]
    profile: Option<NodeProfile>,
    /// Source of nearby beacons.
    observer: Arc<BeaconObserver>,
    /// State shared between the public accessors and the background task.
    state: Arc<Mutex<TopologyState>>,
    /// Sending half of the event channel.
    event_tx: mpsc::UnboundedSender<TopologyEvent>,
    /// Receiving half of the event channel until `subscribe` takes it.
    event_rx: Mutex<Option<mpsc::UnboundedReceiver<TopologyEvent>>>,
    /// Handle of the background task, present after a start.
    task_handle: Mutex<Option<JoinHandle<()>>>,
}
impl Clone for TopologyBuilder {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
node_id: self.node_id.clone(),
position: self.position.clone(),
hierarchy_level: self.hierarchy_level,
profile: self.profile.clone(),
observer: self.observer.clone(),
state: self.state.clone(),
event_tx: self.event_tx.clone(),
event_rx: Mutex::new(None), task_handle: Mutex::new(None), }
}
}
impl TopologyBuilder {
pub fn new(
config: TopologyConfig,
node_id: String,
position: GeoPosition,
hierarchy_level: HierarchyLevel,
profile: Option<NodeProfile>,
observer: Arc<BeaconObserver>,
) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
Self {
config,
node_id,
position: Arc::new(Mutex::new(position)),
hierarchy_level,
profile,
observer,
state: Arc::new(Mutex::new(TopologyState::default())),
event_tx,
event_rx: Mutex::new(Some(event_rx)),
task_handle: Mutex::new(None),
}
}
/// Starts the background evaluation task without a cancellation token.
///
/// Equivalent to `start_with_token(None)`.
pub async fn start(&self) {
    let no_token = None;
    self.start_with_token(no_token).await;
}
/// Starts the background evaluation task, optionally bound to a
/// cancellation token.
///
/// Returns without doing anything while a previously spawned task is still
/// running. A stored handle whose task has already finished (for example
/// because its token was cancelled) is replaced, so the builder can be
/// started again without an intervening `stop()`.
///
/// Each pass of the task:
/// 1. waits for the next `reevaluation_interval` tick (or 60 seconds when no
///    interval is configured), exiting the loop if `cancel` fires first;
/// 2. fetches the nearby beacons from the observer;
/// 3. when both a hierarchy strategy and a profile are present, recomputes
///    level and role and emits `LevelChanged` / `RoleChanged` on change;
/// 4. selects a peer when none is selected or the current one was just lost;
/// 5. refreshes linked- and lateral-peer tracking;
/// 6. runs the partition detector, if one is configured.
pub async fn start_with_token(&self, cancel: Option<tokio_util::sync::CancellationToken>) {
    // Pause between passes when no reevaluation interval is configured.
    const IDLE_WAIT: Duration = Duration::from_secs(60);

    let mut handle_guard = self.task_handle.lock().unwrap_or_else(|e| e.into_inner());
    // Only a task that is still running makes this call a no-op; a handle
    // left behind by a cancelled or crashed task must not block a restart.
    if let Some(existing) = handle_guard.as_ref() {
        if !existing.is_finished() {
            return;
        }
    }
    let config = self.config.clone();
    let position = self.position.clone();
    let hierarchy_level = self.hierarchy_level;
    let profile = self.profile.clone();
    let observer = self.observer.clone();
    let state = self.state.clone();
    let event_tx = self.event_tx.clone();
    let handle = tokio::spawn(async move {
        let mut interval = config.reevaluation_interval.map(tokio::time::interval);
        loop {
            // Wait for the next pass, racing against cancellation if a token
            // was supplied.
            let cancelled = if let Some(ref token) = cancel {
                if let Some(ref mut int) = interval {
                    tokio::select! {
                        _ = int.tick() => false,
                        () = token.cancelled() => true,
                    }
                } else {
                    tokio::select! {
                        _ = tokio::time::sleep(IDLE_WAIT) => false,
                        () = token.cancelled() => true,
                    }
                }
            } else if let Some(ref mut int) = interval {
                int.tick().await;
                false
            } else {
                tokio::time::sleep(IDLE_WAIT).await;
                false
            };
            if cancelled {
                tracing::debug!("Topology builder shutting down via cancellation token");
                break;
            }
            // Copy the position out so the lock is released immediately.
            let current_pos = *position.lock().unwrap_or_else(|e| e.into_inner());
            // NOTE(review): the selector is built with the level given at
            // construction, not the level the strategy computes below -
            // confirm that this is intended.
            let selector =
                PeerSelector::new(config.selection.clone(), current_pos, hierarchy_level);
            let nearby = observer.get_nearby_beacons().await;
            // The state lock is taken only after the last await of this pass,
            // so the guard is never held across a suspension point.
            let mut state_lock = state.lock().unwrap_or_else(|e| e.into_inner());
            let current_hierarchy_level = if let (Some(strategy), Some(prof)) =
                (config.hierarchy_strategy.as_ref(), profile.as_ref())
            {
                let new_level = strategy.determine_level(prof);
                let new_role = strategy.determine_role(prof, &nearby);
                if new_level != state_lock.hierarchy_level {
                    let old_level = state_lock.hierarchy_level;
                    state_lock.hierarchy_level = new_level;
                    // Send errors (receiver dropped) are deliberately ignored.
                    let _ = event_tx.send(TopologyEvent::LevelChanged {
                        old_level,
                        new_level,
                    });
                }
                if new_role != state_lock.role {
                    let old_role = state_lock.role;
                    state_lock.role = new_role;
                    let _ = event_tx.send(TopologyEvent::RoleChanged { old_role, new_role });
                }
                new_level
            } else {
                hierarchy_level
            };
            let needs_peer =
                Self::check_peer_status(&mut state_lock, &config, &nearby, &event_tx);
            if needs_peer {
                if let Some(candidate) = selector.select_peer(&nearby) {
                    Self::update_selected_peer(&mut state_lock, &event_tx, candidate.beacon);
                }
            }
            Self::update_linked_peers(
                &mut state_lock,
                &config,
                &nearby,
                current_hierarchy_level,
                &event_tx,
            );
            Self::update_lateral_peers(
                &mut state_lock,
                &config,
                &nearby,
                current_hierarchy_level,
                &event_tx,
            );
            drop(state_lock);
            if let Some(ref partition_detector) = config.partition_detector {
                let beacons_map: HashMap<String, GeographicBeacon> = nearby
                    .iter()
                    .map(|beacon| (beacon.node_id.clone(), beacon.clone()))
                    .collect();
                // The detector's return value is not used by this loop.
                let _ = partition_detector
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .check_partition(&beacons_map);
            }
        }
    });
    *handle_guard = Some(handle);
}
/// Aborts the background task if one was started; otherwise does nothing.
///
/// The stored handle is removed, so a later `start` spawns a fresh task.
pub async fn stop(&self) {
    let running = {
        let mut slot = self
            .task_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        slot.take()
    };
    if let Some(task) = running {
        task.abort();
    }
}
pub fn get_state(&self) -> TopologyState {
self.state.lock().unwrap_or_else(|e| e.into_inner()).clone()
}
/// Returns a copy of the currently selected peer, or `None` when no peer is
/// selected.
pub fn get_selected_peer(&self) -> Option<SelectedPeer> {
    let guard = self
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.selected_peer.clone()
}
pub fn config(&self) -> &TopologyConfig {
&self.config
}
/// Hands out the receiving half of the event channel.
///
/// The receiver exists once: the first call returns `Some`, every later call
/// (and any call on a clone of the builder) returns `None`.
pub fn subscribe(&self) -> Option<mpsc::UnboundedReceiver<TopologyEvent>> {
    let mut slot = self
        .event_rx
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    slot.take()
}
/// Replaces the stored position of the local node; later evaluations read
/// the new value.
pub fn update_position(&self, position: GeoPosition) {
    let mut current = self
        .position
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *current = position;
}
pub async fn reevaluate_peer(&self) {
let current_pos = *self.position.lock().unwrap_or_else(|e| e.into_inner());
let selector = PeerSelector::new(
self.config.selection.clone(),
current_pos,
self.hierarchy_level,
);
let nearby = self.observer.get_nearby_beacons().await;
let mut state_lock = self.state.lock().unwrap_or_else(|e| e.into_inner());
if let Some(candidate) = selector.select_peer(&nearby) {
let should_switch = if let Some(ref current) = state_lock.selected_peer {
let elapsed = current.selected_at.elapsed();
if elapsed < self.config.peer_change_cooldown {
false
} else {
let current_score = if let Some(current_beacon) =
nearby.iter().find(|b| b.node_id == current.node_id)
{
selector
.select_peer(std::slice::from_ref(current_beacon))
.map(|c| c.score)
.unwrap_or(0.0)
} else {
0.0 };
candidate.score > current_score * 1.1 }
} else {
true };
if should_switch {
Self::update_selected_peer(&mut state_lock, &self.event_tx, candidate.beacon);
}
}
}
/// Decides whether a new peer has to be selected.
///
/// Returns `true` when no peer is selected, or when the selected peer is
/// absent from `nearby` and its selection is older than `peer_timeout`; in
/// the latter case the selection is cleared and `PeerLost` is emitted.
/// Returns `false` while the peer is visible or still within the timeout.
///
/// NOTE(review): the timeout is measured from `selected_at`, not from the
/// last time the peer was seen - confirm that this is intended.
fn check_peer_status(
    state: &mut TopologyState,
    config: &TopologyConfig,
    nearby: &[GeographicBeacon],
    event_tx: &mpsc::UnboundedSender<TopologyEvent>,
) -> bool {
    let lost_peer_id = match &state.selected_peer {
        None => return true,
        Some(peer) => {
            let visible = nearby.iter().any(|b| b.node_id == peer.node_id);
            if visible || peer.selected_at.elapsed() <= config.peer_timeout {
                return false;
            }
            peer.node_id.clone()
        }
    };
    state.selected_peer = None;
    // A closed channel is not an error for the caller.
    let _ = event_tx.send(TopologyEvent::PeerLost { lost_peer_id });
    true
}
/// Stores `new_peer_beacon` as the selected peer, stamped with the current
/// instant, and announces it.
///
/// Emits `PeerChanged` when a peer was already selected and `PeerSelected`
/// otherwise. Send failures are ignored.
fn update_selected_peer(
    state: &mut TopologyState,
    event_tx: &mpsc::UnboundedSender<TopologyEvent>,
    new_peer_beacon: GeographicBeacon,
) {
    let new_peer_id = new_peer_beacon.node_id.clone();
    // Swap the new selection in and keep the previous one to pick the event.
    let previous = state.selected_peer.replace(SelectedPeer {
        node_id: new_peer_id.clone(),
        beacon: new_peer_beacon.clone(),
        selected_at: Instant::now(),
    });
    let event = match previous {
        Some(old) => TopologyEvent::PeerChanged {
            old_peer_id: old.node_id,
            new_peer_id,
            new_peer_beacon,
        },
        None => TopologyEvent::PeerSelected {
            selected_peer_id: new_peer_id,
            peer_beacon: new_peer_beacon,
        },
    };
    let _ = event_tx.send(event);
}
/// Synchronises `state.linked_peers` with the beacons currently visible.
///
/// A beacon counts as linked when `own_level.can_be_parent_of` accepts its
/// level. Known linked peers get their last-seen instant refreshed; new ones
/// are inserted and announced with `PeerAdded`. Entries that are not visible
/// in this pass and were last seen more than `peer_timeout` ago are removed
/// and announced with `PeerRemoved`.
fn update_linked_peers(
    state: &mut TopologyState,
    config: &TopologyConfig,
    nearby: &[GeographicBeacon],
    own_level: HierarchyLevel,
    event_tx: &mpsc::UnboundedSender<TopologyEvent>,
) {
    let now = Instant::now();
    // Ids of every qualifying beacon seen in this pass.
    let mut visible_ids: HashSet<&String> = HashSet::new();

    for beacon in nearby {
        if !own_level.can_be_parent_of(&beacon.hierarchy_level) {
            continue;
        }
        visible_ids.insert(&beacon.node_id);
        match state.linked_peers.get_mut(&beacon.node_id) {
            Some(last_seen) => *last_seen = now,
            None => {
                state.linked_peers.insert(beacon.node_id.clone(), now);
                let _ = event_tx.send(TopologyEvent::PeerAdded {
                    linked_peer_id: beacon.node_id.clone(),
                });
            }
        }
    }

    // Collect first, remove afterwards: the map cannot be mutated while it
    // is being iterated.
    let stale: Vec<String> = state
        .linked_peers
        .iter()
        .filter(|(id, last_seen)| {
            !visible_ids.contains(*id) && last_seen.elapsed() > config.peer_timeout
        })
        .map(|(id, _)| id.clone())
        .collect();

    for linked_peer_id in stale {
        state.linked_peers.remove(&linked_peer_id);
        let _ = event_tx.send(TopologyEvent::PeerRemoved { linked_peer_id });
    }
}
/// Synchronises `state.lateral_peers` with the beacons currently visible.
///
/// A beacon counts as lateral when its level equals `own_level`. Known
/// lateral peers get their last-seen instant refreshed; new ones are inserted
/// and announced with `LateralPeerDiscovered`. Entries that are not visible
/// in this pass and were last seen more than `peer_timeout` ago are removed
/// and announced with `LateralPeerLost`.
///
/// Only `peer_timeout` is read from `config`; `max_lateral_connections` is
/// not applied here.
fn update_lateral_peers(
    state: &mut TopologyState,
    config: &TopologyConfig,
    nearby: &[GeographicBeacon],
    own_level: HierarchyLevel,
    event_tx: &mpsc::UnboundedSender<TopologyEvent>,
) {
    let now = Instant::now();
    // Ids of every same-level beacon seen in this pass.
    let mut visible_ids: HashSet<&String> = HashSet::new();

    for beacon in nearby {
        if beacon.hierarchy_level != own_level {
            continue;
        }
        visible_ids.insert(&beacon.node_id);
        match state.lateral_peers.get_mut(&beacon.node_id) {
            Some(last_seen) => *last_seen = now,
            None => {
                state.lateral_peers.insert(beacon.node_id.clone(), now);
                let _ = event_tx.send(TopologyEvent::LateralPeerDiscovered {
                    peer_id: beacon.node_id.clone(),
                    peer_beacon: beacon.clone(),
                });
            }
        }
    }

    // Collect first, remove afterwards: the map cannot be mutated while it
    // is being iterated.
    let stale: Vec<String> = state
        .lateral_peers
        .iter()
        .filter(|(id, last_seen)| {
            !visible_ids.contains(*id) && last_seen.elapsed() > config.peer_timeout
        })
        .map(|(id, _)| id.clone())
        .collect();

    for peer_id in stale {
        state.lateral_peers.remove(&peer_id);
        let _ = event_tx.send(TopologyEvent::LateralPeerLost { peer_id });
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::beacon::MockBeaconStorage;
use std::sync::Arc;
#[tokio::test]
async fn test_topology_builder_creation() {
    // A freshly constructed builder tracks no peers at all.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let geohash = "9q8yy".to_string();
    let observer = Arc::new(BeaconObserver::new(beacon_store, geohash));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "test-node".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let snapshot = topology.get_state();
    assert!(snapshot.selected_peer.is_none());
    assert!(snapshot.linked_peers.is_empty());
}
#[tokio::test]
async fn test_subscribe_returns_receiver() {
    // The event receiver can be taken exactly once.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let geohash = "9q8yy".to_string();
    let observer = Arc::new(BeaconObserver::new(beacon_store, geohash));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "test-node".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let first = topology.subscribe();
    assert!(first.is_some());

    let second = topology.subscribe();
    assert!(second.is_none());
}
#[tokio::test]
async fn test_update_position() {
    // `update_position` overwrites the stored coordinates.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let geohash = "9q8yy".to_string();
    let observer = Arc::new(BeaconObserver::new(beacon_store, geohash));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "test-node".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    topology.update_position(GeoPosition::new(37.8000, -122.4000));

    let stored = *topology.position.lock().unwrap_or_else(|e| e.into_inner());
    assert_eq!(stored.lat, 37.8000);
    assert_eq!(stored.lon, -122.4000);
}
#[test]
fn test_linked_peer_tracking() {
    use crate::beacon::GeoPosition;

    // A Platform beacon is tracked by a Platoon node; a Platoon beacon is not.
    let mut linked_beacon = GeographicBeacon::new(
        "linked-peer".to_string(),
        GeoPosition::new(37.7750, -122.4195),
        HierarchyLevel::Platform,
    );
    linked_beacon.can_parent = false;

    let mut same_level_beacon = GeographicBeacon::new(
        "same-level".to_string(),
        GeoPosition::new(37.7751, -122.4196),
        HierarchyLevel::Platoon,
    );
    same_level_beacon.can_parent = true;

    let nearby_beacons = vec![linked_beacon, same_level_beacon];
    let mut state = TopologyState::default();
    let config = TopologyConfig::default();
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();

    TopologyBuilder::update_linked_peers(
        &mut state,
        &config,
        &nearby_beacons,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.linked_peers.len(), 1);
    assert!(state.linked_peers.contains_key("linked-peer"));

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::PeerAdded { linked_peer_id } = event {
        assert_eq!(linked_peer_id, "linked-peer");
    } else {
        panic!("Expected PeerAdded event");
    }
}
#[test]
fn test_linked_peer_expiry() {
    use std::time::Duration;

    // An entry last seen 200 ms ago expires under a 100 ms timeout.
    let config = TopologyConfig {
        peer_timeout: Duration::from_millis(100),
        ..Default::default()
    };
    let mut state = TopologyState::default();
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();

    let last_seen = Instant::now() - Duration::from_millis(200);
    state
        .linked_peers
        .insert("stale-peer".to_string(), last_seen);

    let nearby_beacons = Vec::new();
    TopologyBuilder::update_linked_peers(
        &mut state,
        &config,
        &nearby_beacons,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.linked_peers.len(), 0);

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::PeerRemoved { linked_peer_id } = event {
        assert_eq!(linked_peer_id, "stale-peer");
    } else {
        panic!("Expected PeerRemoved event");
    }
}
#[tokio::test]
async fn test_get_state_initial_values() {
    // Every field of the initial state holds its empty/default value.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let snapshot = topology.get_state();
    assert!(snapshot.selected_peer.is_none());
    assert!(snapshot.linked_peers.is_empty());
    assert!(snapshot.lateral_peers.is_empty());
    assert_eq!(snapshot.hierarchy_level, HierarchyLevel::Squad);
    assert_eq!(snapshot.role, crate::hierarchy::NodeRole::Standalone);
}
#[tokio::test]
async fn test_get_selected_peer_returns_none_initially() {
    // No peer is selected before any evaluation has run.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let selected = topology.get_selected_peer();
    assert!(selected.is_none());
}
#[tokio::test]
async fn test_config_returns_the_config() {
    // `config()` exposes exactly the values passed to `new`.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let custom = TopologyConfig {
        peer_change_cooldown: Duration::from_secs(120),
        peer_timeout: Duration::from_secs(300),
        max_retries: 5,
        ..Default::default()
    };
    let topology = TopologyBuilder::new(
        custom,
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let seen = topology.config();
    assert_eq!(seen.peer_change_cooldown, Duration::from_secs(120));
    assert_eq!(seen.peer_timeout, Duration::from_secs(300));
    assert_eq!(seen.max_retries, 5);
}
#[tokio::test]
async fn test_stop_aborts_task_handle() {
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    // Stopping before any start must be harmless.
    topology.stop().await;

    // After a start the handle is stored ...
    topology.start().await;
    let has_handle = topology
        .task_handle
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .is_some();
    assert!(has_handle);

    // ... and a stop removes it again.
    topology.stop().await;
    let handle_cleared = topology
        .task_handle
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .is_none();
    assert!(handle_cleared);
}
#[tokio::test]
async fn test_start_is_idempotent() {
    // Starting twice in a row leaves a task handle in place.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    topology.start().await;
    topology.start().await;

    let has_handle = topology
        .task_handle
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .is_some();
    assert!(has_handle);

    topology.stop().await;
}
#[tokio::test]
async fn test_reevaluate_peer_no_candidates() {
    // With an empty beacon store a reevaluation selects nothing.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let topology = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    topology.reevaluate_peer().await;

    assert!(topology.get_selected_peer().is_none());
}
#[tokio::test]
async fn test_reevaluate_peer_cooldown_not_elapsed() {
    // A manually selected peer survives a reevaluation under a one-hour cooldown.
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let long_cooldown = TopologyConfig {
        peer_change_cooldown: Duration::from_secs(3600),
        ..Default::default()
    };
    let topology = TopologyBuilder::new(
        long_cooldown,
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let peer_a = create_test_beacon("peer-A", HierarchyLevel::Platoon);
    {
        // Scope the guard so it is released before the await below.
        let mut guard = topology.state.lock().unwrap_or_else(|e| e.into_inner());
        TopologyBuilder::update_selected_peer(&mut guard, &topology.event_tx, peer_a);
    }
    assert_eq!(topology.get_selected_peer().unwrap().node_id, "peer-A");

    topology.reevaluate_peer().await;

    assert_eq!(topology.get_selected_peer().unwrap().node_id, "peer-A");
}
#[test]
fn test_check_peer_status_no_selected_peer() {
    // With nothing selected a peer is always needed.
    let (event_tx, _event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();
    let no_beacons: Vec<GeographicBeacon> = vec![];

    let needs_peer =
        TopologyBuilder::check_peer_status(&mut state, &config, &no_beacons, &event_tx);

    assert!(needs_peer);
}
#[test]
fn test_check_peer_status_peer_still_visible() {
    // A selected peer that is still visible is kept.
    let (event_tx, _event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let peer_a = create_test_beacon("peer-A", HierarchyLevel::Platoon);

    let mut state = TopologyState::default();
    state.selected_peer = Some(SelectedPeer {
        node_id: "peer-A".to_string(),
        beacon: peer_a.clone(),
        selected_at: Instant::now(),
    });
    let visible = vec![peer_a];

    let needs_peer = TopologyBuilder::check_peer_status(&mut state, &config, &visible, &event_tx);

    assert!(!needs_peer);
    assert!(state.selected_peer.is_some());
}
#[test]
fn test_check_peer_status_peer_lost_after_timeout() {
    // An invisible peer selected 200 ms ago is lost under a 50 ms timeout.
    let short_timeout = TopologyConfig {
        peer_timeout: Duration::from_millis(50),
        ..Default::default()
    };
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let peer_a = create_test_beacon("peer-A", HierarchyLevel::Platoon);

    let mut state = TopologyState::default();
    state.selected_peer = Some(SelectedPeer {
        node_id: "peer-A".to_string(),
        beacon: peer_a,
        selected_at: Instant::now() - Duration::from_millis(200),
    });
    let no_beacons: Vec<GeographicBeacon> = vec![];

    let needs_peer =
        TopologyBuilder::check_peer_status(&mut state, &short_timeout, &no_beacons, &event_tx);

    assert!(needs_peer);
    assert!(state.selected_peer.is_none());

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::PeerLost { lost_peer_id } = event {
        assert_eq!(lost_peer_id, "peer-A");
    } else {
        panic!("Expected PeerLost event");
    }
}
#[test]
fn test_check_peer_status_peer_not_visible_but_not_timed_out() {
    // An invisible peer is kept while it is still within the timeout.
    let long_timeout = TopologyConfig {
        peer_timeout: Duration::from_secs(3600),
        ..Default::default()
    };
    let (event_tx, _event_rx) = mpsc::unbounded_channel();
    let peer_a = create_test_beacon("peer-A", HierarchyLevel::Platoon);

    let mut state = TopologyState::default();
    state.selected_peer = Some(SelectedPeer {
        node_id: "peer-A".to_string(),
        beacon: peer_a,
        selected_at: Instant::now(),
    });
    let no_beacons: Vec<GeographicBeacon> = vec![];

    let needs_peer =
        TopologyBuilder::check_peer_status(&mut state, &long_timeout, &no_beacons, &event_tx);

    assert!(!needs_peer);
    assert!(state.selected_peer.is_some());
}
#[test]
fn test_update_selected_peer_first_selection() {
    // The very first selection is announced with `PeerSelected`.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let mut state = TopologyState::default();
    let peer_a = create_test_beacon("peer-A", HierarchyLevel::Platoon);

    TopologyBuilder::update_selected_peer(&mut state, &event_tx, peer_a);

    let stored = state.selected_peer.as_ref().unwrap();
    assert_eq!(stored.node_id, "peer-A");

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::PeerSelected {
        selected_peer_id,
        peer_beacon,
    } = event
    {
        assert_eq!(selected_peer_id, "peer-A");
        assert_eq!(peer_beacon.node_id, "peer-A");
    } else {
        panic!("Expected PeerSelected event");
    }
}
#[test]
fn test_update_selected_peer_change_event() {
    // Replacing an existing selection is announced with `PeerChanged`.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let mut state = TopologyState::default();

    let first = create_test_beacon("peer-A", HierarchyLevel::Platoon);
    TopologyBuilder::update_selected_peer(&mut state, &event_tx, first);
    // Discard the event produced by the initial selection.
    let _ = event_rx.try_recv();

    let second = create_test_beacon("peer-B", HierarchyLevel::Platoon);
    TopologyBuilder::update_selected_peer(&mut state, &event_tx, second);

    assert_eq!(state.selected_peer.as_ref().unwrap().node_id, "peer-B");

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::PeerChanged {
        old_peer_id,
        new_peer_id,
        new_peer_beacon,
    } = event
    {
        assert_eq!(old_peer_id, "peer-A");
        assert_eq!(new_peer_id, "peer-B");
        assert_eq!(new_peer_beacon.node_id, "peer-B");
    } else {
        panic!("Expected PeerChanged event");
    }
}
#[test]
fn test_update_lateral_peers_discovery() {
    // Only the beacon at the node's own level is tracked as lateral.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let lateral_beacon = create_test_beacon("lateral-1", HierarchyLevel::Platoon);
    let different_level_beacon = create_test_beacon("lower-1", HierarchyLevel::Squad);
    let nearby = vec![lateral_beacon.clone(), different_level_beacon];

    TopologyBuilder::update_lateral_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.lateral_peers.len(), 1);
    assert!(state.lateral_peers.contains_key("lateral-1"));

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::LateralPeerDiscovered {
        peer_id,
        peer_beacon,
    } = event
    {
        assert_eq!(peer_id, "lateral-1");
        assert_eq!(peer_beacon.node_id, "lateral-1");
    } else {
        panic!("Expected LateralPeerDiscovered event");
    }
    // Exactly one event was produced.
    assert!(event_rx.try_recv().is_err());
}
#[test]
fn test_update_lateral_peers_refresh_existing() {
    // Seeing a known lateral peer refreshes its timestamp without an event.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let previously_seen = Instant::now() - Duration::from_secs(60);
    state
        .lateral_peers
        .insert("lateral-1".to_string(), previously_seen);

    let nearby = vec![create_test_beacon("lateral-1", HierarchyLevel::Platoon)];
    TopologyBuilder::update_lateral_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.lateral_peers.len(), 1);
    let refreshed = state.lateral_peers.get("lateral-1").unwrap();
    assert!(*refreshed > previously_seen);
    assert!(event_rx.try_recv().is_err());
}
#[test]
fn test_update_lateral_peers_expiry() {
    // A lateral peer last seen 200 ms ago expires under a 50 ms timeout.
    let short_timeout = TopologyConfig {
        peer_timeout: Duration::from_millis(50),
        ..Default::default()
    };
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let mut state = TopologyState::default();

    let last_seen = Instant::now() - Duration::from_millis(200);
    state
        .lateral_peers
        .insert("stale-lateral".to_string(), last_seen);

    let no_beacons: Vec<GeographicBeacon> = vec![];
    TopologyBuilder::update_lateral_peers(
        &mut state,
        &short_timeout,
        &no_beacons,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert!(state.lateral_peers.is_empty());

    let event = event_rx.try_recv().unwrap();
    if let TopologyEvent::LateralPeerLost { peer_id } = event {
        assert_eq!(peer_id, "stale-lateral");
    } else {
        panic!("Expected LateralPeerLost event");
    }
}
#[test]
fn test_update_lateral_peers_not_expired_within_timeout() {
    // An invisible but recent lateral peer is kept under a one-hour timeout.
    let long_timeout = TopologyConfig {
        peer_timeout: Duration::from_secs(3600),
        ..Default::default()
    };
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let mut state = TopologyState::default();
    state
        .lateral_peers
        .insert("recent-lateral".to_string(), Instant::now());

    let no_beacons: Vec<GeographicBeacon> = vec![];
    TopologyBuilder::update_lateral_peers(
        &mut state,
        &long_timeout,
        &no_beacons,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.lateral_peers.len(), 1);
    assert!(event_rx.try_recv().is_err());
}
#[test]
fn test_topology_config_default_values() {
    // Pins every default of `TopologyConfig`.
    let defaults = TopologyConfig::default();

    // Timing.
    assert_eq!(defaults.reevaluation_interval, Some(Duration::from_secs(30)));
    assert_eq!(defaults.peer_change_cooldown, Duration::from_secs(60));
    assert_eq!(defaults.peer_timeout, Duration::from_secs(180));

    // Hierarchy and lateral limits.
    assert!(defaults.hierarchy_strategy.is_none());
    assert_eq!(defaults.max_lateral_connections, Some(10));

    // Retry and backoff.
    assert_eq!(defaults.max_retries, 3);
    assert_eq!(defaults.initial_backoff, Duration::from_secs(1));
    assert_eq!(defaults.max_backoff, Duration::from_secs(60));
    assert_eq!(defaults.backoff_multiplier, 2.0);

    // Telemetry and optional collaborators.
    assert_eq!(defaults.max_telemetry_buffer_size, 100);
    assert!(defaults.metrics_collector.is_none());
    assert!(defaults.partition_detector.is_none());
}
#[test]
fn test_topology_state_default_values() {
    // Pins every default of `TopologyState`.
    let defaults = TopologyState::default();

    assert!(defaults.selected_peer.is_none());
    assert!(defaults.linked_peers.is_empty());
    assert!(defaults.lateral_peers.is_empty());
    assert_eq!(defaults.role, crate::hierarchy::NodeRole::Standalone);
    assert_eq!(defaults.hierarchy_level, HierarchyLevel::Squad);
}
#[test]
fn test_topology_event_enum_variants() {
    // Every variant can be constructed, debug-formatted and cloned.
    let sample = create_test_beacon("peer-1", HierarchyLevel::Platoon);
    let events: Vec<TopologyEvent> = vec![
        TopologyEvent::PeerSelected {
            selected_peer_id: "peer-1".to_string(),
            peer_beacon: sample.clone(),
        },
        TopologyEvent::PeerChanged {
            old_peer_id: "peer-old".to_string(),
            new_peer_id: "peer-new".to_string(),
            new_peer_beacon: sample.clone(),
        },
        TopologyEvent::PeerLost {
            lost_peer_id: "peer-lost".to_string(),
        },
        TopologyEvent::PeerAdded {
            linked_peer_id: "linked-1".to_string(),
        },
        TopologyEvent::PeerRemoved {
            linked_peer_id: "linked-1".to_string(),
        },
        TopologyEvent::LateralPeerDiscovered {
            peer_id: "lateral-1".to_string(),
            peer_beacon: sample.clone(),
        },
        TopologyEvent::LateralPeerLost {
            peer_id: "lateral-1".to_string(),
        },
        TopologyEvent::RoleChanged {
            old_role: crate::hierarchy::NodeRole::Standalone,
            new_role: crate::hierarchy::NodeRole::Leader,
        },
        TopologyEvent::LevelChanged {
            old_level: HierarchyLevel::Squad,
            new_level: HierarchyLevel::Platoon,
        },
    ];

    for event in events.iter() {
        let rendered = format!("{:?}", event);
        assert!(!rendered.is_empty());
        let _copy = event.clone();
    }
    assert_eq!(events.len(), 9);
}
#[test]
fn test_selected_peer_struct() {
    // Field access, `Clone` and `Debug` all work on `SelectedPeer`.
    let sample = create_test_beacon("peer-1", HierarchyLevel::Platoon);
    let picked_at = Instant::now();
    let selected = SelectedPeer {
        node_id: "peer-1".to_string(),
        beacon: sample.clone(),
        selected_at: picked_at,
    };

    assert_eq!(selected.node_id, "peer-1");
    assert_eq!(selected.beacon.node_id, "peer-1");
    assert!(selected.selected_at.elapsed() < Duration::from_secs(1));

    let copy = selected.clone();
    assert_eq!(copy.node_id, "peer-1");

    let rendered = format!("{:?}", selected);
    assert!(rendered.contains("peer-1"));
}
#[test]
fn test_linked_peer_refresh_existing() {
    // Seeing a known linked peer refreshes its timestamp without an event.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let previously_seen = Instant::now() - Duration::from_secs(60);
    state
        .linked_peers
        .insert("linked-peer".to_string(), previously_seen);

    let mut platform_beacon = GeographicBeacon::new(
        "linked-peer".to_string(),
        GeoPosition::new(37.7750, -122.4195),
        HierarchyLevel::Platform,
    );
    platform_beacon.can_parent = false;
    let nearby = vec![platform_beacon];

    TopologyBuilder::update_linked_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.linked_peers.len(), 1);
    let refreshed = state.linked_peers.get("linked-peer").unwrap();
    assert!(*refreshed > previously_seen);
    assert!(event_rx.try_recv().is_err());
}
#[test]
fn test_linked_peer_not_expired_within_timeout() {
    // An invisible but recent linked peer is kept under a one-hour timeout.
    let long_timeout = TopologyConfig {
        peer_timeout: Duration::from_secs(3600),
        ..Default::default()
    };
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let mut state = TopologyState::default();
    state
        .linked_peers
        .insert("recent-linked".to_string(), Instant::now());

    let no_beacons: Vec<GeographicBeacon> = vec![];
    TopologyBuilder::update_linked_peers(
        &mut state,
        &long_timeout,
        &no_beacons,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.linked_peers.len(), 1);
    assert!(event_rx.try_recv().is_err());
}
#[test]
fn test_linked_peers_higher_level_not_tracked() {
    // A Company beacon is not tracked as linked by a Platoon node.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let mut company_beacon = GeographicBeacon::new(
        "higher-peer".to_string(),
        GeoPosition::new(37.7750, -122.4195),
        HierarchyLevel::Company,
    );
    company_beacon.can_parent = true;
    let nearby = vec![company_beacon];

    TopologyBuilder::update_linked_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert!(state.linked_peers.is_empty());
    assert!(event_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_clone_topology_builder() {
    let beacon_store = Arc::new(MockBeaconStorage::new());
    let observer = Arc::new(BeaconObserver::new(beacon_store, "9q8yy".to_string()));
    let original = TopologyBuilder::new(
        TopologyConfig::default(),
        "node-1".to_string(),
        GeoPosition::new(37.7749, -122.4194),
        HierarchyLevel::Squad,
        None,
        observer,
    );

    let duplicate = original.clone();

    // The clone sees the same (empty) state ...
    assert!(duplicate.get_selected_peer().is_none());
    assert!(duplicate.get_state().linked_peers.is_empty());
    // ... but never owns the event receiver.
    assert!(duplicate.subscribe().is_none());

    // The receiver is still available on the original.
    let receiver = original.subscribe();
    assert!(receiver.is_some());
}
#[test]
fn test_update_lateral_peers_multiple_peers() {
    // Two same-level beacons yield two tracked peers and two discovery events.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let nearby = vec![
        create_test_beacon("lateral-A", HierarchyLevel::Platoon),
        create_test_beacon("lateral-B", HierarchyLevel::Platoon),
    ];
    TopologyBuilder::update_lateral_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Platoon,
        &event_tx,
    );

    assert_eq!(state.lateral_peers.len(), 2);
    assert!(state.lateral_peers.contains_key("lateral-A"));
    assert!(state.lateral_peers.contains_key("lateral-B"));

    let mut discovered_ids: Vec<String> = Vec::new();
    while let Ok(event) = event_rx.try_recv() {
        if let TopologyEvent::LateralPeerDiscovered { peer_id, .. } = event {
            discovered_ids.push(peer_id);
        } else {
            panic!("Expected LateralPeerDiscovered events");
        }
    }
    // Sorted so the comparison does not depend on event order.
    discovered_ids.sort();
    assert_eq!(discovered_ids, vec!["lateral-A", "lateral-B"]);
}
#[test]
fn test_update_linked_peers_multiple_additions() {
    // A Company node tracks both a Platform and a Squad beacon as linked.
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let config = TopologyConfig::default();
    let mut state = TopologyState::default();

    let mut platform_beacon = GeographicBeacon::new(
        "linked-A".to_string(),
        GeoPosition::new(37.7750, -122.4195),
        HierarchyLevel::Platform,
    );
    platform_beacon.can_parent = false;

    let mut squad_beacon = GeographicBeacon::new(
        "linked-B".to_string(),
        GeoPosition::new(37.7751, -122.4196),
        HierarchyLevel::Squad,
    );
    squad_beacon.can_parent = false;

    let nearby = vec![platform_beacon, squad_beacon];
    TopologyBuilder::update_linked_peers(
        &mut state,
        &config,
        &nearby,
        HierarchyLevel::Company,
        &event_tx,
    );

    assert_eq!(state.linked_peers.len(), 2);

    let mut added_ids: Vec<String> = Vec::new();
    while let Ok(event) = event_rx.try_recv() {
        if let TopologyEvent::PeerAdded { linked_peer_id } = event {
            added_ids.push(linked_peer_id);
        } else {
            panic!("Expected PeerAdded events");
        }
    }
    // Sorted so the comparison does not depend on event order.
    added_ids.sort();
    assert_eq!(added_ids, vec!["linked-A", "linked-B"]);
}
/// Builds a beacon for tests at a fixed position with the given id and
/// level, flagged as able to parent and with a parent priority of 100.
fn create_test_beacon(node_id: &str, level: HierarchyLevel) -> GeographicBeacon {
    let fixed_position = GeoPosition::new(37.7750, -122.4195);
    let mut test_beacon = GeographicBeacon::new(node_id.to_string(), fixed_position, level);
    test_beacon.can_parent = true;
    test_beacon.parent_priority = 100;
    test_beacon
}
}