use super::aggregator::{Aggregator, NoOpAggregator};
use super::packet::DataPacket;
use super::router::{DeduplicationConfig, RoutingDecision, SelectiveRouter};
use crate::topology::TopologyState;
use std::sync::Arc;
use tracing::{debug, info};
/// Settings that control how a `MeshRouter` routes and batches packets.
#[derive(Debug, Clone)]
pub struct MeshRouterConfig {
/// Identifier of the local node; handed to the router and the aggregator on every call.
pub node_id: String,
/// Deduplication settings; the router is built with deduplication only when `enabled` is set.
pub deduplication: DeduplicationConfig,
/// When `false`, `MeshRouter::add_for_aggregation` returns `None` without buffering the packet.
pub auto_aggregate: bool,
/// Number of buffered packets at which `MeshRouter::add_for_aggregation` aggregates the batch.
pub aggregation_threshold: usize,
/// Enables the `info!` log line emitted after a successful threshold-triggered aggregation.
pub verbose: bool,
}
impl Default for MeshRouterConfig {
fn default() -> Self {
Self {
node_id: String::new(),
deduplication: DeduplicationConfig::default(),
auto_aggregate: true,
aggregation_threshold: 3,
verbose: false,
}
}
}
impl MeshRouterConfig {
pub fn with_node_id(node_id: impl Into<String>) -> Self {
Self {
node_id: node_id.into(),
..Default::default()
}
}
}
/// Outcome of `MeshRouter::route` for a single packet.
#[derive(Debug)]
pub struct RouteResult {
/// Decision returned by the underlying `SelectiveRouter`.
pub decision: RoutingDecision,
/// Value reported by `SelectiveRouter::should_aggregate` for this packet and decision.
pub should_aggregate: bool,
/// Next hops extracted from `decision`; empty when the decision carries no next hop.
pub forward_to: Vec<String>,
}
/// Combines a `SelectiveRouter` with an `Aggregator` and a buffer of packets
/// waiting to be aggregated. Uses `NoOpAggregator` unless another aggregator is supplied.
pub struct MeshRouter<A: Aggregator = NoOpAggregator> {
/// Settings supplied at construction time.
config: MeshRouterConfig,
/// Underlying router that produces the per-packet routing decision.
router: SelectiveRouter,
/// Strategy used to combine buffered packets into a single packet.
aggregator: A,
/// Packets buffered by `add_for_aggregation` until they are aggregated or flushed.
pending_aggregation: Arc<std::sync::RwLock<Vec<DataPacket>>>,
}
impl MeshRouter<NoOpAggregator> {
    /// Builds a router from `config`, using [`NoOpAggregator`] as the aggregator.
    pub fn new(config: MeshRouterConfig) -> Self {
        let aggregator = NoOpAggregator;
        Self::with_aggregator(config, aggregator)
    }

    /// Builds a router for `node_id` with every other setting left at its default.
    pub fn with_node_id(node_id: impl Into<String>) -> Self {
        let config = MeshRouterConfig::with_node_id(node_id);
        Self::new(config)
    }
}
impl<A: Aggregator> MeshRouter<A> {
    /// Builds a router that uses `aggregator` to combine buffered packets.
    ///
    /// The inner [`SelectiveRouter`] is created with deduplication only when
    /// `config.deduplication.enabled` is set; otherwise a plain router is used.
    pub fn with_aggregator(config: MeshRouterConfig, aggregator: A) -> Self {
        let router = if config.deduplication.enabled {
            SelectiveRouter::new_with_deduplication(config.deduplication.clone())
        } else {
            SelectiveRouter::new()
        };
        Self {
            config,
            router,
            aggregator,
            pending_aggregation: Arc::new(std::sync::RwLock::new(Vec::new())),
        }
    }

    /// Routes `packet` for the local node given the current topology `state`.
    ///
    /// Returns the router's decision, whether the router considers the packet
    /// a candidate for aggregation, and the next hops named by the decision.
    /// `forward_to` is empty for decisions that carry no next hop.
    pub fn route(&self, packet: &DataPacket, state: &TopologyState) -> RouteResult {
        let decision = self.router.route(packet, state, &self.config.node_id);
        let should_aggregate = self.router.should_aggregate(packet, &decision, state);
        let forward_to = match &decision {
            RoutingDecision::Forward { next_hop }
            | RoutingDecision::ConsumeAndForward { next_hop } => vec![next_hop.clone()],
            RoutingDecision::ForwardMulticast { next_hops }
            | RoutingDecision::ConsumeAndForwardMulticast { next_hops } => next_hops.clone(),
            _ => Vec::new(),
        };
        RouteResult {
            decision,
            should_aggregate,
            forward_to,
        }
    }

    /// Buffers `packet` and aggregates the buffer once it reaches the
    /// configured threshold.
    ///
    /// Returns `None` when automatic aggregation is disabled (the packet is
    /// not buffered), when the buffer is still below the threshold, or when
    /// the aggregator reports an error. On error the drained batch is not
    /// put back into the buffer.
    pub fn add_for_aggregation(&self, packet: DataPacket, squad_id: &str) -> Option<DataPacket> {
        if !self.config.auto_aggregate {
            return None;
        }

        // Keep the write lock only for the buffer update; the guard is
        // dropped at the end of this block so the aggregator runs unlocked.
        let packets = {
            let mut pending = self
                .pending_aggregation
                .write()
                // A poisoned lock still holds a usable buffer; keep going.
                .unwrap_or_else(|e| e.into_inner());
            pending.push(packet);
            if pending.len() < self.config.aggregation_threshold {
                return None;
            }
            std::mem::take(&mut *pending)
        };

        // Captured before the batch is moved into the aggregator so the log
        // reports what was actually aggregated, not the configured threshold.
        let packet_count = packets.len();
        match self
            .aggregator
            .aggregate_telemetry(squad_id, &self.config.node_id, packets)
        {
            Ok(aggregated) => {
                if self.config.verbose {
                    info!(
                        "Aggregated {} packets into squad summary for {}",
                        packet_count, squad_id
                    );
                }
                Some(aggregated)
            }
            Err(e) => {
                debug!("Aggregation failed: {}", e);
                None
            }
        }
    }

    /// Returns the number of packets currently buffered for aggregation.
    pub fn pending_aggregation_count(&self) -> usize {
        self.pending_aggregation
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .len()
    }

    /// Aggregates whatever is buffered, regardless of the threshold.
    ///
    /// Returns `None` when the buffer is empty or when the aggregator reports
    /// an error. The buffer is emptied in both the success and error cases.
    pub fn flush_aggregation(&self, squad_id: &str) -> Option<DataPacket> {
        // Take the batch out under the lock, then release it before aggregating.
        let packets = {
            let mut pending = self
                .pending_aggregation
                .write()
                .unwrap_or_else(|e| e.into_inner());
            std::mem::take(&mut *pending)
        };
        if packets.is_empty() {
            return None;
        }

        match self
            .aggregator
            .aggregate_telemetry(squad_id, &self.config.node_id, packets)
        {
            Ok(aggregated) => Some(aggregated),
            Err(e) => {
                debug!("Flush aggregation failed: {}", e);
                None
            }
        }
    }

    /// Returns the underlying selective router.
    pub fn router(&self) -> &SelectiveRouter {
        &self.router
    }

    /// Returns the aggregator used to combine buffered packets.
    pub fn aggregator(&self) -> &A {
        &self.aggregator
    }

    /// Returns the identifier of the local node.
    pub fn node_id(&self) -> &str {
        &self.config.node_id
    }

    /// Returns the size of the router's deduplication cache.
    pub fn dedup_cache_size(&self) -> usize {
        self.router.dedup_cache_size()
    }

    /// Clears the router's deduplication cache.
    pub fn clear_dedup_cache(&self) {
        self.router.clear_dedup_cache();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::beacon::{GeoPosition, GeographicBeacon, HierarchyLevel};
    use crate::hierarchy::NodeRole;
    use crate::topology::SelectedPeer;
    use std::collections::HashMap;
    use std::time::Instant;

    /// Builds a squad-level member topology whose selected peer is
    /// `parent-node` and which has no linked or lateral peers.
    fn create_test_state() -> TopologyState {
        TopologyState {
            selected_peer: Some(SelectedPeer {
                node_id: "parent-node".to_string(),
                beacon: GeographicBeacon::new(
                    "parent-node".to_string(),
                    GeoPosition::new(37.7749, -122.4194),
                    HierarchyLevel::Platoon,
                ),
                selected_at: Instant::now(),
            }),
            linked_peers: HashMap::new(),
            lateral_peers: HashMap::new(),
            role: NodeRole::Member,
            hierarchy_level: HierarchyLevel::Squad,
        }
    }

    /// Asserts that `forward_to` holds exactly the next hops named by the
    /// decision, and is empty for decisions that carry no next hop.
    fn assert_forward_to_matches_decision(result: &RouteResult) {
        match &result.decision {
            RoutingDecision::Forward { next_hop }
            | RoutingDecision::ConsumeAndForward { next_hop } => {
                assert_eq!(result.forward_to, vec![next_hop.clone()]);
            }
            RoutingDecision::ForwardMulticast { next_hops }
            | RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
                assert_eq!(&result.forward_to, next_hops);
            }
            _ => assert!(result.forward_to.is_empty()),
        }
    }

    #[test]
    fn test_mesh_router_creation() {
        let router = MeshRouter::with_node_id("test-node");
        assert_eq!(router.node_id(), "test-node");
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_routing() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
        let result = router.route(&packet, &state);
        assert!(!result.forward_to.is_empty());
        assert_eq!(result.forward_to[0], "parent-node");
    }

    #[test]
    fn test_mesh_router_deduplication() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        // First sighting is routed and recorded in the dedup cache.
        let result1 = router.route(&packet, &state);
        assert!(!matches!(result1.decision, RoutingDecision::Drop));
        assert_eq!(router.dedup_cache_size(), 1);

        // Second sighting of the same packet is dropped.
        let result2 = router.route(&packet, &state);
        assert_eq!(result2.decision, RoutingDecision::Drop);
    }

    #[test]
    fn test_mesh_router_aggregation_disabled() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: false,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
        let result = router.add_for_aggregation(packet, "squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_aggregation_below_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 5,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        for i in 0..4 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }
        assert_eq!(router.pending_aggregation_count(), 4);
    }

    #[test]
    fn test_mesh_router_aggregation_at_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 3,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        for i in 0..2 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }

        // The third packet reaches the threshold. With the default aggregator
        // no packet is expected back, but the buffer must still be emptied.
        let packet = DataPacket::telemetry("sensor-2", vec![2]);
        let result = router.add_for_aggregation(packet, "squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_flush_aggregation_empty() {
        let router = MeshRouter::with_node_id("test-node");
        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
    }

    #[test]
    fn test_mesh_router_flush_aggregation_with_pending() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 10,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
        router.add_for_aggregation(packet, "squad-1");
        assert_eq!(router.pending_aggregation_count(), 1);

        // Flushing empties the buffer even though the threshold was not reached.
        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_clear_dedup_cache() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
        router.route(&packet, &state);
        assert_eq!(router.dedup_cache_size(), 1);
        router.clear_dedup_cache();
        assert_eq!(router.dedup_cache_size(), 0);
    }

    #[test]
    fn test_mesh_router_accessors() {
        let config = MeshRouterConfig {
            node_id: "my-node".to_string(),
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        assert_eq!(router.node_id(), "my-node");
        let _router_ref = router.router();
        let _agg_ref = router.aggregator();
    }

    #[test]
    fn test_mesh_router_config_with_node_id() {
        let config = MeshRouterConfig::with_node_id("node-abc");
        assert_eq!(config.node_id, "node-abc");
        assert!(config.auto_aggregate);
        assert_eq!(config.aggregation_threshold, 3);
    }

    #[test]
    fn test_mesh_router_route_own_telemetry() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("test-node", vec![1, 2, 3]);
        let result = router.route(&packet, &state);
        // The decision must be printable for diagnostics.
        let _ = format!("{:?}", result.decision);
        // Whatever the router decides, the reported hops must agree with it.
        assert_forward_to_matches_decision(&result);
    }

    #[test]
    fn test_mesh_router_route_command_packet() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::command("hq", "test-node", vec![1, 2, 3]);
        let result = router.route(&packet, &state);
        // This test does not pin which decision the router makes for a
        // command; it checks that the reported hops agree with that decision.
        assert_forward_to_matches_decision(&result);
    }
}