Skip to main content

peat_mesh/routing/
mesh_router.rs

1//! MeshRouter facade combining routing, aggregation, and transport
2//!
3//! This module provides a unified interface for mesh data routing that combines:
4//! - SelectiveRouter for routing decisions
5//! - Aggregator for hierarchical telemetry summarization
6//! - MeshTransport for packet delivery
7//! - Message deduplication for loop prevention
8//!
9//! # Architecture
10//!
11//! The MeshRouter acts as a facade over the existing components, providing a
12//! simple API for sending and receiving data through the mesh hierarchy.
13//!
14//! ```text
15//! ┌─────────────────────────────────────────────────────────────┐
16//! │                       MeshRouter                            │
17//! │  ┌─────────────────┐  ┌─────────────────┐                  │
//! │  │ SelectiveRouter │  │ Aggregator      │                  │
19//! │  │  (decisions)    │  │  (aggregation)  │                  │
20//! │  └────────┬────────┘  └────────┬────────┘                  │
21//! │           │                    │                           │
22//! │           └────────────────────┘                           │
23//! │                       │                                    │
24//! │              ┌────────▼────────┐                           │
25//! │              │  TopologyState  │                           │
26//! │              │  (mesh state)   │                           │
27//! │              └────────┬────────┘                           │
28//! │                       │                                    │
29//! │              ┌────────▼────────┐                           │
30//! │              │  MeshTransport  │                           │
31//! │              │  (delivery)     │                           │
32//! │              └─────────────────┘                           │
33//! └─────────────────────────────────────────────────────────────┘
34//! ```
35//!
36//! # Example
37//!
38//! ```ignore
39//! use peat_mesh::routing::{MeshRouter, MeshRouterConfig, DataPacket};
40//! use peat_mesh::topology::TopologyState;
41//!
//! // Create a mesh router for this node (no aggregation)
//! let router = MeshRouter::new(MeshRouterConfig::with_node_id("my-node-id"));
//!
//! // Build a telemetry packet and ask the router where it should go
//! let packet = DataPacket::telemetry("my-node-id", telemetry_bytes);
//! let result = router.route(&packet, &topology_state);
//!
//! // Deliver the packet to each next hop chosen by the routing decision
//! for next_hop in &result.forward_to {
//!     // hand `packet` to the transport for delivery to `next_hop`
//! }
54//! ```
55
56use super::aggregator::{Aggregator, NoOpAggregator};
57use super::packet::DataPacket;
58use super::router::{DeduplicationConfig, RoutingDecision, SelectiveRouter};
59use crate::topology::TopologyState;
60use std::sync::Arc;
61use tracing::{debug, info};
62
63/// Configuration for MeshRouter
64#[derive(Debug, Clone)]
65pub struct MeshRouterConfig {
66    /// This node's ID
67    pub node_id: String,
68    /// Deduplication configuration
69    pub deduplication: DeduplicationConfig,
70    /// Whether to enable automatic aggregation
71    pub auto_aggregate: bool,
72    /// Minimum packets before triggering aggregation
73    pub aggregation_threshold: usize,
74    /// Enable verbose logging
75    pub verbose: bool,
76}
77
78impl Default for MeshRouterConfig {
79    fn default() -> Self {
80        Self {
81            node_id: String::new(),
82            deduplication: DeduplicationConfig::default(),
83            auto_aggregate: true,
84            aggregation_threshold: 3,
85            verbose: false,
86        }
87    }
88}
89
90impl MeshRouterConfig {
91    /// Create configuration with a specific node ID
92    pub fn with_node_id(node_id: impl Into<String>) -> Self {
93        Self {
94            node_id: node_id.into(),
95            ..Default::default()
96        }
97    }
98}
99
100/// Result of processing an incoming packet
101#[derive(Debug)]
102pub struct RouteResult {
103    /// The routing decision made
104    pub decision: RoutingDecision,
105    /// Whether the packet should be aggregated
106    pub should_aggregate: bool,
107    /// Next hop(s) for forwarding (if any)
108    pub forward_to: Vec<String>,
109}
110
111/// Unified mesh router combining routing, aggregation, and transport
112///
113/// MeshRouter provides a high-level API for routing data through the mesh
114/// hierarchy while handling deduplication and aggregation automatically.
115///
116/// The type parameter `A` determines which aggregation strategy is used.
117/// Use [`NoOpAggregator`] (the default) when aggregation is not needed.
118pub struct MeshRouter<A: Aggregator = NoOpAggregator> {
119    /// Configuration
120    config: MeshRouterConfig,
121    /// Selective router for routing decisions
122    router: SelectiveRouter,
123    /// Packet aggregator for telemetry summarization
124    aggregator: A,
125    /// Pending telemetry packets for aggregation (squad_id -> packets)
126    pending_aggregation: Arc<std::sync::RwLock<Vec<DataPacket>>>,
127}
128
129impl MeshRouter<NoOpAggregator> {
130    /// Create a new mesh router with no aggregation
131    pub fn new(config: MeshRouterConfig) -> Self {
132        Self::with_aggregator(config, NoOpAggregator)
133    }
134
135    /// Create with default configuration and node ID (no aggregation)
136    pub fn with_node_id(node_id: impl Into<String>) -> Self {
137        Self::new(MeshRouterConfig::with_node_id(node_id))
138    }
139}
140
141impl<A: Aggregator> MeshRouter<A> {
142    /// Create a new mesh router with a specific aggregator
143    pub fn with_aggregator(config: MeshRouterConfig, aggregator: A) -> Self {
144        let router = if config.deduplication.enabled {
145            SelectiveRouter::new_with_deduplication(config.deduplication.clone())
146        } else {
147            SelectiveRouter::new()
148        };
149
150        Self {
151            config,
152            router,
153            aggregator,
154            pending_aggregation: Arc::new(std::sync::RwLock::new(Vec::new())),
155        }
156    }
157
158    /// Route an incoming packet and determine what to do with it
159    ///
160    /// This is the main entry point for processing received packets.
161    /// It handles deduplication, routing decisions, and aggregation checks.
162    ///
163    /// # Arguments
164    ///
165    /// * `packet` - The incoming data packet
166    /// * `state` - Current topology state
167    ///
168    /// # Returns
169    ///
170    /// RouteResult containing the routing decision and forwarding targets
171    pub fn route(&self, packet: &DataPacket, state: &TopologyState) -> RouteResult {
172        let decision = self.router.route(packet, state, &self.config.node_id);
173
174        let should_aggregate = self.router.should_aggregate(packet, &decision, state);
175
176        let forward_to = match &decision {
177            RoutingDecision::Forward { next_hop } => vec![next_hop.clone()],
178            RoutingDecision::ConsumeAndForward { next_hop } => vec![next_hop.clone()],
179            RoutingDecision::ForwardMulticast { next_hops } => next_hops.clone(),
180            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => next_hops.clone(),
181            _ => vec![],
182        };
183
184        RouteResult {
185            decision,
186            should_aggregate,
187            forward_to,
188        }
189    }
190
191    /// Add a telemetry packet to the aggregation buffer
192    ///
193    /// If aggregation threshold is reached, returns aggregated packet.
194    /// Otherwise returns None.
195    pub fn add_for_aggregation(&self, packet: DataPacket, squad_id: &str) -> Option<DataPacket> {
196        if !self.config.auto_aggregate {
197            return None;
198        }
199
200        let mut pending = self.pending_aggregation.write().unwrap();
201        pending.push(packet);
202
203        if pending.len() >= self.config.aggregation_threshold {
204            // Aggregate and return
205            let packets: Vec<DataPacket> = pending.drain(..).collect();
206            match self
207                .aggregator
208                .aggregate_telemetry(squad_id, &self.config.node_id, packets)
209            {
210                Ok(aggregated) => {
211                    if self.config.verbose {
212                        info!(
213                            "Aggregated {} packets into squad summary for {}",
214                            self.config.aggregation_threshold, squad_id
215                        );
216                    }
217                    Some(aggregated)
218                }
219                Err(e) => {
220                    debug!("Aggregation failed: {}", e);
221                    None
222                }
223            }
224        } else {
225            None
226        }
227    }
228
229    /// Get the number of packets pending aggregation
230    pub fn pending_aggregation_count(&self) -> usize {
231        self.pending_aggregation.read().unwrap().len()
232    }
233
234    /// Force aggregation of pending packets (even if below threshold)
235    pub fn flush_aggregation(&self, squad_id: &str) -> Option<DataPacket> {
236        let mut pending = self.pending_aggregation.write().unwrap();
237        if pending.is_empty() {
238            return None;
239        }
240
241        let packets: Vec<DataPacket> = pending.drain(..).collect();
242        match self
243            .aggregator
244            .aggregate_telemetry(squad_id, &self.config.node_id, packets)
245        {
246            Ok(aggregated) => Some(aggregated),
247            Err(e) => {
248                debug!("Flush aggregation failed: {}", e);
249                None
250            }
251        }
252    }
253
254    /// Get the underlying router for advanced operations
255    pub fn router(&self) -> &SelectiveRouter {
256        &self.router
257    }
258
259    /// Get the underlying aggregator for advanced operations
260    pub fn aggregator(&self) -> &A {
261        &self.aggregator
262    }
263
264    /// Get the node ID this router is configured for
265    pub fn node_id(&self) -> &str {
266        &self.config.node_id
267    }
268
269    /// Get deduplication cache size
270    pub fn dedup_cache_size(&self) -> usize {
271        self.router.dedup_cache_size()
272    }
273
274    /// Clear deduplication cache
275    pub fn clear_dedup_cache(&self) {
276        self.router.clear_dedup_cache();
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::beacon::{GeoPosition, GeographicBeacon, HierarchyLevel};
    use crate::hierarchy::NodeRole;
    use crate::topology::SelectedPeer;
    use std::collections::HashMap;
    use std::time::Instant;

    /// Topology of a squad member whose selected parent is "parent-node".
    fn create_test_state() -> TopologyState {
        TopologyState {
            selected_peer: Some(SelectedPeer {
                node_id: "parent-node".to_string(),
                beacon: GeographicBeacon::new(
                    "parent-node".to_string(),
                    GeoPosition::new(37.7749, -122.4194),
                    HierarchyLevel::Platoon,
                ),
                selected_at: Instant::now(),
            }),
            linked_peers: HashMap::new(),
            lateral_peers: HashMap::new(),
            role: NodeRole::Member,
            hierarchy_level: HierarchyLevel::Squad,
        }
    }

    /// Check that `forward_to` mirrors the next hop(s) carried by the decision.
    ///
    /// This holds for every decision the underlying router can return, so it
    /// is usable in tests that do not want to pin the exact routing policy.
    fn assert_forward_targets_match(result: &RouteResult) {
        match &result.decision {
            RoutingDecision::Forward { next_hop }
            | RoutingDecision::ConsumeAndForward { next_hop } => {
                assert_eq!(result.forward_to, vec![next_hop.clone()]);
            }
            RoutingDecision::ForwardMulticast { next_hops }
            | RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
                assert_eq!(&result.forward_to, next_hops);
            }
            _ => assert!(result.forward_to.is_empty()),
        }
    }

    #[test]
    fn test_mesh_router_creation() {
        let router = MeshRouter::with_node_id("test-node");
        assert_eq!(router.node_id(), "test-node");
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_routing() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);

        // Should consume and forward upward
        assert!(!result.forward_to.is_empty());
        assert_eq!(result.forward_to[0], "parent-node");
    }

    #[test]
    fn test_mesh_router_deduplication() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        // First route should succeed
        let result1 = router.route(&packet, &state);
        assert!(!matches!(result1.decision, RoutingDecision::Drop));
        assert_eq!(router.dedup_cache_size(), 1);

        // Second route of same packet should be dropped (duplicate)
        let result2 = router.route(&packet, &state);
        assert_eq!(result2.decision, RoutingDecision::Drop);
    }

    #[test]
    fn test_mesh_router_aggregation_disabled() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: false,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);

        let result = router.add_for_aggregation(packet, "squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_aggregation_below_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 5,
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        // Add packets below threshold
        for i in 0..4 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }
        assert_eq!(router.pending_aggregation_count(), 4);
    }

    #[test]
    fn test_mesh_router_aggregation_at_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 3,
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        // Add packets up to threshold
        for i in 0..2 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }

        // The third packet triggers aggregation attempt, but NoOpAggregator
        // always returns Err, so the result is None and pending is drained
        let packet = DataPacket::telemetry("sensor-2", vec![2]);
        let result = router.add_for_aggregation(packet, "squad-1");
        // NoOpAggregator fails aggregation, so result is None
        assert!(result.is_none());
        // But pending should still be drained (packets were consumed)
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_flush_aggregation_empty() {
        let router = MeshRouter::with_node_id("test-node");

        // Flushing when empty should return None
        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
    }

    #[test]
    fn test_mesh_router_flush_aggregation_with_pending() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 10, // High threshold so we won't auto-aggregate
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        // Add some packets
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
        router.add_for_aggregation(packet, "squad-1");
        assert_eq!(router.pending_aggregation_count(), 1);

        // Force flush - NoOpAggregator fails, so result is None
        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
        // But pending should still be drained
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_clear_dedup_cache() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        router.route(&packet, &state);
        assert_eq!(router.dedup_cache_size(), 1);

        router.clear_dedup_cache();
        assert_eq!(router.dedup_cache_size(), 0);
    }

    #[test]
    fn test_mesh_router_accessors() {
        let config = MeshRouterConfig {
            node_id: "my-node".to_string(),
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        assert_eq!(router.node_id(), "my-node");
        // Accessing underlying router and aggregator should work
        let _router_ref = router.router();
        let _agg_ref = router.aggregator();
    }

    #[test]
    fn test_mesh_router_config_with_node_id() {
        let config = MeshRouterConfig::with_node_id("node-abc");
        assert_eq!(config.node_id, "node-abc");
        assert!(config.auto_aggregate);
        assert_eq!(config.aggregation_threshold, 3);
    }

    #[test]
    fn test_mesh_router_route_own_telemetry() {
        // When source is the same node, routing should still work
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("test-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);
        // Own telemetry: the exact decision is up to the underlying router, but
        // the forwarding targets must always agree with whatever it decided.
        assert_forward_targets_match(&result);
    }

    #[test]
    fn test_mesh_router_route_command_packet() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::command("hq", "test-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);
        // The routing policy for commands lives in the underlying router; what
        // this facade guarantees is that `forward_to` reflects the decision.
        assert_forward_targets_match(&result);
    }
}
511                || !result.forward_to.is_empty()
512                || result.forward_to.is_empty()
513        );
514    }
515}