// peat_mesh/routing/router.rs

//! Selective router implementation for hierarchical data routing
//!
//! This module implements the core routing logic that determines:
//! - Whether data should be consumed (processed) by this node
//! - Whether data should be forwarded to other nodes
//! - Which peer (or peers) should receive forwarded data
//!
//! ## Message Deduplication
//!
//! The router includes optional message deduplication to prevent routing loops.
//! When enabled, each packet's ID is tracked and duplicate packets are automatically
//! dropped. The deduplication cache uses a time-based eviction strategy, bounded by
//! a maximum number of tracked packet IDs.
13
use super::packet::{DataDirection, DataPacket};
use crate::beacon::HierarchyLevel;
use crate::hierarchy::NodeRole;
use crate::topology::TopologyState;
use std::collections::HashMap;
use std::sync::{Arc, PoisonError, RwLock};
use std::time::{Duration, Instant};
use tracing::{debug, trace, warn};
22
/// Outcome of routing a single packet at this node.
///
/// The unicast variants carry exactly one peer ID; the multicast variants
/// carry the full list of peer IDs the packet must be sent to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingDecision {
    /// Consume (process) the data locally
    Consume,

    /// Forward the data to a specific peer
    Forward { next_hop: String },

    /// Consume locally AND forward to peer
    ConsumeAndForward { next_hop: String },

    /// Forward the data to multiple peers (multicast/broadcast)
    ForwardMulticast { next_hops: Vec<String> },

    /// Consume locally AND forward to multiple peers (multicast/broadcast)
    ConsumeAndForwardMulticast { next_hops: Vec<String> },

    /// Drop the packet (duplicate, reached max hops, originated here, or no route)
    Drop,
}
44
/// Configuration for message deduplication
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// Whether deduplication is enabled
    pub enabled: bool,
    /// How long to remember seen packet IDs (default: 5 minutes)
    pub ttl: Duration,
    /// Maximum number of packet IDs to track (default: 10000)
    pub max_entries: usize,
}

impl Default for DeduplicationConfig {
    /// Deduplication enabled, 5-minute TTL, at most 10000 tracked packet IDs.
    fn default() -> Self {
        Self {
            enabled: true,
            ttl: Duration::from_secs(300), // 5 minutes
            max_entries: 10000,
        }
    }
}
65
/// Entry in the deduplication cache
#[derive(Debug, Clone)]
struct DeduplicationEntry {
    /// When this packet was first seen; compared against the configured TTL
    /// to decide whether the entry still counts as a duplicate marker.
    first_seen: Instant,
}
72
73/// Selective router for hierarchical mesh networks
74///
75/// Makes intelligent routing decisions based on:
76/// - Node's position in hierarchy (level and role)
77/// - Data direction (upward/downward/lateral)
78/// - Topology state (selected peer, linked peers, lateral peers)
79///
80/// # Message Deduplication
81///
82/// The router can optionally track seen packet IDs to prevent routing loops.
83/// Use `new_with_deduplication()` to enable this feature.
84///
85/// # Example
86///
87/// ```ignore
88/// use peat_mesh::routing::{SelectiveRouter, DataPacket, DeduplicationConfig};
89/// use peat_mesh::topology::TopologyState;
90///
91/// // Create router with deduplication enabled
92/// let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
93/// let state = get_topology_state();
94/// let packet = DataPacket::telemetry("node-123", vec![1, 2, 3]);
95///
96/// // Route will automatically deduplicate
97/// let decision = router.route(&packet, &state, "this-node");
98///
99/// // Second call with same packet returns Drop (duplicate)
100/// let decision2 = router.route(&packet, &state, "this-node");
101/// assert_eq!(decision2, RoutingDecision::Drop);
102/// ```
103pub struct SelectiveRouter {
104    /// Enable verbose logging for debugging
105    verbose: bool,
106    /// Deduplication configuration
107    dedup_config: DeduplicationConfig,
108    /// Cache of seen packet IDs (packet_id -> entry)
109    seen_packets: Arc<RwLock<HashMap<String, DeduplicationEntry>>>,
110}
111
112impl SelectiveRouter {
113    /// Create a new selective router (deduplication disabled by default)
114    pub fn new() -> Self {
115        Self {
116            verbose: false,
117            dedup_config: DeduplicationConfig {
118                enabled: false,
119                ..Default::default()
120            },
121            seen_packets: Arc::new(RwLock::new(HashMap::new())),
122        }
123    }
124
125    /// Create a new selective router with verbose logging
126    pub fn new_verbose() -> Self {
127        Self {
128            verbose: true,
129            dedup_config: DeduplicationConfig {
130                enabled: false,
131                ..Default::default()
132            },
133            seen_packets: Arc::new(RwLock::new(HashMap::new())),
134        }
135    }
136
137    /// Create a new selective router with deduplication enabled
138    pub fn new_with_deduplication(config: DeduplicationConfig) -> Self {
139        Self {
140            verbose: false,
141            dedup_config: config,
142            seen_packets: Arc::new(RwLock::new(HashMap::new())),
143        }
144    }
145
146    /// Check if a packet has been seen before (for deduplication)
147    ///
148    /// Returns `true` if this is a duplicate packet that should be dropped.
149    /// If the packet is new, it's added to the seen cache.
150    fn is_duplicate(&self, packet_id: &str) -> bool {
151        if !self.dedup_config.enabled {
152            return false;
153        }
154
155        let now = Instant::now();
156
157        // Try to insert into cache
158        let mut cache = self.seen_packets.write().unwrap();
159
160        // Check if already seen and not expired
161        if let Some(entry) = cache.get(packet_id) {
162            if now.duration_since(entry.first_seen) < self.dedup_config.ttl {
163                if self.verbose {
164                    debug!("Duplicate packet detected: {}", packet_id);
165                }
166                return true;
167            }
168            // Entry expired, will be replaced below
169        }
170
171        // Evict expired entries if cache is getting full
172        if cache.len() >= self.dedup_config.max_entries {
173            self.evict_expired(&mut cache, now);
174
175            // If still full after eviction, remove oldest entry
176            if cache.len() >= self.dedup_config.max_entries {
177                if let Some(oldest_key) = cache
178                    .iter()
179                    .min_by_key(|(_, entry)| entry.first_seen)
180                    .map(|(k, _)| k.clone())
181                {
182                    cache.remove(&oldest_key);
183                }
184            }
185        }
186
187        // Record this packet
188        cache.insert(
189            packet_id.to_string(),
190            DeduplicationEntry { first_seen: now },
191        );
192
193        false
194    }
195
196    /// Evict expired entries from the cache
197    fn evict_expired(&self, cache: &mut HashMap<String, DeduplicationEntry>, now: Instant) {
198        cache.retain(|_, entry| now.duration_since(entry.first_seen) < self.dedup_config.ttl);
199    }
200
201    /// Get the number of entries in the deduplication cache
202    pub fn dedup_cache_size(&self) -> usize {
203        self.seen_packets.read().unwrap().len()
204    }
205
206    /// Clear the deduplication cache
207    pub fn clear_dedup_cache(&self) {
208        self.seen_packets.write().unwrap().clear();
209    }
210
211    /// Make a complete routing decision for a packet
212    ///
213    /// This is the primary entry point that combines should_consume,
214    /// should_forward, and next_hop into a single decision.
215    ///
216    /// If deduplication is enabled, duplicate packets are automatically dropped.
217    ///
218    /// # Arguments
219    ///
220    /// * `packet` - The data packet to route
221    /// * `state` - Current topology state
222    /// * `this_node_id` - This node's identifier
223    ///
224    /// # Returns
225    ///
226    /// RoutingDecision indicating what to do with the packet
227    pub fn route(
228        &self,
229        packet: &DataPacket,
230        state: &TopologyState,
231        this_node_id: &str,
232    ) -> RoutingDecision {
233        // Check for duplicate packet (if deduplication enabled)
234        if self.is_duplicate(&packet.packet_id) {
235            if self.verbose {
236                debug!("Packet {} is a duplicate, dropping", packet.packet_id);
237            }
238            return RoutingDecision::Drop;
239        }
240
241        // Check if packet has reached max hops
242        if packet.at_max_hops() {
243            if self.verbose {
244                warn!(
245                    "Packet {} reached max hops ({}), dropping",
246                    packet.packet_id, packet.max_hops
247                );
248            }
249            return RoutingDecision::Drop;
250        }
251
252        // Check if we're the source (don't route our own packets back to us)
253        if packet.source_node_id == this_node_id {
254            if self.verbose {
255                trace!(
256                    "Packet {} originated from us, not routing",
257                    packet.packet_id
258                );
259            }
260            return RoutingDecision::Drop;
261        }
262
263        let should_consume = self.should_consume(packet, state, this_node_id);
264        let should_forward = self.should_forward(packet, state);
265
266        if should_consume && should_forward {
267            // Both consume and forward - check if multicast needed
268            let next_hops = self.next_hops(packet, state);
269            if next_hops.is_empty() {
270                // Can't forward without next hop, just consume
271                if self.verbose {
272                    debug!("Packet {}: Consume only (no next hop)", packet.packet_id);
273                }
274                RoutingDecision::Consume
275            } else if next_hops.len() == 1 {
276                // Single next hop - use unicast variant
277                if self.verbose {
278                    debug!(
279                        "Packet {}: Consume and forward to {}",
280                        packet.packet_id, next_hops[0]
281                    );
282                }
283                RoutingDecision::ConsumeAndForward {
284                    next_hop: next_hops[0].clone(),
285                }
286            } else {
287                // Multiple next hops - use multicast variant
288                if self.verbose {
289                    debug!(
290                        "Packet {}: Consume and multicast to {} peers",
291                        packet.packet_id,
292                        next_hops.len()
293                    );
294                }
295                RoutingDecision::ConsumeAndForwardMulticast { next_hops }
296            }
297        } else if should_consume {
298            if self.verbose {
299                debug!("Packet {}: Consume only", packet.packet_id);
300            }
301            RoutingDecision::Consume
302        } else if should_forward {
303            let next_hops = self.next_hops(packet, state);
304            if next_hops.is_empty() {
305                if self.verbose {
306                    warn!(
307                        "Packet {}: Should forward but no next hop, dropping",
308                        packet.packet_id
309                    );
310                }
311                RoutingDecision::Drop
312            } else if next_hops.len() == 1 {
313                // Single next hop - use unicast variant
314                if self.verbose {
315                    debug!("Packet {}: Forward to {}", packet.packet_id, next_hops[0]);
316                }
317                RoutingDecision::Forward {
318                    next_hop: next_hops[0].clone(),
319                }
320            } else {
321                // Multiple next hops - use multicast variant
322                if self.verbose {
323                    debug!(
324                        "Packet {}: Multicast to {} peers",
325                        packet.packet_id,
326                        next_hops.len()
327                    );
328                }
329                RoutingDecision::ForwardMulticast { next_hops }
330            }
331        } else {
332            if self.verbose {
333                debug!("Packet {}: Drop (not for us)", packet.packet_id);
334            }
335            RoutingDecision::Drop
336        }
337    }
338
339    /// Determine if this node should consume (process) the packet
340    ///
341    /// # Consumption Rules
342    ///
343    /// **Upward (Telemetry)**
344    /// - Always consume telemetry for local processing/aggregation
345    ///
346    /// **Downward (Commands)**
347    /// - Consume if packet is addressed to us
348    /// - Leaders consume commands for their squad
349    ///
350    /// **Lateral (Coordination)**
351    /// - Leaders consume coordination messages
352    /// - Members typically don't consume lateral messages
353    ///
354    /// # Arguments
355    ///
356    /// * `packet` - The data packet
357    /// * `state` - Current topology state
358    /// * `this_node_id` - This node's identifier
359    ///
360    /// # Returns
361    ///
362    /// `true` if this node should process the packet
363    pub fn should_consume(
364        &self,
365        packet: &DataPacket,
366        state: &TopologyState,
367        this_node_id: &str,
368    ) -> bool {
369        match packet.direction {
370            DataDirection::Upward => {
371                // Upward data (telemetry, status): Always consume for aggregation
372                // Every node in the path can aggregate/process
373                true
374            }
375
376            DataDirection::Downward => {
377                // Downward data (commands, config): Consume if targeted at us
378                if let Some(ref dest) = packet.destination_node_id {
379                    if dest == this_node_id {
380                        return true;
381                    }
382                }
383
384                // Leaders consume commands even if not directly targeted
385                // (they may need to disseminate to squad members)
386                matches!(state.role, NodeRole::Leader)
387            }
388
389            DataDirection::Lateral => {
390                // Lateral data (coordination): Only Leaders typically consume
391                if let Some(ref dest) = packet.destination_node_id {
392                    // Consume only if directly addressed to us
393                    dest == this_node_id
394                } else {
395                    // No specific destination (broadcast): Leaders consume
396                    matches!(state.role, NodeRole::Leader)
397                }
398            }
399        }
400    }
401
402    /// Determine if this node should forward the packet
403    ///
404    /// # Forwarding Rules
405    ///
406    /// **Upward (Telemetry)**
407    /// - Forward if we have a selected peer (parent in hierarchy)
408    /// - Don't forward if we're at HQ level (no parent)
409    ///
410    /// **Downward (Commands)**
411    /// - Forward if we have linked peers (children) that need this data
412    /// - Don't forward if we're a leaf node (no children)
413    ///
414    /// **Lateral (Coordination)**
415    /// - Forward if addressed to a lateral peer we track
416    /// - Leaders may forward to other Leaders at same level
417    ///
418    /// # Arguments
419    ///
420    /// * `packet` - The data packet
421    /// * `state` - Current topology state
422    ///
423    /// # Returns
424    ///
425    /// `true` if packet should be forwarded to another peer
426    pub fn should_forward(&self, packet: &DataPacket, state: &TopologyState) -> bool {
427        match packet.direction {
428            DataDirection::Upward => {
429                // Forward upward if we have a selected peer (parent)
430                state.selected_peer.is_some()
431            }
432
433            DataDirection::Downward => {
434                // Forward downward if we have linked peers (children)
435                !state.linked_peers.is_empty()
436            }
437
438            DataDirection::Lateral => {
439                // Forward laterally if addressed to a peer we know
440                if let Some(ref dest) = packet.destination_node_id {
441                    state.lateral_peers.contains_key(dest)
442                } else {
443                    // Broadcast lateral messages if we're a Leader with lateral peers
444                    matches!(state.role, NodeRole::Leader) && !state.lateral_peers.is_empty()
445                }
446            }
447        }
448    }
449
450    /// Determine the next hop for forwarding the packet
451    ///
452    /// # Next Hop Selection
453    ///
454    /// **Upward**: selected_peer (parent in hierarchy)
455    /// **Downward**: linked_peers (children) - for now, return first child
456    /// **Lateral**: lateral_peers - specific peer if addressed, or first if broadcast
457    ///
458    /// # Arguments
459    ///
460    /// * `packet` - The data packet
461    /// * `state` - Current topology state
462    ///
463    /// # Returns
464    ///
465    /// Node ID of the next hop, or None if no valid next hop
466    pub fn next_hop(&self, packet: &DataPacket, state: &TopologyState) -> Option<String> {
467        match packet.direction {
468            DataDirection::Upward => {
469                // Upward: Route to selected peer (parent)
470                state
471                    .selected_peer
472                    .as_ref()
473                    .map(|peer| peer.node_id.clone())
474            }
475
476            DataDirection::Downward => {
477                // Downward: Route to linked peers (children)
478                // If addressed to specific child, route there
479                if let Some(ref dest) = packet.destination_node_id {
480                    if state.linked_peers.contains_key(dest) {
481                        return Some(dest.clone());
482                    }
483                }
484
485                // Otherwise, return first linked peer for backward compatibility
486                // For multicast/broadcast, use next_hops() instead
487                state.linked_peers.keys().next().cloned()
488            }
489
490            DataDirection::Lateral => {
491                // Lateral: Route to lateral peers
492                if let Some(ref dest) = packet.destination_node_id {
493                    // Route to specific lateral peer if we track them
494                    if state.lateral_peers.contains_key(dest) {
495                        return Some(dest.clone());
496                    }
497                }
498
499                // Otherwise, route to first lateral peer for backward compatibility
500                state.lateral_peers.keys().next().cloned()
501            }
502        }
503    }
504
505    /// Determine all next hops for multicast/broadcast forwarding
506    ///
507    /// Returns all appropriate peers for scenarios requiring multicast:
508    /// - Downward command dissemination to all children
509    /// - Lateral coordination broadcast to all peers at same level
510    ///
511    /// # Next Hops Selection
512    ///
513    /// **Upward**: Returns selected_peer (parent) as single-element vector
514    /// **Downward**: Returns all linked_peers (children) for broadcast
515    /// **Lateral**: Returns all lateral_peers for broadcast
516    ///
517    /// # Arguments
518    ///
519    /// * `packet` - The data packet
520    /// * `state` - Current topology state
521    ///
522    /// # Returns
523    ///
524    /// Vector of node IDs to forward to (empty if no valid hops)
525    pub fn next_hops(&self, packet: &DataPacket, state: &TopologyState) -> Vec<String> {
526        match packet.direction {
527            DataDirection::Upward => {
528                // Upward: Return selected peer (parent) as single-element vector
529                state
530                    .selected_peer
531                    .as_ref()
532                    .map(|peer| vec![peer.node_id.clone()])
533                    .unwrap_or_default()
534            }
535
536            DataDirection::Downward => {
537                // Downward: Route to all linked peers (children) for broadcast
538                // If addressed to specific child, route only there
539                if let Some(ref dest) = packet.destination_node_id {
540                    if state.linked_peers.contains_key(dest) {
541                        return vec![dest.clone()];
542                    }
543                }
544
545                // Otherwise, route to ALL linked peers (multicast/broadcast)
546                state.linked_peers.keys().cloned().collect()
547            }
548
549            DataDirection::Lateral => {
550                // Lateral: Route to all lateral peers for broadcast
551                if let Some(ref dest) = packet.destination_node_id {
552                    // Route to specific lateral peer if we track them
553                    if state.lateral_peers.contains_key(dest) {
554                        return vec![dest.clone()];
555                    }
556                }
557
558                // Otherwise, route to ALL lateral peers (broadcast)
559                state.lateral_peers.keys().cloned().collect()
560            }
561        }
562    }
563
564    /// Check if this node is at the hierarchy level that should aggregate
565    ///
566    /// HQ nodes (Company level) should aggregate and consume
567    /// without further forwarding.
568    #[allow(dead_code)]
569    fn is_hq_level(&self, level: HierarchyLevel) -> bool {
570        matches!(level, HierarchyLevel::Company)
571    }
572
573    /// Check if a packet should be aggregated before forwarding
574    ///
575    /// Aggregation is appropriate when:
576    /// - Packet data type requires aggregation (Telemetry, Status)
577    /// - Routing decision is ConsumeAndForward (intermediate node)
578    /// - Node is a Leader (squad leader aggregating member data)
579    ///
580    /// # Integration with Aggregator
581    ///
582    /// When this returns true, the application should:
583    /// 1. Collect telemetry packets from squad members (batching)
584    /// 2. Use `Aggregator::aggregate_telemetry()` to create aggregated packet
585    /// 3. Route the aggregated packet upward using this router
586    ///
587    /// # Example
588    ///
589    /// ```ignore
590    /// use peat_mesh::routing::{SelectiveRouter, Aggregator, DataPacket};
591    ///
592    /// let router = SelectiveRouter::new();
593    /// // let aggregator = MyAggregator::new();
594    ///
595    /// // Collect telemetry from squad members
596    /// let mut squad_telemetry = Vec::new();
597    /// for packet in incoming_packets {
598    ///     let decision = router.route(&packet, &state, "platoon-leader");
599    ///     if router.should_aggregate(&packet, &decision, &state) {
600    ///         squad_telemetry.push(packet);
601    ///     }
602    /// }
603    ///
604    /// // Aggregate when we have enough data
605    /// if squad_telemetry.len() >= 3 {
606    ///     let aggregated = aggregator.aggregate_telemetry(
607    ///         "squad-1",
608    ///         "platoon-leader",
609    ///         squad_telemetry,
610    ///     )?;
611    ///
612    ///     // Route aggregated packet upward
613    ///     let decision = router.route(&aggregated, &state, "platoon-leader");
614    ///     // ... forward to parent
615    /// }
616    /// ```
617    ///
618    /// # Arguments
619    ///
620    /// * `packet` - The data packet to check
621    /// * `decision` - The routing decision for this packet
622    /// * `state` - Current topology state
623    ///
624    /// # Returns
625    ///
626    /// `true` if this packet should be aggregated before forwarding
627    pub fn should_aggregate(
628        &self,
629        packet: &DataPacket,
630        decision: &RoutingDecision,
631        state: &TopologyState,
632    ) -> bool {
633        // Only aggregate if we're consuming and forwarding (intermediate node)
634        if !matches!(decision, RoutingDecision::ConsumeAndForward { .. }) {
635            return false;
636        }
637
638        // Only aggregate data types that require it
639        if !packet.data_type.requires_aggregation() {
640            return false;
641        }
642
643        // Only Leaders aggregate squad member data
644        matches!(state.role, NodeRole::Leader)
645    }
646}
647
648impl Default for SelectiveRouter {
649    fn default() -> Self {
650        Self::new()
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657    use crate::beacon::{GeoPosition, GeographicBeacon};
658    use crate::routing::packet::{DataDirection, DataType};
659    use crate::topology::SelectedPeer;
660    use std::collections::HashMap;
661    use std::time::Instant;
662
663    fn create_test_state(
664        hierarchy_level: HierarchyLevel,
665        role: NodeRole,
666        has_selected_peer: bool,
667        num_linked_peers: usize,
668        num_lateral_peers: usize,
669    ) -> TopologyState {
670        let selected_peer = if has_selected_peer {
671            Some(SelectedPeer {
672                node_id: "parent-node".to_string(),
673                beacon: GeographicBeacon::new(
674                    "parent-node".to_string(),
675                    GeoPosition::new(37.7749, -122.4194),
676                    HierarchyLevel::Platoon,
677                ),
678                selected_at: Instant::now(),
679            })
680        } else {
681            None
682        };
683
684        let mut linked_peers = HashMap::new();
685        for i in 0..num_linked_peers {
686            linked_peers.insert(format!("linked-peer-{}", i), Instant::now());
687        }
688
689        let mut lateral_peers = HashMap::new();
690        for i in 0..num_lateral_peers {
691            lateral_peers.insert(format!("lateral-peer-{}", i), Instant::now());
692        }
693
694        TopologyState {
695            selected_peer,
696            linked_peers,
697            lateral_peers,
698            role,
699            hierarchy_level,
700        }
701    }
702
703    #[test]
704    fn test_upward_telemetry_leaf_node() {
705        let router = SelectiveRouter::new();
706        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
707        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
708
709        // Leaf node should consume telemetry
710        assert!(router.should_consume(&packet, &state, "this-node"));
711
712        // Leaf node with parent should forward
713        assert!(router.should_forward(&packet, &state));
714
715        // Next hop should be parent
716        let next_hop = router.next_hop(&packet, &state);
717        assert_eq!(next_hop, Some("parent-node".to_string()));
718    }
719
720    #[test]
721    fn test_upward_telemetry_hq_node() {
722        let router = SelectiveRouter::new();
723        // HQ node (no selected peer = highest level)
724        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
725        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
726
727        // HQ should consume telemetry
728        assert!(router.should_consume(&packet, &state, "hq-node"));
729
730        // HQ should NOT forward (no parent)
731        assert!(!router.should_forward(&packet, &state));
732    }
733
734    #[test]
735    fn test_downward_command_to_leader() {
736        let router = SelectiveRouter::new();
737        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
738        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
739
740        // Leader should consume command addressed to them
741        assert!(router.should_consume(&packet, &state, "platoon-leader"));
742
743        // Leader with children should forward
744        assert!(router.should_forward(&packet, &state));
745
746        // Next hop should be one of the linked peers (children)
747        let next_hop = router.next_hop(&packet, &state);
748        assert!(next_hop.is_some());
749        assert!(next_hop.unwrap().starts_with("linked-peer-"));
750    }
751
752    #[test]
753    fn test_downward_command_to_leaf() {
754        let router = SelectiveRouter::new();
755        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
756        let packet = DataPacket::command("hq", "squad-member", vec![4, 5, 6]);
757
758        // Member should consume command addressed to them
759        assert!(router.should_consume(&packet, &state, "squad-member"));
760
761        // Leaf node should NOT forward (no children)
762        assert!(!router.should_forward(&packet, &state));
763    }
764
765    #[test]
766    fn test_lateral_coordination_between_leaders() {
767        let router = SelectiveRouter::new();
768        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
769        let packet = DataPacket::coordination("platoon-1", "lateral-peer-0", vec![7, 8, 9]);
770
771        // Leader should NOT consume lateral coordination if not addressed to them
772        assert!(!router.should_consume(&packet, &state, "platoon-3"));
773
774        // Should forward if addressed to a lateral peer we track
775        let state_with_target =
776            create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
777        assert!(router.should_forward(&packet, &state_with_target));
778    }
779
780    #[test]
781    fn test_max_hops_drop() {
782        let router = SelectiveRouter::new();
783        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
784        let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
785
786        // Increment hops to max
787        for _ in 0..10 {
788            packet.increment_hop();
789        }
790
791        // Routing should return Drop when at max hops
792        let decision = router.route(&packet, &state, "this-node");
793        assert_eq!(decision, RoutingDecision::Drop);
794    }
795
796    #[test]
797    fn test_routing_decision_consume_and_forward() {
798        let router = SelectiveRouter::new();
799        // Intermediate node with parent and children
800        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
801        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
802
803        let decision = router.route(&packet, &state, "platoon-leader");
804
805        // Should consume and forward
806        match decision {
807            RoutingDecision::ConsumeAndForward { next_hop } => {
808                assert_eq!(next_hop, "parent-node");
809            }
810            _ => panic!("Expected ConsumeAndForward, got {:?}", decision),
811        }
812    }
813
814    #[test]
815    fn test_routing_decision_consume_only() {
816        let router = SelectiveRouter::new();
817        // HQ node (no parent)
818        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
819        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
820
821        let decision = router.route(&packet, &state, "hq-node");
822
823        // Should consume only (no forwarding)
824        assert_eq!(decision, RoutingDecision::Consume);
825    }
826
827    #[test]
828    fn test_dont_route_own_packets() {
829        let router = SelectiveRouter::new();
830        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
831        let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
832
833        // Should not route our own packets back to us
834        let decision = router.route(&packet, &state, "this-node");
835        assert_eq!(decision, RoutingDecision::Drop);
836    }
837
838    #[test]
839    fn test_should_aggregate_intermediate_leader() {
840        let router = SelectiveRouter::new();
841        // Intermediate Leader node (has parent and children)
842        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
843        let packet = DataPacket::telemetry("squad-member-1", vec![1, 2, 3]);
844
845        let decision = router.route(&packet, &state, "platoon-leader");
846
847        // Should aggregate: Leader with ConsumeAndForward decision
848        assert!(router.should_aggregate(&packet, &decision, &state));
849    }
850
851    #[test]
852    fn test_should_not_aggregate_hq_node() {
853        let router = SelectiveRouter::new();
854        // HQ node (no parent, just consumes)
855        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
856        let packet = DataPacket::telemetry("platoon-1", vec![1, 2, 3]);
857
858        let decision = router.route(&packet, &state, "hq-node");
859
860        // Should NOT aggregate: Decision is Consume only (not ConsumeAndForward)
861        assert!(!router.should_aggregate(&packet, &decision, &state));
862    }
863
864    #[test]
865    fn test_should_not_aggregate_non_leader() {
866        let router = SelectiveRouter::new();
867        // Member node (not a Leader)
868        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
869        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
870
871        let decision = router.route(&packet, &state, "squad-member");
872
873        // Should NOT aggregate: Not a Leader
874        assert!(!router.should_aggregate(&packet, &decision, &state));
875    }
876
877    #[test]
878    fn test_should_not_aggregate_command_packet() {
879        let router = SelectiveRouter::new();
880        // Leader node
881        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
882        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
883
884        let decision = router.route(&packet, &state, "platoon-leader");
885
886        // Should NOT aggregate: Command packets don't require aggregation
887        assert!(!router.should_aggregate(&packet, &decision, &state));
888    }
889
890    // ============================================================================
891    // Week 10: Multicast/Broadcast Routing Tests
892    // ============================================================================
893
894    #[test]
895    fn test_next_hops_upward_single_parent() {
896        let router = SelectiveRouter::new();
897        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
898        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
899
900        // Upward should return selected_peer as single-element vector
901        let next_hops = router.next_hops(&packet, &state);
902        assert_eq!(next_hops.len(), 1);
903        assert_eq!(next_hops[0], "parent-node");
904    }
905
906    #[test]
907    fn test_next_hops_upward_no_parent() {
908        let router = SelectiveRouter::new();
909        // HQ node with no parent
910        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
911        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
912
913        // No parent means empty vector
914        let next_hops = router.next_hops(&packet, &state);
915        assert_eq!(next_hops.len(), 0);
916    }
917
918    #[test]
919    fn test_next_hops_downward_multicast() {
920        let router = SelectiveRouter::new();
921        // Platoon leader with 5 linked peers (children)
922        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 5, 0);
923        // Create broadcast command packet (no specific destination)
924        let packet = DataPacket {
925            packet_id: uuid::Uuid::new_v4().to_string(),
926            source_node_id: "hq".to_string(),
927            destination_node_id: None, // Broadcast
928            data_type: DataType::Command,
929            direction: DataDirection::Downward,
930            hop_count: 0,
931            max_hops: 10,
932            payload: vec![4, 5, 6],
933        };
934
935        // Downward broadcast should return ALL linked peers
936        let next_hops = router.next_hops(&packet, &state);
937        assert_eq!(next_hops.len(), 5);
938        for i in 0..5 {
939            assert!(next_hops.contains(&format!("linked-peer-{}", i)));
940        }
941    }
942
943    #[test]
944    fn test_next_hops_downward_targeted() {
945        let router = SelectiveRouter::new();
946        // Platoon leader with 3 linked peers
947        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
948        let packet = DataPacket::command("hq", "linked-peer-1", vec![4, 5, 6]);
949
950        // Targeted downward should return only the specific child
951        let next_hops = router.next_hops(&packet, &state);
952        assert_eq!(next_hops.len(), 1);
953        assert_eq!(next_hops[0], "linked-peer-1");
954    }
955
956    #[test]
957    fn test_next_hops_lateral_multicast() {
958        let router = SelectiveRouter::new();
959        // Leader with 4 lateral peers
960        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 4);
961        // Create broadcast coordination packet (no specific destination)
962        let packet = DataPacket {
963            packet_id: uuid::Uuid::new_v4().to_string(),
964            source_node_id: "platoon-1".to_string(),
965            destination_node_id: None, // Broadcast
966            data_type: DataType::Coordination,
967            direction: DataDirection::Lateral,
968            hop_count: 0,
969            max_hops: 3,
970            payload: vec![7, 8, 9],
971        };
972
973        // Lateral broadcast should return ALL lateral peers
974        let next_hops = router.next_hops(&packet, &state);
975        assert_eq!(next_hops.len(), 4);
976        for i in 0..4 {
977            assert!(next_hops.contains(&format!("lateral-peer-{}", i)));
978        }
979    }
980
981    #[test]
982    fn test_next_hops_lateral_targeted() {
983        let router = SelectiveRouter::new();
984        // Leader with 3 lateral peers
985        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
986        let packet = DataPacket::coordination("platoon-1", "lateral-peer-2", vec![7, 8, 9]);
987
988        // Targeted lateral should return only specific peer
989        let next_hops = router.next_hops(&packet, &state);
990        assert_eq!(next_hops.len(), 1);
991        assert_eq!(next_hops[0], "lateral-peer-2");
992    }
993
994    #[test]
995    fn test_route_downward_multicast() {
996        let router = SelectiveRouter::new();
997        // HQ node (Leader) with 3 children, broadcasting command
998        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
999        // Create broadcast command packet (no specific destination)
1000        let packet = DataPacket {
1001            packet_id: uuid::Uuid::new_v4().to_string(),
1002            source_node_id: "hq".to_string(),
1003            destination_node_id: None, // Broadcast
1004            data_type: DataType::Command,
1005            direction: DataDirection::Downward,
1006            hop_count: 0,
1007            max_hops: 10,
1008            payload: vec![4, 5, 6],
1009        };
1010
1011        let decision = router.route(&packet, &state, "hq-node");
1012
1013        // Leaders should consume broadcast commands AND multicast to all children
1014        match decision {
1015            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1016                assert_eq!(next_hops.len(), 3);
1017                assert!(next_hops.contains(&"linked-peer-0".to_string()));
1018                assert!(next_hops.contains(&"linked-peer-1".to_string()));
1019                assert!(next_hops.contains(&"linked-peer-2".to_string()));
1020            }
1021            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1022        }
1023    }
1024
1025    #[test]
1026    fn test_route_downward_consume_and_multicast() {
1027        let router = SelectiveRouter::new();
1028        // Platoon leader with 4 children, receiving command addressed to them
1029        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1030        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1031
1032        let decision = router.route(&packet, &state, "platoon-leader");
1033
1034        // Should consume (addressed to us) AND multicast to all children
1035        match decision {
1036            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1037                assert_eq!(next_hops.len(), 4);
1038                for i in 0..4 {
1039                    assert!(next_hops.contains(&format!("linked-peer-{}", i)));
1040                }
1041            }
1042            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1043        }
1044    }
1045
1046    #[test]
1047    fn test_route_lateral_multicast() {
1048        let router = SelectiveRouter::new();
1049        // Leader with 3 lateral peers, broadcasting coordination
1050        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1051        // Create broadcast coordination packet (no specific destination)
1052        let packet = DataPacket {
1053            packet_id: uuid::Uuid::new_v4().to_string(),
1054            source_node_id: "platoon-1".to_string(),
1055            destination_node_id: None, // Broadcast
1056            data_type: DataType::Coordination,
1057            direction: DataDirection::Lateral,
1058            hop_count: 0,
1059            max_hops: 3,
1060            payload: vec![7, 8, 9],
1061        };
1062
1063        let decision = router.route(&packet, &state, "platoon-3");
1064
1065        // Leaders should consume broadcast coordination AND forward to all lateral peers
1066        match decision {
1067            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1068                assert_eq!(next_hops.len(), 3);
1069                assert!(next_hops.contains(&"lateral-peer-0".to_string()));
1070                assert!(next_hops.contains(&"lateral-peer-1".to_string()));
1071                assert!(next_hops.contains(&"lateral-peer-2".to_string()));
1072            }
1073            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1074        }
1075    }
1076
1077    #[test]
1078    fn test_route_downward_single_child_unicast() {
1079        let router = SelectiveRouter::new();
1080        // Leader with only 1 child
1081        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, false, 1, 0);
1082        // Create broadcast command packet (no specific destination)
1083        let packet = DataPacket {
1084            packet_id: uuid::Uuid::new_v4().to_string(),
1085            source_node_id: "hq".to_string(),
1086            destination_node_id: None, // Broadcast
1087            data_type: DataType::Command,
1088            direction: DataDirection::Downward,
1089            hop_count: 0,
1090            max_hops: 10,
1091            payload: vec![4, 5, 6],
1092        };
1093
1094        let decision = router.route(&packet, &state, "platoon-leader");
1095
1096        // Leaders consume broadcast commands, and with only 1 child use unicast variant
1097        match decision {
1098            RoutingDecision::ConsumeAndForward { next_hop } => {
1099                assert_eq!(next_hop, "linked-peer-0");
1100            }
1101            _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1102        }
1103    }
1104
1105    #[test]
1106    fn test_route_lateral_single_peer_unicast() {
1107        let router = SelectiveRouter::new();
1108        // Leader with only 1 lateral peer
1109        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 1);
1110        // Create broadcast coordination packet (no specific destination)
1111        let packet = DataPacket {
1112            packet_id: uuid::Uuid::new_v4().to_string(),
1113            source_node_id: "platoon-1".to_string(),
1114            destination_node_id: None, // Broadcast
1115            data_type: DataType::Coordination,
1116            direction: DataDirection::Lateral,
1117            hop_count: 0,
1118            max_hops: 3,
1119            payload: vec![7, 8, 9],
1120        };
1121
1122        let decision = router.route(&packet, &state, "platoon-3");
1123
1124        // Leaders consume broadcast coordination, and with only 1 lateral peer use unicast variant
1125        match decision {
1126            RoutingDecision::ConsumeAndForward { next_hop } => {
1127                assert_eq!(next_hop, "lateral-peer-0");
1128            }
1129            _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1130        }
1131    }
1132
1133    #[test]
1134    fn test_route_downward_no_children_drop() {
1135        let router = SelectiveRouter::new();
1136        // Leaf node with no children
1137        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1138        // Create broadcast command packet (no specific destination)
1139        let packet = DataPacket {
1140            packet_id: uuid::Uuid::new_v4().to_string(),
1141            source_node_id: "hq".to_string(),
1142            destination_node_id: None, // Broadcast
1143            data_type: DataType::Command,
1144            direction: DataDirection::Downward,
1145            hop_count: 0,
1146            max_hops: 10,
1147            payload: vec![4, 5, 6],
1148        };
1149
1150        let decision = router.route(&packet, &state, "squad-member");
1151
1152        // With no children, downward broadcast should drop (or consume if addressed)
1153        // In this case, not addressed to us, so should drop
1154        assert_eq!(decision, RoutingDecision::Drop);
1155    }
1156
1157    #[test]
1158    fn test_multicast_preserves_backward_compatibility() {
1159        let router = SelectiveRouter::new();
1160        // Intermediate node with parent (upward routing)
1161        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1162        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1163
1164        let decision = router.route(&packet, &state, "platoon-leader");
1165
1166        // Upward routing should still use ConsumeAndForward (unicast)
1167        match decision {
1168            RoutingDecision::ConsumeAndForward { next_hop } => {
1169                assert_eq!(next_hop, "parent-node");
1170            }
1171            _ => panic!(
1172                "Expected ConsumeAndForward (backward compat), got {:?}",
1173                decision
1174            ),
1175        }
1176
1177        // next_hop() should still work for backward compatibility
1178        let next_hop = router.next_hop(&packet, &state);
1179        assert_eq!(next_hop, Some("parent-node".to_string()));
1180    }
1181
1182    // ============================================================================
1183    // Message Deduplication Tests
1184    // ============================================================================
1185
1186    #[test]
1187    fn test_deduplication_disabled_by_default() {
1188        let router = SelectiveRouter::new();
1189        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1190        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1191
1192        // Route same packet twice - should NOT be dropped (dedup disabled)
1193        let decision1 = router.route(&packet, &state, "this-node");
1194        let decision2 = router.route(&packet, &state, "this-node");
1195
1196        // Both should route normally (ConsumeAndForward)
1197        assert!(matches!(
1198            decision1,
1199            RoutingDecision::ConsumeAndForward { .. }
1200        ));
1201        assert!(matches!(
1202            decision2,
1203            RoutingDecision::ConsumeAndForward { .. }
1204        ));
1205        assert_eq!(router.dedup_cache_size(), 0);
1206    }
1207
1208    #[test]
1209    fn test_deduplication_enabled() {
1210        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1211        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1212        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1213
1214        // First route should succeed
1215        let decision1 = router.route(&packet, &state, "this-node");
1216        assert!(matches!(
1217            decision1,
1218            RoutingDecision::ConsumeAndForward { .. }
1219        ));
1220        assert_eq!(router.dedup_cache_size(), 1);
1221
1222        // Second route of same packet should be dropped
1223        let decision2 = router.route(&packet, &state, "this-node");
1224        assert_eq!(decision2, RoutingDecision::Drop);
1225        assert_eq!(router.dedup_cache_size(), 1); // No new entry added
1226    }
1227
1228    #[test]
1229    fn test_deduplication_different_packets() {
1230        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1231        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1232        let packet1 = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1233        let packet2 = DataPacket::telemetry("sensor-2", vec![4, 5, 6]);
1234
1235        // Route two different packets - both should succeed
1236        let decision1 = router.route(&packet1, &state, "this-node");
1237        let decision2 = router.route(&packet2, &state, "this-node");
1238
1239        assert!(matches!(
1240            decision1,
1241            RoutingDecision::ConsumeAndForward { .. }
1242        ));
1243        assert!(matches!(
1244            decision2,
1245            RoutingDecision::ConsumeAndForward { .. }
1246        ));
1247        assert_eq!(router.dedup_cache_size(), 2);
1248    }
1249
1250    #[test]
1251    fn test_deduplication_cache_clear() {
1252        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1253        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1254        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1255
1256        // Route packet
1257        let _ = router.route(&packet, &state, "this-node");
1258        assert_eq!(router.dedup_cache_size(), 1);
1259
1260        // Clear cache
1261        router.clear_dedup_cache();
1262        assert_eq!(router.dedup_cache_size(), 0);
1263
1264        // Should be able to route same packet again
1265        let decision = router.route(&packet, &state, "this-node");
1266        assert!(matches!(
1267            decision,
1268            RoutingDecision::ConsumeAndForward { .. }
1269        ));
1270    }
1271
1272    #[test]
1273    fn test_deduplication_config_defaults() {
1274        let config = DeduplicationConfig::default();
1275        assert!(config.enabled);
1276        assert_eq!(config.ttl, Duration::from_secs(300));
1277        assert_eq!(config.max_entries, 10000);
1278    }
1279
1280    #[test]
1281    fn test_verbose_router() {
1282        let router = SelectiveRouter::new_verbose();
1283        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1284        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1285
1286        // Should work the same as non-verbose, just with logging
1287        let decision = router.route(&packet, &state, "this-node");
1288        assert!(matches!(
1289            decision,
1290            RoutingDecision::ConsumeAndForward { .. }
1291        ));
1292    }
1293
1294    #[test]
1295    fn test_verbose_max_hops_drop() {
1296        let router = SelectiveRouter::new_verbose();
1297        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1298        let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1299        for _ in 0..10 {
1300            packet.increment_hop();
1301        }
1302        let decision = router.route(&packet, &state, "this-node");
1303        assert_eq!(decision, RoutingDecision::Drop);
1304    }
1305
1306    #[test]
1307    fn test_verbose_own_packet_drop() {
1308        let router = SelectiveRouter::new_verbose();
1309        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1310        let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
1311        let decision = router.route(&packet, &state, "this-node");
1312        assert_eq!(decision, RoutingDecision::Drop);
1313    }
1314
1315    #[test]
1316    fn test_verbose_consume_only() {
1317        let router = SelectiveRouter::new_verbose();
1318        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1319        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1320        let decision = router.route(&packet, &state, "hq-node");
1321        assert_eq!(decision, RoutingDecision::Consume);
1322    }
1323
1324    #[test]
1325    fn test_verbose_consume_and_forward() {
1326        let router = SelectiveRouter::new_verbose();
1327        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1328        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1329        let decision = router.route(&packet, &state, "platoon-leader");
1330        assert!(matches!(
1331            decision,
1332            RoutingDecision::ConsumeAndForward { .. }
1333        ));
1334    }
1335
1336    #[test]
1337    fn test_verbose_consume_and_multicast() {
1338        let router = SelectiveRouter::new_verbose();
1339        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1340        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1341        let decision = router.route(&packet, &state, "platoon-leader");
1342        assert!(matches!(
1343            decision,
1344            RoutingDecision::ConsumeAndForwardMulticast { .. }
1345        ));
1346    }
1347
1348    #[test]
1349    fn test_verbose_forward_multicast() {
1350        let router = SelectiveRouter::new_verbose();
1351        // Member (not leader) with lateral peers and broadcast lateral packet
1352        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 3, 0);
1353        // Downward broadcast from hq - member doesn't consume, should forward to children
1354        let packet = DataPacket {
1355            packet_id: uuid::Uuid::new_v4().to_string(),
1356            source_node_id: "hq".to_string(),
1357            destination_node_id: None,
1358            data_type: DataType::Command,
1359            direction: DataDirection::Downward,
1360            hop_count: 0,
1361            max_hops: 10,
1362            payload: vec![4, 5, 6],
1363        };
1364        let decision = router.route(&packet, &state, "member-node");
1365        assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1366    }
1367
1368    #[test]
1369    fn test_verbose_forward_unicast() {
1370        let router = SelectiveRouter::new_verbose();
1371        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 1, 0);
1372        let packet = DataPacket {
1373            packet_id: uuid::Uuid::new_v4().to_string(),
1374            source_node_id: "hq".to_string(),
1375            destination_node_id: None,
1376            data_type: DataType::Command,
1377            direction: DataDirection::Downward,
1378            hop_count: 0,
1379            max_hops: 10,
1380            payload: vec![4, 5, 6],
1381        };
1382        let decision = router.route(&packet, &state, "member-node");
1383        assert!(matches!(decision, RoutingDecision::Forward { .. }));
1384    }
1385
1386    #[test]
1387    fn test_verbose_forward_no_next_hop_drop() {
1388        let router = SelectiveRouter::new_verbose();
1389        // Member with lateral peer but packet directed to unknown lateral
1390        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, false, 0, 1);
1391        let packet = DataPacket {
1392            packet_id: uuid::Uuid::new_v4().to_string(),
1393            source_node_id: "other".to_string(),
1394            destination_node_id: Some("lateral-peer-0".to_string()),
1395            data_type: DataType::Coordination,
1396            direction: DataDirection::Lateral,
1397            hop_count: 0,
1398            max_hops: 3,
1399            payload: vec![7, 8, 9],
1400        };
1401        // Member doesn't consume lateral (not addressed to us), but does forward
1402        let decision = router.route(&packet, &state, "member-node");
1403        assert!(matches!(decision, RoutingDecision::Forward { .. }));
1404    }
1405
1406    #[test]
1407    fn test_verbose_drop_not_for_us() {
1408        let router = SelectiveRouter::new_verbose();
1409        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1410        // Lateral packet not addressed to us, no lateral peers
1411        let packet = DataPacket {
1412            packet_id: uuid::Uuid::new_v4().to_string(),
1413            source_node_id: "other".to_string(),
1414            destination_node_id: Some("someone-else".to_string()),
1415            data_type: DataType::Coordination,
1416            direction: DataDirection::Lateral,
1417            hop_count: 0,
1418            max_hops: 3,
1419            payload: vec![7, 8, 9],
1420        };
1421        let decision = router.route(&packet, &state, "member-node");
1422        assert_eq!(decision, RoutingDecision::Drop);
1423    }
1424
1425    #[test]
1426    fn test_verbose_dedup_drop() {
1427        let config = DeduplicationConfig {
1428            enabled: true,
1429            ttl: Duration::from_secs(300),
1430            max_entries: 100,
1431        };
1432        let router = SelectiveRouter {
1433            verbose: true,
1434            dedup_config: config,
1435            seen_packets: Arc::new(RwLock::new(HashMap::new())),
1436        };
1437        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1438        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1439
1440        let _ = router.route(&packet, &state, "this-node");
1441        let decision2 = router.route(&packet, &state, "this-node");
1442        assert_eq!(decision2, RoutingDecision::Drop);
1443    }
1444
1445    #[test]
1446    fn test_forward_only_no_consume_member_downward() {
1447        // Member with children receiving non-addressed broadcast command
1448        let router = SelectiveRouter::new();
1449        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 2, 0);
1450        let packet = DataPacket {
1451            packet_id: uuid::Uuid::new_v4().to_string(),
1452            source_node_id: "hq".to_string(),
1453            destination_node_id: None,
1454            data_type: DataType::Command,
1455            direction: DataDirection::Downward,
1456            hop_count: 0,
1457            max_hops: 10,
1458            payload: vec![4, 5, 6],
1459        };
1460        let decision = router.route(&packet, &state, "member-node");
1461        // Member doesn't consume broadcast commands, just forwards
1462        assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1463    }
1464
1465    #[test]
1466    fn test_should_forward_no_next_hop_returns_drop() {
1467        let router = SelectiveRouter::new();
1468        // Member with no linked peers, no lateral peers, downward packet addressed to unknown
1469        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1470        let packet = DataPacket {
1471            packet_id: uuid::Uuid::new_v4().to_string(),
1472            source_node_id: "hq".to_string(),
1473            destination_node_id: None,
1474            data_type: DataType::Command,
1475            direction: DataDirection::Downward,
1476            hop_count: 0,
1477            max_hops: 10,
1478            payload: vec![4, 5, 6],
1479        };
1480        let decision = router.route(&packet, &state, "squad-member");
1481        assert_eq!(decision, RoutingDecision::Drop);
1482    }
1483
1484    #[test]
1485    fn test_next_hop_downward_targeted_not_found() {
1486        let router = SelectiveRouter::new();
1487        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1488        // Addressed to a peer not in linked_peers
1489        let packet = DataPacket::command("hq", "unknown-child", vec![4, 5, 6]);
1490
1491        let next_hop = router.next_hop(&packet, &state);
1492        // Should return first linked peer as fallback
1493        assert!(next_hop.is_some());
1494        assert!(next_hop.unwrap().starts_with("linked-peer-"));
1495    }
1496
1497    #[test]
1498    fn test_next_hop_lateral_unknown_peer() {
1499        let router = SelectiveRouter::new();
1500        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1501        let packet = DataPacket::coordination("source", "unknown-lateral", vec![7, 8, 9]);
1502
1503        let next_hop = router.next_hop(&packet, &state);
1504        // Should return first lateral peer as fallback
1505        assert!(next_hop.is_some());
1506        assert!(next_hop.unwrap().starts_with("lateral-peer-"));
1507    }
1508
1509    #[test]
1510    fn test_default_router() {
1511        let router = SelectiveRouter::default();
1512        assert_eq!(router.dedup_cache_size(), 0);
1513    }
1514
1515    #[test]
1516    fn test_deduplication_max_entries_eviction() {
1517        let config = DeduplicationConfig {
1518            enabled: true,
1519            ttl: Duration::from_secs(300),
1520            max_entries: 3, // Very small for testing
1521        };
1522        let router = SelectiveRouter::new_with_deduplication(config);
1523        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1524
1525        // Route 5 packets
1526        for i in 0..5 {
1527            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
1528            let _ = router.route(&packet, &state, "this-node");
1529        }
1530
1531        // Cache should be limited to max_entries
1532        assert!(router.dedup_cache_size() <= 3);
1533    }
1534}