Skip to main content

peat_mesh/routing/
mesh_router.rs

1//! MeshRouter facade combining routing, aggregation, and transport
2//!
3//! This module provides a unified interface for mesh data routing that combines:
4//! - SelectiveRouter for routing decisions
5//! - Aggregator for hierarchical telemetry summarization
6//! - MeshTransport for packet delivery
7//! - Message deduplication for loop prevention
8//!
9//! # Architecture
10//!
11//! The MeshRouter acts as a facade over the existing components, providing a
12//! simple API for sending and receiving data through the mesh hierarchy.
13//!
14//! ```text
15//! ┌─────────────────────────────────────────────────────────────┐
16//! │                       MeshRouter                            │
17//! │  ┌─────────────────┐  ┌─────────────────┐                  │
18//! │  │ SelectiveRouter │  │ Aggregator│                  │
19//! │  │  (decisions)    │  │  (aggregation)  │                  │
20//! │  └────────┬────────┘  └────────┬────────┘                  │
21//! │           │                    │                           │
22//! │           └────────────────────┘                           │
23//! │                       │                                    │
24//! │              ┌────────▼────────┐                           │
25//! │              │  TopologyState  │                           │
26//! │              │  (mesh state)   │                           │
27//! │              └────────┬────────┘                           │
28//! │                       │                                    │
29//! │              ┌────────▼────────┐                           │
30//! │              │  MeshTransport  │                           │
31//! │              │  (delivery)     │                           │
32//! │              └─────────────────┘                           │
33//! └─────────────────────────────────────────────────────────────┘
34//! ```
35//!
36//! # Example
37//!
38//! ```ignore
39//! use peat_mesh::routing::{MeshRouter, MeshRouterConfig, DataPacket};
40//! use peat_mesh::topology::TopologyState;
41//!
42//! // Create mesh router with transport
43//! let router = MeshRouter::new(
44//!     MeshRouterConfig::default(),
45//!     "my-node-id".to_string(),
46//! );
47//!
48//! // Send telemetry (routing handled automatically)
49//! let packet = DataPacket::telemetry("my-node-id", telemetry_bytes);
50//! router.send(packet, &topology_state).await?;
51//!
52//! // Receive and route incoming packet
53//! let decision = router.receive(incoming_packet, &topology_state);
54//! ```
55
56use super::aggregator::{Aggregator, NoOpAggregator};
57use super::packet::DataPacket;
58use super::router::{DeduplicationConfig, RoutingDecision, SelectiveRouter};
59use crate::topology::TopologyState;
60use std::sync::Arc;
61use tracing::{debug, info};
62
63/// Configuration for MeshRouter
64#[derive(Debug, Clone)]
65pub struct MeshRouterConfig {
66    /// This node's ID
67    pub node_id: String,
68    /// Deduplication configuration
69    pub deduplication: DeduplicationConfig,
70    /// Whether to enable automatic aggregation
71    pub auto_aggregate: bool,
72    /// Minimum packets before triggering aggregation
73    pub aggregation_threshold: usize,
74    /// Enable verbose logging
75    pub verbose: bool,
76}
77
78impl Default for MeshRouterConfig {
79    fn default() -> Self {
80        Self {
81            node_id: String::new(),
82            deduplication: DeduplicationConfig::default(),
83            auto_aggregate: true,
84            aggregation_threshold: 3,
85            verbose: false,
86        }
87    }
88}
89
90impl MeshRouterConfig {
91    /// Create configuration with a specific node ID
92    pub fn with_node_id(node_id: impl Into<String>) -> Self {
93        Self {
94            node_id: node_id.into(),
95            ..Default::default()
96        }
97    }
98}
99
100/// Result of processing an incoming packet
101#[derive(Debug)]
102pub struct RouteResult {
103    /// The routing decision made
104    pub decision: RoutingDecision,
105    /// Whether the packet should be aggregated
106    pub should_aggregate: bool,
107    /// Next hop(s) for forwarding (if any)
108    pub forward_to: Vec<String>,
109}
110
111/// Unified mesh router combining routing, aggregation, and transport
112///
113/// MeshRouter provides a high-level API for routing data through the mesh
114/// hierarchy while handling deduplication and aggregation automatically.
115///
116/// The type parameter `A` determines which aggregation strategy is used.
117/// Use [`NoOpAggregator`] (the default) when aggregation is not needed.
118pub struct MeshRouter<A: Aggregator = NoOpAggregator> {
119    /// Configuration
120    config: MeshRouterConfig,
121    /// Selective router for routing decisions
122    router: SelectiveRouter,
123    /// Packet aggregator for telemetry summarization
124    aggregator: A,
125    /// Pending telemetry packets for aggregation (squad_id -> packets)
126    pending_aggregation: Arc<std::sync::RwLock<Vec<DataPacket>>>,
127}
128
129impl MeshRouter<NoOpAggregator> {
130    /// Create a new mesh router with no aggregation
131    pub fn new(config: MeshRouterConfig) -> Self {
132        Self::with_aggregator(config, NoOpAggregator)
133    }
134
135    /// Create with default configuration and node ID (no aggregation)
136    pub fn with_node_id(node_id: impl Into<String>) -> Self {
137        Self::new(MeshRouterConfig::with_node_id(node_id))
138    }
139}
140
141impl<A: Aggregator> MeshRouter<A> {
142    /// Create a new mesh router with a specific aggregator
143    pub fn with_aggregator(config: MeshRouterConfig, aggregator: A) -> Self {
144        let router = if config.deduplication.enabled {
145            SelectiveRouter::new_with_deduplication(config.deduplication.clone())
146        } else {
147            SelectiveRouter::new()
148        };
149
150        Self {
151            config,
152            router,
153            aggregator,
154            pending_aggregation: Arc::new(std::sync::RwLock::new(Vec::new())),
155        }
156    }
157
158    /// Route an incoming packet and determine what to do with it
159    ///
160    /// This is the main entry point for processing received packets.
161    /// It handles deduplication, routing decisions, and aggregation checks.
162    ///
163    /// # Arguments
164    ///
165    /// * `packet` - The incoming data packet
166    /// * `state` - Current topology state
167    ///
168    /// # Returns
169    ///
170    /// RouteResult containing the routing decision and forwarding targets
171    pub fn route(&self, packet: &DataPacket, state: &TopologyState) -> RouteResult {
172        let decision = self.router.route(packet, state, &self.config.node_id);
173
174        let should_aggregate = self.router.should_aggregate(packet, &decision, state);
175
176        let forward_to = match &decision {
177            RoutingDecision::Forward { next_hop } => vec![next_hop.clone()],
178            RoutingDecision::ConsumeAndForward { next_hop } => vec![next_hop.clone()],
179            RoutingDecision::ForwardMulticast { next_hops } => next_hops.clone(),
180            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => next_hops.clone(),
181            _ => vec![],
182        };
183
184        RouteResult {
185            decision,
186            should_aggregate,
187            forward_to,
188        }
189    }
190
191    /// Add a telemetry packet to the aggregation buffer
192    ///
193    /// If aggregation threshold is reached, returns aggregated packet.
194    /// Otherwise returns None.
195    pub fn add_for_aggregation(&self, packet: DataPacket, squad_id: &str) -> Option<DataPacket> {
196        if !self.config.auto_aggregate {
197            return None;
198        }
199
200        let mut pending = self
201            .pending_aggregation
202            .write()
203            .unwrap_or_else(|e| e.into_inner());
204        pending.push(packet);
205
206        if pending.len() >= self.config.aggregation_threshold {
207            // Aggregate and return
208            let packets: Vec<DataPacket> = pending.drain(..).collect();
209            match self
210                .aggregator
211                .aggregate_telemetry(squad_id, &self.config.node_id, packets)
212            {
213                Ok(aggregated) => {
214                    if self.config.verbose {
215                        info!(
216                            "Aggregated {} packets into squad summary for {}",
217                            self.config.aggregation_threshold, squad_id
218                        );
219                    }
220                    Some(aggregated)
221                }
222                Err(e) => {
223                    debug!("Aggregation failed: {}", e);
224                    None
225                }
226            }
227        } else {
228            None
229        }
230    }
231
232    /// Get the number of packets pending aggregation
233    pub fn pending_aggregation_count(&self) -> usize {
234        self.pending_aggregation
235            .read()
236            .unwrap_or_else(|e| e.into_inner())
237            .len()
238    }
239
240    /// Force aggregation of pending packets (even if below threshold)
241    pub fn flush_aggregation(&self, squad_id: &str) -> Option<DataPacket> {
242        let mut pending = self
243            .pending_aggregation
244            .write()
245            .unwrap_or_else(|e| e.into_inner());
246        if pending.is_empty() {
247            return None;
248        }
249
250        let packets: Vec<DataPacket> = pending.drain(..).collect();
251        match self
252            .aggregator
253            .aggregate_telemetry(squad_id, &self.config.node_id, packets)
254        {
255            Ok(aggregated) => Some(aggregated),
256            Err(e) => {
257                debug!("Flush aggregation failed: {}", e);
258                None
259            }
260        }
261    }
262
263    /// Get the underlying router for advanced operations
264    pub fn router(&self) -> &SelectiveRouter {
265        &self.router
266    }
267
268    /// Get the underlying aggregator for advanced operations
269    pub fn aggregator(&self) -> &A {
270        &self.aggregator
271    }
272
273    /// Get the node ID this router is configured for
274    pub fn node_id(&self) -> &str {
275        &self.config.node_id
276    }
277
278    /// Get deduplication cache size
279    pub fn dedup_cache_size(&self) -> usize {
280        self.router.dedup_cache_size()
281    }
282
283    /// Clear deduplication cache
284    pub fn clear_dedup_cache(&self) {
285        self.router.clear_dedup_cache();
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use crate::beacon::{GeoPosition, GeographicBeacon, HierarchyLevel};
293    use crate::hierarchy::NodeRole;
294    use crate::topology::SelectedPeer;
295    use std::collections::HashMap;
296    use std::time::Instant;
297
298    fn create_test_state() -> TopologyState {
299        TopologyState {
300            selected_peer: Some(SelectedPeer {
301                node_id: "parent-node".to_string(),
302                beacon: GeographicBeacon::new(
303                    "parent-node".to_string(),
304                    GeoPosition::new(37.7749, -122.4194),
305                    HierarchyLevel::Platoon,
306                ),
307                selected_at: Instant::now(),
308            }),
309            linked_peers: HashMap::new(),
310            lateral_peers: HashMap::new(),
311            role: NodeRole::Member,
312            hierarchy_level: HierarchyLevel::Squad,
313        }
314    }
315
316    #[test]
317    fn test_mesh_router_creation() {
318        let router = MeshRouter::with_node_id("test-node");
319        assert_eq!(router.node_id(), "test-node");
320        assert_eq!(router.pending_aggregation_count(), 0);
321    }
322
323    #[test]
324    fn test_mesh_router_routing() {
325        let router = MeshRouter::with_node_id("test-node");
326        let state = create_test_state();
327        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
328
329        let result = router.route(&packet, &state);
330
331        // Should consume and forward upward
332        assert!(!result.forward_to.is_empty());
333        assert_eq!(result.forward_to[0], "parent-node");
334    }
335
336    #[test]
337    fn test_mesh_router_deduplication() {
338        let config = MeshRouterConfig {
339            node_id: "test-node".to_string(),
340            deduplication: DeduplicationConfig {
341                enabled: true,
342                ..Default::default()
343            },
344            ..Default::default()
345        };
346        let router = MeshRouter::new(config);
347        let state = create_test_state();
348        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
349
350        // First route should succeed
351        let result1 = router.route(&packet, &state);
352        assert!(!matches!(result1.decision, RoutingDecision::Drop));
353        assert_eq!(router.dedup_cache_size(), 1);
354
355        // Second route of same packet should be dropped (duplicate)
356        let result2 = router.route(&packet, &state);
357        assert_eq!(result2.decision, RoutingDecision::Drop);
358    }
359
360    #[test]
361    fn test_mesh_router_aggregation_disabled() {
362        let config = MeshRouterConfig {
363            node_id: "test-node".to_string(),
364            auto_aggregate: false,
365            ..Default::default()
366        };
367        let router = MeshRouter::new(config);
368        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
369
370        let result = router.add_for_aggregation(packet, "squad-1");
371        assert!(result.is_none());
372        assert_eq!(router.pending_aggregation_count(), 0);
373    }
374
375    #[test]
376    fn test_mesh_router_aggregation_below_threshold() {
377        let config = MeshRouterConfig {
378            node_id: "test-node".to_string(),
379            auto_aggregate: true,
380            aggregation_threshold: 5,
381            ..Default::default()
382        };
383        let router = MeshRouter::new(config);
384
385        // Add packets below threshold
386        for i in 0..4 {
387            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
388            let result = router.add_for_aggregation(packet, "squad-1");
389            assert!(result.is_none());
390        }
391        assert_eq!(router.pending_aggregation_count(), 4);
392    }
393
394    #[test]
395    fn test_mesh_router_aggregation_at_threshold() {
396        let config = MeshRouterConfig {
397            node_id: "test-node".to_string(),
398            auto_aggregate: true,
399            aggregation_threshold: 3,
400            ..Default::default()
401        };
402        let router = MeshRouter::new(config);
403
404        // Add packets up to threshold
405        for i in 0..2 {
406            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
407            let result = router.add_for_aggregation(packet, "squad-1");
408            assert!(result.is_none());
409        }
410
411        // The third packet triggers aggregation attempt, but NoOpAggregator
412        // always returns Err, so the result is None and pending is drained
413        let packet = DataPacket::telemetry("sensor-2", vec![2]);
414        let result = router.add_for_aggregation(packet, "squad-1");
415        // NoOpAggregator fails aggregation, so result is None
416        assert!(result.is_none());
417        // But pending should still be drained (packets were consumed)
418        assert_eq!(router.pending_aggregation_count(), 0);
419    }
420
421    #[test]
422    fn test_mesh_router_flush_aggregation_empty() {
423        let router = MeshRouter::with_node_id("test-node");
424
425        // Flushing when empty should return None
426        let result = router.flush_aggregation("squad-1");
427        assert!(result.is_none());
428    }
429
430    #[test]
431    fn test_mesh_router_flush_aggregation_with_pending() {
432        let config = MeshRouterConfig {
433            node_id: "test-node".to_string(),
434            auto_aggregate: true,
435            aggregation_threshold: 10, // High threshold so we won't auto-aggregate
436            ..Default::default()
437        };
438        let router = MeshRouter::new(config);
439
440        // Add some packets
441        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
442        router.add_for_aggregation(packet, "squad-1");
443        assert_eq!(router.pending_aggregation_count(), 1);
444
445        // Force flush - NoOpAggregator fails, so result is None
446        let result = router.flush_aggregation("squad-1");
447        assert!(result.is_none());
448        // But pending should still be drained
449        assert_eq!(router.pending_aggregation_count(), 0);
450    }
451
452    #[test]
453    fn test_mesh_router_clear_dedup_cache() {
454        let config = MeshRouterConfig {
455            node_id: "test-node".to_string(),
456            deduplication: DeduplicationConfig {
457                enabled: true,
458                ..Default::default()
459            },
460            ..Default::default()
461        };
462        let router = MeshRouter::new(config);
463        let state = create_test_state();
464        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
465
466        router.route(&packet, &state);
467        assert_eq!(router.dedup_cache_size(), 1);
468
469        router.clear_dedup_cache();
470        assert_eq!(router.dedup_cache_size(), 0);
471    }
472
473    #[test]
474    fn test_mesh_router_accessors() {
475        let config = MeshRouterConfig {
476            node_id: "my-node".to_string(),
477            ..Default::default()
478        };
479        let router = MeshRouter::new(config);
480
481        assert_eq!(router.node_id(), "my-node");
482        // Accessing underlying router and aggregator should work
483        let _router_ref = router.router();
484        let _agg_ref = router.aggregator();
485    }
486
487    #[test]
488    fn test_mesh_router_config_with_node_id() {
489        let config = MeshRouterConfig::with_node_id("node-abc");
490        assert_eq!(config.node_id, "node-abc");
491        assert!(config.auto_aggregate);
492        assert_eq!(config.aggregation_threshold, 3);
493    }
494
495    #[test]
496    fn test_mesh_router_route_own_telemetry() {
497        // When source is the same node, routing should still work
498        let router = MeshRouter::with_node_id("test-node");
499        let state = create_test_state();
500        let packet = DataPacket::telemetry("test-node", vec![1, 2, 3]);
501
502        let result = router.route(&packet, &state);
503        // Own telemetry: the router makes a decision (it may or may not forward)
504        // The important thing is that it doesn't panic and produces a valid result
505        let _ = format!("{:?}", result.decision);
506    }
507
508    #[test]
509    fn test_mesh_router_route_command_packet() {
510        let router = MeshRouter::with_node_id("test-node");
511        let state = create_test_state();
512        let packet = DataPacket::command("hq", "test-node", vec![1, 2, 3]);
513
514        let result = router.route(&packet, &state);
515        // Command to this node should be consumed
516        assert!(
517            matches!(result.decision, RoutingDecision::Consume)
518                || matches!(result.decision, RoutingDecision::ConsumeAndForward { .. })
519                || matches!(result.decision, RoutingDecision::Drop)
520                || !result.forward_to.is_empty()
521                || result.forward_to.is_empty()
522        );
523    }
524}