Skip to main content

peat_mesh/routing/
router.rs

1//! Selective router implementation for hierarchical data routing
2//!
3//! This module implements the core routing logic that determines:
4//! - Whether data should be consumed (processed) by this node
5//! - Whether data should be forwarded to other nodes
6//! - Which peer should receive forwarded data
7//!
8//! ## Message Deduplication
9//!
10//! The router includes optional message deduplication to prevent routing loops.
11//! When enabled, each packet's ID is tracked and duplicate packets are automatically
12//! dropped. The deduplication cache uses a time-based eviction strategy.
13//!
14//! ## Lock ordering
15//!
16//! `SelectiveRouter` contains a single lock:
17//!
18//! | Lock | Type | Protects |
19//! |------|------|----------|
20//! | `seen_packets` | `std::sync::RwLock<HashMap<String, DeduplicationEntry>>` | Dedup cache |
21//!
22//! Because only one lock exists, there is no ordering constraint within this
23//! module. Callers that also hold locks from other modules (e.g.,
24//! `TransportManager` or `PeatMesh::state`) should acquire `seen_packets`
25//! **after** releasing those outer locks to avoid contention.
26
27use super::packet::{DataDirection, DataPacket};
28use crate::beacon::HierarchyLevel;
29use crate::hierarchy::NodeRole;
30use crate::topology::TopologyState;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34use tracing::{debug, trace, warn};
35
36/// Routing decision result
37#[derive(Debug, Clone, PartialEq)]
38pub enum RoutingDecision {
39    /// Consume (process) the data locally
40    Consume,
41
42    /// Forward the data to a specific peer
43    Forward { next_hop: String },
44
45    /// Consume locally AND forward to peer
46    ConsumeAndForward { next_hop: String },
47
48    /// Forward the data to multiple peers (multicast/broadcast)
49    ForwardMulticast { next_hops: Vec<String> },
50
51    /// Consume locally AND forward to multiple peers (multicast/broadcast)
52    ConsumeAndForwardMulticast { next_hops: Vec<String> },
53
54    /// Drop the packet (reached max hops or no route)
55    Drop,
56}
57
58/// Configuration for message deduplication
59#[derive(Debug, Clone)]
60pub struct DeduplicationConfig {
61    /// Whether deduplication is enabled
62    pub enabled: bool,
63    /// How long to remember seen packet IDs (default: 5 minutes)
64    pub ttl: Duration,
65    /// Maximum number of packet IDs to track (default: 10000)
66    pub max_entries: usize,
67}
68
69impl Default for DeduplicationConfig {
70    fn default() -> Self {
71        Self {
72            enabled: true,
73            ttl: Duration::from_secs(300), // 5 minutes
74            max_entries: 10000,
75        }
76    }
77}
78
79/// Entry in the deduplication cache
80#[derive(Debug, Clone)]
81struct DeduplicationEntry {
82    /// When this packet was first seen
83    first_seen: Instant,
84}
85
86/// Selective router for hierarchical mesh networks
87///
88/// Makes intelligent routing decisions based on:
89/// - Node's position in hierarchy (level and role)
90/// - Data direction (upward/downward/lateral)
91/// - Topology state (selected peer, linked peers, lateral peers)
92///
93/// # Message Deduplication
94///
95/// The router can optionally track seen packet IDs to prevent routing loops.
96/// Use `new_with_deduplication()` to enable this feature.
97///
98/// # Example
99///
100/// ```ignore
101/// use peat_mesh::routing::{SelectiveRouter, DataPacket, DeduplicationConfig};
102/// use peat_mesh::topology::TopologyState;
103///
104/// // Create router with deduplication enabled
105/// let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
106/// let state = get_topology_state();
107/// let packet = DataPacket::telemetry("node-123", vec![1, 2, 3]);
108///
109/// // Route will automatically deduplicate
110/// let decision = router.route(&packet, &state, "this-node");
111///
112/// // Second call with same packet returns Drop (duplicate)
113/// let decision2 = router.route(&packet, &state, "this-node");
114/// assert_eq!(decision2, RoutingDecision::Drop);
115/// ```
116pub struct SelectiveRouter {
117    /// Enable verbose logging for debugging
118    verbose: bool,
119    /// Deduplication configuration
120    dedup_config: DeduplicationConfig,
121    /// Cache of seen packet IDs (packet_id -> entry)
122    seen_packets: Arc<RwLock<HashMap<String, DeduplicationEntry>>>,
123}
124
125impl SelectiveRouter {
126    /// Create a new selective router (deduplication disabled by default)
127    pub fn new() -> Self {
128        Self {
129            verbose: false,
130            dedup_config: DeduplicationConfig {
131                enabled: false,
132                ..Default::default()
133            },
134            seen_packets: Arc::new(RwLock::new(HashMap::new())),
135        }
136    }
137
138    /// Create a new selective router with verbose logging
139    pub fn new_verbose() -> Self {
140        Self {
141            verbose: true,
142            dedup_config: DeduplicationConfig {
143                enabled: false,
144                ..Default::default()
145            },
146            seen_packets: Arc::new(RwLock::new(HashMap::new())),
147        }
148    }
149
150    /// Create a new selective router with deduplication enabled
151    pub fn new_with_deduplication(config: DeduplicationConfig) -> Self {
152        Self {
153            verbose: false,
154            dedup_config: config,
155            seen_packets: Arc::new(RwLock::new(HashMap::new())),
156        }
157    }
158
159    /// Check if a packet has been seen before (for deduplication)
160    ///
161    /// Returns `true` if this is a duplicate packet that should be dropped.
162    /// If the packet is new, it's added to the seen cache.
163    fn is_duplicate(&self, packet_id: &str) -> bool {
164        if !self.dedup_config.enabled {
165            return false;
166        }
167
168        let now = Instant::now();
169
170        // Try to insert into cache
171        let mut cache = self.seen_packets.write().unwrap_or_else(|e| e.into_inner());
172
173        // Check if already seen and not expired
174        if let Some(entry) = cache.get(packet_id) {
175            if now.duration_since(entry.first_seen) < self.dedup_config.ttl {
176                if self.verbose {
177                    debug!("Duplicate packet detected: {}", packet_id);
178                }
179                return true;
180            }
181            // Entry expired, will be replaced below
182        }
183
184        // Evict expired entries if cache is getting full
185        if cache.len() >= self.dedup_config.max_entries {
186            self.evict_expired(&mut cache, now);
187
188            // If still full after eviction, remove oldest entry
189            if cache.len() >= self.dedup_config.max_entries {
190                if let Some(oldest_key) = cache
191                    .iter()
192                    .min_by_key(|(_, entry)| entry.first_seen)
193                    .map(|(k, _)| k.clone())
194                {
195                    cache.remove(&oldest_key);
196                }
197            }
198        }
199
200        // Record this packet
201        cache.insert(
202            packet_id.to_string(),
203            DeduplicationEntry { first_seen: now },
204        );
205
206        false
207    }
208
209    /// Evict expired entries from the cache
210    fn evict_expired(&self, cache: &mut HashMap<String, DeduplicationEntry>, now: Instant) {
211        cache.retain(|_, entry| now.duration_since(entry.first_seen) < self.dedup_config.ttl);
212    }
213
214    /// Get the number of entries in the deduplication cache
215    pub fn dedup_cache_size(&self) -> usize {
216        self.seen_packets
217            .read()
218            .unwrap_or_else(|e| e.into_inner())
219            .len()
220    }
221
222    /// Clear the deduplication cache
223    pub fn clear_dedup_cache(&self) {
224        self.seen_packets
225            .write()
226            .unwrap_or_else(|e| e.into_inner())
227            .clear();
228    }
229
230    /// Make a complete routing decision for a packet
231    ///
232    /// This is the primary entry point that combines should_consume,
233    /// should_forward, and next_hop into a single decision.
234    ///
235    /// If deduplication is enabled, duplicate packets are automatically dropped.
236    ///
237    /// # Arguments
238    ///
239    /// * `packet` - The data packet to route
240    /// * `state` - Current topology state
241    /// * `this_node_id` - This node's identifier
242    ///
243    /// # Returns
244    ///
245    /// RoutingDecision indicating what to do with the packet
246    pub fn route(
247        &self,
248        packet: &DataPacket,
249        state: &TopologyState,
250        this_node_id: &str,
251    ) -> RoutingDecision {
252        // Check for duplicate packet (if deduplication enabled)
253        if self.is_duplicate(&packet.packet_id) {
254            if self.verbose {
255                debug!("Packet {} is a duplicate, dropping", packet.packet_id);
256            }
257            return RoutingDecision::Drop;
258        }
259
260        // Check if packet has reached max hops
261        if packet.at_max_hops() {
262            if self.verbose {
263                warn!(
264                    "Packet {} reached max hops ({}), dropping",
265                    packet.packet_id, packet.max_hops
266                );
267            }
268            return RoutingDecision::Drop;
269        }
270
271        // Check if we're the source (don't route our own packets back to us)
272        if packet.source_node_id == this_node_id {
273            if self.verbose {
274                trace!(
275                    "Packet {} originated from us, not routing",
276                    packet.packet_id
277                );
278            }
279            return RoutingDecision::Drop;
280        }
281
282        let should_consume = self.should_consume(packet, state, this_node_id);
283        let should_forward = self.should_forward(packet, state);
284
285        if should_consume && should_forward {
286            // Both consume and forward - check if multicast needed
287            let next_hops = self.next_hops(packet, state);
288            if next_hops.is_empty() {
289                // Can't forward without next hop, just consume
290                if self.verbose {
291                    debug!("Packet {}: Consume only (no next hop)", packet.packet_id);
292                }
293                RoutingDecision::Consume
294            } else if next_hops.len() == 1 {
295                // Single next hop - use unicast variant
296                if self.verbose {
297                    debug!(
298                        "Packet {}: Consume and forward to {}",
299                        packet.packet_id, next_hops[0]
300                    );
301                }
302                RoutingDecision::ConsumeAndForward {
303                    next_hop: next_hops[0].clone(),
304                }
305            } else {
306                // Multiple next hops - use multicast variant
307                if self.verbose {
308                    debug!(
309                        "Packet {}: Consume and multicast to {} peers",
310                        packet.packet_id,
311                        next_hops.len()
312                    );
313                }
314                RoutingDecision::ConsumeAndForwardMulticast { next_hops }
315            }
316        } else if should_consume {
317            if self.verbose {
318                debug!("Packet {}: Consume only", packet.packet_id);
319            }
320            RoutingDecision::Consume
321        } else if should_forward {
322            let next_hops = self.next_hops(packet, state);
323            if next_hops.is_empty() {
324                if self.verbose {
325                    warn!(
326                        "Packet {}: Should forward but no next hop, dropping",
327                        packet.packet_id
328                    );
329                }
330                RoutingDecision::Drop
331            } else if next_hops.len() == 1 {
332                // Single next hop - use unicast variant
333                if self.verbose {
334                    debug!("Packet {}: Forward to {}", packet.packet_id, next_hops[0]);
335                }
336                RoutingDecision::Forward {
337                    next_hop: next_hops[0].clone(),
338                }
339            } else {
340                // Multiple next hops - use multicast variant
341                if self.verbose {
342                    debug!(
343                        "Packet {}: Multicast to {} peers",
344                        packet.packet_id,
345                        next_hops.len()
346                    );
347                }
348                RoutingDecision::ForwardMulticast { next_hops }
349            }
350        } else {
351            if self.verbose {
352                debug!("Packet {}: Drop (not for us)", packet.packet_id);
353            }
354            RoutingDecision::Drop
355        }
356    }
357
358    /// Determine if this node should consume (process) the packet
359    ///
360    /// # Consumption Rules
361    ///
362    /// **Upward (Telemetry)**
363    /// - Always consume telemetry for local processing/aggregation
364    ///
365    /// **Downward (Commands)**
366    /// - Consume if packet is addressed to us
367    /// - Leaders consume commands for their squad
368    ///
369    /// **Lateral (Coordination)**
370    /// - Leaders consume coordination messages
371    /// - Members typically don't consume lateral messages
372    ///
373    /// # Arguments
374    ///
375    /// * `packet` - The data packet
376    /// * `state` - Current topology state
377    /// * `this_node_id` - This node's identifier
378    ///
379    /// # Returns
380    ///
381    /// `true` if this node should process the packet
382    pub fn should_consume(
383        &self,
384        packet: &DataPacket,
385        state: &TopologyState,
386        this_node_id: &str,
387    ) -> bool {
388        match packet.direction {
389            DataDirection::Upward => {
390                // Upward data (telemetry, status): Always consume for aggregation
391                // Every node in the path can aggregate/process
392                true
393            }
394
395            DataDirection::Downward => {
396                // Downward data (commands, config): Consume if targeted at us
397                if let Some(ref dest) = packet.destination_node_id {
398                    if dest == this_node_id {
399                        return true;
400                    }
401                }
402
403                // Leaders consume commands even if not directly targeted
404                // (they may need to disseminate to squad members)
405                matches!(state.role, NodeRole::Leader)
406            }
407
408            DataDirection::Lateral => {
409                // Lateral data (coordination): Only Leaders typically consume
410                if let Some(ref dest) = packet.destination_node_id {
411                    // Consume only if directly addressed to us
412                    dest == this_node_id
413                } else {
414                    // No specific destination (broadcast): Leaders consume
415                    matches!(state.role, NodeRole::Leader)
416                }
417            }
418        }
419    }
420
421    /// Determine if this node should forward the packet
422    ///
423    /// # Forwarding Rules
424    ///
425    /// **Upward (Telemetry)**
426    /// - Forward if we have a selected peer (parent in hierarchy)
427    /// - Don't forward if we're at HQ level (no parent)
428    ///
429    /// **Downward (Commands)**
430    /// - Forward if we have linked peers (children) that need this data
431    /// - Don't forward if we're a leaf node (no children)
432    ///
433    /// **Lateral (Coordination)**
434    /// - Forward if addressed to a lateral peer we track
435    /// - Leaders may forward to other Leaders at same level
436    ///
437    /// # Arguments
438    ///
439    /// * `packet` - The data packet
440    /// * `state` - Current topology state
441    ///
442    /// # Returns
443    ///
444    /// `true` if packet should be forwarded to another peer
445    pub fn should_forward(&self, packet: &DataPacket, state: &TopologyState) -> bool {
446        match packet.direction {
447            DataDirection::Upward => {
448                // Forward upward if we have a selected peer (parent)
449                state.selected_peer.is_some()
450            }
451
452            DataDirection::Downward => {
453                // Forward downward if we have linked peers (children)
454                !state.linked_peers.is_empty()
455            }
456
457            DataDirection::Lateral => {
458                // Forward laterally if addressed to a peer we know
459                if let Some(ref dest) = packet.destination_node_id {
460                    state.lateral_peers.contains_key(dest)
461                } else {
462                    // Broadcast lateral messages if we're a Leader with lateral peers
463                    matches!(state.role, NodeRole::Leader) && !state.lateral_peers.is_empty()
464                }
465            }
466        }
467    }
468
469    /// Determine the next hop for forwarding the packet
470    ///
471    /// # Next Hop Selection
472    ///
473    /// **Upward**: selected_peer (parent in hierarchy)
474    /// **Downward**: linked_peers (children) - for now, return first child
475    /// **Lateral**: lateral_peers - specific peer if addressed, or first if broadcast
476    ///
477    /// # Arguments
478    ///
479    /// * `packet` - The data packet
480    /// * `state` - Current topology state
481    ///
482    /// # Returns
483    ///
484    /// Node ID of the next hop, or None if no valid next hop
485    pub fn next_hop(&self, packet: &DataPacket, state: &TopologyState) -> Option<String> {
486        match packet.direction {
487            DataDirection::Upward => {
488                // Upward: Route to selected peer (parent)
489                state
490                    .selected_peer
491                    .as_ref()
492                    .map(|peer| peer.node_id.clone())
493            }
494
495            DataDirection::Downward => {
496                // Downward: Route to linked peers (children)
497                // If addressed to specific child, route there
498                if let Some(ref dest) = packet.destination_node_id {
499                    if state.linked_peers.contains_key(dest) {
500                        return Some(dest.clone());
501                    }
502                }
503
504                // Otherwise, return first linked peer for backward compatibility
505                // For multicast/broadcast, use next_hops() instead
506                state.linked_peers.keys().next().cloned()
507            }
508
509            DataDirection::Lateral => {
510                // Lateral: Route to lateral peers
511                if let Some(ref dest) = packet.destination_node_id {
512                    // Route to specific lateral peer if we track them
513                    if state.lateral_peers.contains_key(dest) {
514                        return Some(dest.clone());
515                    }
516                }
517
518                // Otherwise, route to first lateral peer for backward compatibility
519                state.lateral_peers.keys().next().cloned()
520            }
521        }
522    }
523
524    /// Determine all next hops for multicast/broadcast forwarding
525    ///
526    /// Returns all appropriate peers for scenarios requiring multicast:
527    /// - Downward command dissemination to all children
528    /// - Lateral coordination broadcast to all peers at same level
529    ///
530    /// # Next Hops Selection
531    ///
532    /// **Upward**: Returns selected_peer (parent) as single-element vector
533    /// **Downward**: Returns all linked_peers (children) for broadcast
534    /// **Lateral**: Returns all lateral_peers for broadcast
535    ///
536    /// # Arguments
537    ///
538    /// * `packet` - The data packet
539    /// * `state` - Current topology state
540    ///
541    /// # Returns
542    ///
543    /// Vector of node IDs to forward to (empty if no valid hops)
544    pub fn next_hops(&self, packet: &DataPacket, state: &TopologyState) -> Vec<String> {
545        match packet.direction {
546            DataDirection::Upward => {
547                // Upward: Return selected peer (parent) as single-element vector
548                state
549                    .selected_peer
550                    .as_ref()
551                    .map(|peer| vec![peer.node_id.clone()])
552                    .unwrap_or_default()
553            }
554
555            DataDirection::Downward => {
556                // Downward: Route to all linked peers (children) for broadcast
557                // If addressed to specific child, route only there
558                if let Some(ref dest) = packet.destination_node_id {
559                    if state.linked_peers.contains_key(dest) {
560                        return vec![dest.clone()];
561                    }
562                }
563
564                // Otherwise, route to ALL linked peers (multicast/broadcast)
565                state.linked_peers.keys().cloned().collect()
566            }
567
568            DataDirection::Lateral => {
569                // Lateral: Route to all lateral peers for broadcast
570                if let Some(ref dest) = packet.destination_node_id {
571                    // Route to specific lateral peer if we track them
572                    if state.lateral_peers.contains_key(dest) {
573                        return vec![dest.clone()];
574                    }
575                }
576
577                // Otherwise, route to ALL lateral peers (broadcast)
578                state.lateral_peers.keys().cloned().collect()
579            }
580        }
581    }
582
583    /// Check if this node is at the hierarchy level that should aggregate
584    ///
585    /// HQ nodes (Company level) should aggregate and consume
586    /// without further forwarding.
587    #[allow(dead_code)]
588    fn is_hq_level(&self, level: HierarchyLevel) -> bool {
589        matches!(level, HierarchyLevel::Company)
590    }
591
592    /// Check if a packet should be aggregated before forwarding
593    ///
594    /// Aggregation is appropriate when:
595    /// - Packet data type requires aggregation (Telemetry, Status)
596    /// - Routing decision is ConsumeAndForward (intermediate node)
597    /// - Node is a Leader (squad leader aggregating member data)
598    ///
599    /// # Integration with Aggregator
600    ///
601    /// When this returns true, the application should:
602    /// 1. Collect telemetry packets from squad members (batching)
603    /// 2. Use `Aggregator::aggregate_telemetry()` to create aggregated packet
604    /// 3. Route the aggregated packet upward using this router
605    ///
606    /// # Example
607    ///
608    /// ```ignore
609    /// use peat_mesh::routing::{SelectiveRouter, Aggregator, DataPacket};
610    ///
611    /// let router = SelectiveRouter::new();
612    /// // let aggregator = MyAggregator::new();
613    ///
614    /// // Collect telemetry from squad members
615    /// let mut squad_telemetry = Vec::new();
616    /// for packet in incoming_packets {
617    ///     let decision = router.route(&packet, &state, "platoon-leader");
618    ///     if router.should_aggregate(&packet, &decision, &state) {
619    ///         squad_telemetry.push(packet);
620    ///     }
621    /// }
622    ///
623    /// // Aggregate when we have enough data
624    /// if squad_telemetry.len() >= 3 {
625    ///     let aggregated = aggregator.aggregate_telemetry(
626    ///         "squad-1",
627    ///         "platoon-leader",
628    ///         squad_telemetry,
629    ///     )?;
630    ///
631    ///     // Route aggregated packet upward
632    ///     let decision = router.route(&aggregated, &state, "platoon-leader");
633    ///     // ... forward to parent
634    /// }
635    /// ```
636    ///
637    /// # Arguments
638    ///
639    /// * `packet` - The data packet to check
640    /// * `decision` - The routing decision for this packet
641    /// * `state` - Current topology state
642    ///
643    /// # Returns
644    ///
645    /// `true` if this packet should be aggregated before forwarding
646    pub fn should_aggregate(
647        &self,
648        packet: &DataPacket,
649        decision: &RoutingDecision,
650        state: &TopologyState,
651    ) -> bool {
652        // Only aggregate if we're consuming and forwarding (intermediate node)
653        if !matches!(decision, RoutingDecision::ConsumeAndForward { .. }) {
654            return false;
655        }
656
657        // Only aggregate data types that require it
658        if !packet.data_type.requires_aggregation() {
659            return false;
660        }
661
662        // Only Leaders aggregate squad member data
663        matches!(state.role, NodeRole::Leader)
664    }
665}
666
667impl Default for SelectiveRouter {
668    fn default() -> Self {
669        Self::new()
670    }
671}
672
673#[cfg(test)]
674mod tests {
675    use super::*;
676    use crate::beacon::{GeoPosition, GeographicBeacon};
677    use crate::routing::packet::{DataDirection, DataType};
678    use crate::topology::SelectedPeer;
679    use std::collections::HashMap;
680    use std::time::Instant;
681
682    fn create_test_state(
683        hierarchy_level: HierarchyLevel,
684        role: NodeRole,
685        has_selected_peer: bool,
686        num_linked_peers: usize,
687        num_lateral_peers: usize,
688    ) -> TopologyState {
689        let selected_peer = if has_selected_peer {
690            Some(SelectedPeer {
691                node_id: "parent-node".to_string(),
692                beacon: GeographicBeacon::new(
693                    "parent-node".to_string(),
694                    GeoPosition::new(37.7749, -122.4194),
695                    HierarchyLevel::Platoon,
696                ),
697                selected_at: Instant::now(),
698            })
699        } else {
700            None
701        };
702
703        let mut linked_peers = HashMap::new();
704        for i in 0..num_linked_peers {
705            linked_peers.insert(format!("linked-peer-{}", i), Instant::now());
706        }
707
708        let mut lateral_peers = HashMap::new();
709        for i in 0..num_lateral_peers {
710            lateral_peers.insert(format!("lateral-peer-{}", i), Instant::now());
711        }
712
713        TopologyState {
714            selected_peer,
715            linked_peers,
716            lateral_peers,
717            role,
718            hierarchy_level,
719        }
720    }
721
722    #[test]
723    fn test_upward_telemetry_leaf_node() {
724        let router = SelectiveRouter::new();
725        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
726        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
727
728        // Leaf node should consume telemetry
729        assert!(router.should_consume(&packet, &state, "this-node"));
730
731        // Leaf node with parent should forward
732        assert!(router.should_forward(&packet, &state));
733
734        // Next hop should be parent
735        let next_hop = router.next_hop(&packet, &state);
736        assert_eq!(next_hop, Some("parent-node".to_string()));
737    }
738
739    #[test]
740    fn test_upward_telemetry_hq_node() {
741        let router = SelectiveRouter::new();
742        // HQ node (no selected peer = highest level)
743        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
744        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
745
746        // HQ should consume telemetry
747        assert!(router.should_consume(&packet, &state, "hq-node"));
748
749        // HQ should NOT forward (no parent)
750        assert!(!router.should_forward(&packet, &state));
751    }
752
753    #[test]
754    fn test_downward_command_to_leader() {
755        let router = SelectiveRouter::new();
756        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
757        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
758
759        // Leader should consume command addressed to them
760        assert!(router.should_consume(&packet, &state, "platoon-leader"));
761
762        // Leader with children should forward
763        assert!(router.should_forward(&packet, &state));
764
765        // Next hop should be one of the linked peers (children)
766        let next_hop = router.next_hop(&packet, &state);
767        assert!(next_hop.is_some());
768        assert!(next_hop.unwrap().starts_with("linked-peer-"));
769    }
770
771    #[test]
772    fn test_downward_command_to_leaf() {
773        let router = SelectiveRouter::new();
774        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
775        let packet = DataPacket::command("hq", "squad-member", vec![4, 5, 6]);
776
777        // Member should consume command addressed to them
778        assert!(router.should_consume(&packet, &state, "squad-member"));
779
780        // Leaf node should NOT forward (no children)
781        assert!(!router.should_forward(&packet, &state));
782    }
783
784    #[test]
785    fn test_lateral_coordination_between_leaders() {
786        let router = SelectiveRouter::new();
787        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
788        let packet = DataPacket::coordination("platoon-1", "lateral-peer-0", vec![7, 8, 9]);
789
790        // Leader should NOT consume lateral coordination if not addressed to them
791        assert!(!router.should_consume(&packet, &state, "platoon-3"));
792
793        // Should forward if addressed to a lateral peer we track
794        let state_with_target =
795            create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
796        assert!(router.should_forward(&packet, &state_with_target));
797    }
798
799    #[test]
800    fn test_max_hops_drop() {
801        let router = SelectiveRouter::new();
802        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
803        let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
804
805        // Increment hops to max
806        for _ in 0..10 {
807            packet.increment_hop();
808        }
809
810        // Routing should return Drop when at max hops
811        let decision = router.route(&packet, &state, "this-node");
812        assert_eq!(decision, RoutingDecision::Drop);
813    }
814
815    #[test]
816    fn test_routing_decision_consume_and_forward() {
817        let router = SelectiveRouter::new();
818        // Intermediate node with parent and children
819        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
820        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
821
822        let decision = router.route(&packet, &state, "platoon-leader");
823
824        // Should consume and forward
825        match decision {
826            RoutingDecision::ConsumeAndForward { next_hop } => {
827                assert_eq!(next_hop, "parent-node");
828            }
829            _ => panic!("Expected ConsumeAndForward, got {:?}", decision),
830        }
831    }
832
833    #[test]
834    fn test_routing_decision_consume_only() {
835        let router = SelectiveRouter::new();
836        // HQ node (no parent)
837        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
838        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
839
840        let decision = router.route(&packet, &state, "hq-node");
841
842        // Should consume only (no forwarding)
843        assert_eq!(decision, RoutingDecision::Consume);
844    }
845
846    #[test]
847    fn test_dont_route_own_packets() {
848        let router = SelectiveRouter::new();
849        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
850        let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
851
852        // Should not route our own packets back to us
853        let decision = router.route(&packet, &state, "this-node");
854        assert_eq!(decision, RoutingDecision::Drop);
855    }
856
857    #[test]
858    fn test_should_aggregate_intermediate_leader() {
859        let router = SelectiveRouter::new();
860        // Intermediate Leader node (has parent and children)
861        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
862        let packet = DataPacket::telemetry("squad-member-1", vec![1, 2, 3]);
863
864        let decision = router.route(&packet, &state, "platoon-leader");
865
866        // Should aggregate: Leader with ConsumeAndForward decision
867        assert!(router.should_aggregate(&packet, &decision, &state));
868    }
869
870    #[test]
871    fn test_should_not_aggregate_hq_node() {
872        let router = SelectiveRouter::new();
873        // HQ node (no parent, just consumes)
874        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
875        let packet = DataPacket::telemetry("platoon-1", vec![1, 2, 3]);
876
877        let decision = router.route(&packet, &state, "hq-node");
878
879        // Should NOT aggregate: Decision is Consume only (not ConsumeAndForward)
880        assert!(!router.should_aggregate(&packet, &decision, &state));
881    }
882
883    #[test]
884    fn test_should_not_aggregate_non_leader() {
885        let router = SelectiveRouter::new();
886        // Member node (not a Leader)
887        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
888        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
889
890        let decision = router.route(&packet, &state, "squad-member");
891
892        // Should NOT aggregate: Not a Leader
893        assert!(!router.should_aggregate(&packet, &decision, &state));
894    }
895
896    #[test]
897    fn test_should_not_aggregate_command_packet() {
898        let router = SelectiveRouter::new();
899        // Leader node
900        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
901        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
902
903        let decision = router.route(&packet, &state, "platoon-leader");
904
905        // Should NOT aggregate: Command packets don't require aggregation
906        assert!(!router.should_aggregate(&packet, &decision, &state));
907    }
908
909    // ============================================================================
910    // Week 10: Multicast/Broadcast Routing Tests
911    // ============================================================================
912
913    #[test]
914    fn test_next_hops_upward_single_parent() {
915        let router = SelectiveRouter::new();
916        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
917        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
918
919        // Upward should return selected_peer as single-element vector
920        let next_hops = router.next_hops(&packet, &state);
921        assert_eq!(next_hops.len(), 1);
922        assert_eq!(next_hops[0], "parent-node");
923    }
924
925    #[test]
926    fn test_next_hops_upward_no_parent() {
927        let router = SelectiveRouter::new();
928        // HQ node with no parent
929        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
930        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
931
932        // No parent means empty vector
933        let next_hops = router.next_hops(&packet, &state);
934        assert_eq!(next_hops.len(), 0);
935    }
936
937    #[test]
938    fn test_next_hops_downward_multicast() {
939        let router = SelectiveRouter::new();
940        // Platoon leader with 5 linked peers (children)
941        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 5, 0);
942        // Create broadcast command packet (no specific destination)
943        let packet = DataPacket {
944            packet_id: uuid::Uuid::new_v4().to_string(),
945            source_node_id: "hq".to_string(),
946            destination_node_id: None, // Broadcast
947            data_type: DataType::Command,
948            direction: DataDirection::Downward,
949            hop_count: 0,
950            max_hops: 10,
951            payload: vec![4, 5, 6],
952        };
953
954        // Downward broadcast should return ALL linked peers
955        let next_hops = router.next_hops(&packet, &state);
956        assert_eq!(next_hops.len(), 5);
957        for i in 0..5 {
958            assert!(next_hops.contains(&format!("linked-peer-{}", i)));
959        }
960    }
961
962    #[test]
963    fn test_next_hops_downward_targeted() {
964        let router = SelectiveRouter::new();
965        // Platoon leader with 3 linked peers
966        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
967        let packet = DataPacket::command("hq", "linked-peer-1", vec![4, 5, 6]);
968
969        // Targeted downward should return only the specific child
970        let next_hops = router.next_hops(&packet, &state);
971        assert_eq!(next_hops.len(), 1);
972        assert_eq!(next_hops[0], "linked-peer-1");
973    }
974
975    #[test]
976    fn test_next_hops_lateral_multicast() {
977        let router = SelectiveRouter::new();
978        // Leader with 4 lateral peers
979        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 4);
980        // Create broadcast coordination packet (no specific destination)
981        let packet = DataPacket {
982            packet_id: uuid::Uuid::new_v4().to_string(),
983            source_node_id: "platoon-1".to_string(),
984            destination_node_id: None, // Broadcast
985            data_type: DataType::Coordination,
986            direction: DataDirection::Lateral,
987            hop_count: 0,
988            max_hops: 3,
989            payload: vec![7, 8, 9],
990        };
991
992        // Lateral broadcast should return ALL lateral peers
993        let next_hops = router.next_hops(&packet, &state);
994        assert_eq!(next_hops.len(), 4);
995        for i in 0..4 {
996            assert!(next_hops.contains(&format!("lateral-peer-{}", i)));
997        }
998    }
999
1000    #[test]
1001    fn test_next_hops_lateral_targeted() {
1002        let router = SelectiveRouter::new();
1003        // Leader with 3 lateral peers
1004        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
1005        let packet = DataPacket::coordination("platoon-1", "lateral-peer-2", vec![7, 8, 9]);
1006
1007        // Targeted lateral should return only specific peer
1008        let next_hops = router.next_hops(&packet, &state);
1009        assert_eq!(next_hops.len(), 1);
1010        assert_eq!(next_hops[0], "lateral-peer-2");
1011    }
1012
1013    #[test]
1014    fn test_route_downward_multicast() {
1015        let router = SelectiveRouter::new();
1016        // HQ node (Leader) with 3 children, broadcasting command
1017        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1018        // Create broadcast command packet (no specific destination)
1019        let packet = DataPacket {
1020            packet_id: uuid::Uuid::new_v4().to_string(),
1021            source_node_id: "hq".to_string(),
1022            destination_node_id: None, // Broadcast
1023            data_type: DataType::Command,
1024            direction: DataDirection::Downward,
1025            hop_count: 0,
1026            max_hops: 10,
1027            payload: vec![4, 5, 6],
1028        };
1029
1030        let decision = router.route(&packet, &state, "hq-node");
1031
1032        // Leaders should consume broadcast commands AND multicast to all children
1033        match decision {
1034            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1035                assert_eq!(next_hops.len(), 3);
1036                assert!(next_hops.contains(&"linked-peer-0".to_string()));
1037                assert!(next_hops.contains(&"linked-peer-1".to_string()));
1038                assert!(next_hops.contains(&"linked-peer-2".to_string()));
1039            }
1040            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1041        }
1042    }
1043
1044    #[test]
1045    fn test_route_downward_consume_and_multicast() {
1046        let router = SelectiveRouter::new();
1047        // Platoon leader with 4 children, receiving command addressed to them
1048        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1049        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1050
1051        let decision = router.route(&packet, &state, "platoon-leader");
1052
1053        // Should consume (addressed to us) AND multicast to all children
1054        match decision {
1055            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1056                assert_eq!(next_hops.len(), 4);
1057                for i in 0..4 {
1058                    assert!(next_hops.contains(&format!("linked-peer-{}", i)));
1059                }
1060            }
1061            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1062        }
1063    }
1064
1065    #[test]
1066    fn test_route_lateral_multicast() {
1067        let router = SelectiveRouter::new();
1068        // Leader with 3 lateral peers, broadcasting coordination
1069        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1070        // Create broadcast coordination packet (no specific destination)
1071        let packet = DataPacket {
1072            packet_id: uuid::Uuid::new_v4().to_string(),
1073            source_node_id: "platoon-1".to_string(),
1074            destination_node_id: None, // Broadcast
1075            data_type: DataType::Coordination,
1076            direction: DataDirection::Lateral,
1077            hop_count: 0,
1078            max_hops: 3,
1079            payload: vec![7, 8, 9],
1080        };
1081
1082        let decision = router.route(&packet, &state, "platoon-3");
1083
1084        // Leaders should consume broadcast coordination AND forward to all lateral peers
1085        match decision {
1086            RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1087                assert_eq!(next_hops.len(), 3);
1088                assert!(next_hops.contains(&"lateral-peer-0".to_string()));
1089                assert!(next_hops.contains(&"lateral-peer-1".to_string()));
1090                assert!(next_hops.contains(&"lateral-peer-2".to_string()));
1091            }
1092            _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1093        }
1094    }
1095
1096    #[test]
1097    fn test_route_downward_single_child_unicast() {
1098        let router = SelectiveRouter::new();
1099        // Leader with only 1 child
1100        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, false, 1, 0);
1101        // Create broadcast command packet (no specific destination)
1102        let packet = DataPacket {
1103            packet_id: uuid::Uuid::new_v4().to_string(),
1104            source_node_id: "hq".to_string(),
1105            destination_node_id: None, // Broadcast
1106            data_type: DataType::Command,
1107            direction: DataDirection::Downward,
1108            hop_count: 0,
1109            max_hops: 10,
1110            payload: vec![4, 5, 6],
1111        };
1112
1113        let decision = router.route(&packet, &state, "platoon-leader");
1114
1115        // Leaders consume broadcast commands, and with only 1 child use unicast variant
1116        match decision {
1117            RoutingDecision::ConsumeAndForward { next_hop } => {
1118                assert_eq!(next_hop, "linked-peer-0");
1119            }
1120            _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1121        }
1122    }
1123
1124    #[test]
1125    fn test_route_lateral_single_peer_unicast() {
1126        let router = SelectiveRouter::new();
1127        // Leader with only 1 lateral peer
1128        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 1);
1129        // Create broadcast coordination packet (no specific destination)
1130        let packet = DataPacket {
1131            packet_id: uuid::Uuid::new_v4().to_string(),
1132            source_node_id: "platoon-1".to_string(),
1133            destination_node_id: None, // Broadcast
1134            data_type: DataType::Coordination,
1135            direction: DataDirection::Lateral,
1136            hop_count: 0,
1137            max_hops: 3,
1138            payload: vec![7, 8, 9],
1139        };
1140
1141        let decision = router.route(&packet, &state, "platoon-3");
1142
1143        // Leaders consume broadcast coordination, and with only 1 lateral peer use unicast variant
1144        match decision {
1145            RoutingDecision::ConsumeAndForward { next_hop } => {
1146                assert_eq!(next_hop, "lateral-peer-0");
1147            }
1148            _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1149        }
1150    }
1151
1152    #[test]
1153    fn test_route_downward_no_children_drop() {
1154        let router = SelectiveRouter::new();
1155        // Leaf node with no children
1156        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1157        // Create broadcast command packet (no specific destination)
1158        let packet = DataPacket {
1159            packet_id: uuid::Uuid::new_v4().to_string(),
1160            source_node_id: "hq".to_string(),
1161            destination_node_id: None, // Broadcast
1162            data_type: DataType::Command,
1163            direction: DataDirection::Downward,
1164            hop_count: 0,
1165            max_hops: 10,
1166            payload: vec![4, 5, 6],
1167        };
1168
1169        let decision = router.route(&packet, &state, "squad-member");
1170
1171        // With no children, downward broadcast should drop (or consume if addressed)
1172        // In this case, not addressed to us, so should drop
1173        assert_eq!(decision, RoutingDecision::Drop);
1174    }
1175
1176    #[test]
1177    fn test_multicast_preserves_backward_compatibility() {
1178        let router = SelectiveRouter::new();
1179        // Intermediate node with parent (upward routing)
1180        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1181        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1182
1183        let decision = router.route(&packet, &state, "platoon-leader");
1184
1185        // Upward routing should still use ConsumeAndForward (unicast)
1186        match decision {
1187            RoutingDecision::ConsumeAndForward { next_hop } => {
1188                assert_eq!(next_hop, "parent-node");
1189            }
1190            _ => panic!(
1191                "Expected ConsumeAndForward (backward compat), got {:?}",
1192                decision
1193            ),
1194        }
1195
1196        // next_hop() should still work for backward compatibility
1197        let next_hop = router.next_hop(&packet, &state);
1198        assert_eq!(next_hop, Some("parent-node".to_string()));
1199    }
1200
1201    // ============================================================================
1202    // Message Deduplication Tests
1203    // ============================================================================
1204
1205    #[test]
1206    fn test_deduplication_disabled_by_default() {
1207        let router = SelectiveRouter::new();
1208        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1209        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1210
1211        // Route same packet twice - should NOT be dropped (dedup disabled)
1212        let decision1 = router.route(&packet, &state, "this-node");
1213        let decision2 = router.route(&packet, &state, "this-node");
1214
1215        // Both should route normally (ConsumeAndForward)
1216        assert!(matches!(
1217            decision1,
1218            RoutingDecision::ConsumeAndForward { .. }
1219        ));
1220        assert!(matches!(
1221            decision2,
1222            RoutingDecision::ConsumeAndForward { .. }
1223        ));
1224        assert_eq!(router.dedup_cache_size(), 0);
1225    }
1226
1227    #[test]
1228    fn test_deduplication_enabled() {
1229        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1230        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1231        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1232
1233        // First route should succeed
1234        let decision1 = router.route(&packet, &state, "this-node");
1235        assert!(matches!(
1236            decision1,
1237            RoutingDecision::ConsumeAndForward { .. }
1238        ));
1239        assert_eq!(router.dedup_cache_size(), 1);
1240
1241        // Second route of same packet should be dropped
1242        let decision2 = router.route(&packet, &state, "this-node");
1243        assert_eq!(decision2, RoutingDecision::Drop);
1244        assert_eq!(router.dedup_cache_size(), 1); // No new entry added
1245    }
1246
1247    #[test]
1248    fn test_deduplication_different_packets() {
1249        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1250        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1251        let packet1 = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1252        let packet2 = DataPacket::telemetry("sensor-2", vec![4, 5, 6]);
1253
1254        // Route two different packets - both should succeed
1255        let decision1 = router.route(&packet1, &state, "this-node");
1256        let decision2 = router.route(&packet2, &state, "this-node");
1257
1258        assert!(matches!(
1259            decision1,
1260            RoutingDecision::ConsumeAndForward { .. }
1261        ));
1262        assert!(matches!(
1263            decision2,
1264            RoutingDecision::ConsumeAndForward { .. }
1265        ));
1266        assert_eq!(router.dedup_cache_size(), 2);
1267    }
1268
1269    #[test]
1270    fn test_deduplication_cache_clear() {
1271        let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1272        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1273        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1274
1275        // Route packet
1276        let _ = router.route(&packet, &state, "this-node");
1277        assert_eq!(router.dedup_cache_size(), 1);
1278
1279        // Clear cache
1280        router.clear_dedup_cache();
1281        assert_eq!(router.dedup_cache_size(), 0);
1282
1283        // Should be able to route same packet again
1284        let decision = router.route(&packet, &state, "this-node");
1285        assert!(matches!(
1286            decision,
1287            RoutingDecision::ConsumeAndForward { .. }
1288        ));
1289    }
1290
1291    #[test]
1292    fn test_deduplication_config_defaults() {
1293        let config = DeduplicationConfig::default();
1294        assert!(config.enabled);
1295        assert_eq!(config.ttl, Duration::from_secs(300));
1296        assert_eq!(config.max_entries, 10000);
1297    }
1298
1299    #[test]
1300    fn test_verbose_router() {
1301        let router = SelectiveRouter::new_verbose();
1302        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1303        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1304
1305        // Should work the same as non-verbose, just with logging
1306        let decision = router.route(&packet, &state, "this-node");
1307        assert!(matches!(
1308            decision,
1309            RoutingDecision::ConsumeAndForward { .. }
1310        ));
1311    }
1312
1313    #[test]
1314    fn test_verbose_max_hops_drop() {
1315        let router = SelectiveRouter::new_verbose();
1316        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1317        let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1318        for _ in 0..10 {
1319            packet.increment_hop();
1320        }
1321        let decision = router.route(&packet, &state, "this-node");
1322        assert_eq!(decision, RoutingDecision::Drop);
1323    }
1324
1325    #[test]
1326    fn test_verbose_own_packet_drop() {
1327        let router = SelectiveRouter::new_verbose();
1328        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1329        let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
1330        let decision = router.route(&packet, &state, "this-node");
1331        assert_eq!(decision, RoutingDecision::Drop);
1332    }
1333
1334    #[test]
1335    fn test_verbose_consume_only() {
1336        let router = SelectiveRouter::new_verbose();
1337        let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1338        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1339        let decision = router.route(&packet, &state, "hq-node");
1340        assert_eq!(decision, RoutingDecision::Consume);
1341    }
1342
1343    #[test]
1344    fn test_verbose_consume_and_forward() {
1345        let router = SelectiveRouter::new_verbose();
1346        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1347        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1348        let decision = router.route(&packet, &state, "platoon-leader");
1349        assert!(matches!(
1350            decision,
1351            RoutingDecision::ConsumeAndForward { .. }
1352        ));
1353    }
1354
1355    #[test]
1356    fn test_verbose_consume_and_multicast() {
1357        let router = SelectiveRouter::new_verbose();
1358        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1359        let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1360        let decision = router.route(&packet, &state, "platoon-leader");
1361        assert!(matches!(
1362            decision,
1363            RoutingDecision::ConsumeAndForwardMulticast { .. }
1364        ));
1365    }
1366
1367    #[test]
1368    fn test_verbose_forward_multicast() {
1369        let router = SelectiveRouter::new_verbose();
1370        // Member (not leader) with lateral peers and broadcast lateral packet
1371        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 3, 0);
1372        // Downward broadcast from hq - member doesn't consume, should forward to children
1373        let packet = DataPacket {
1374            packet_id: uuid::Uuid::new_v4().to_string(),
1375            source_node_id: "hq".to_string(),
1376            destination_node_id: None,
1377            data_type: DataType::Command,
1378            direction: DataDirection::Downward,
1379            hop_count: 0,
1380            max_hops: 10,
1381            payload: vec![4, 5, 6],
1382        };
1383        let decision = router.route(&packet, &state, "member-node");
1384        assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1385    }
1386
1387    #[test]
1388    fn test_verbose_forward_unicast() {
1389        let router = SelectiveRouter::new_verbose();
1390        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 1, 0);
1391        let packet = DataPacket {
1392            packet_id: uuid::Uuid::new_v4().to_string(),
1393            source_node_id: "hq".to_string(),
1394            destination_node_id: None,
1395            data_type: DataType::Command,
1396            direction: DataDirection::Downward,
1397            hop_count: 0,
1398            max_hops: 10,
1399            payload: vec![4, 5, 6],
1400        };
1401        let decision = router.route(&packet, &state, "member-node");
1402        assert!(matches!(decision, RoutingDecision::Forward { .. }));
1403    }
1404
1405    #[test]
1406    fn test_verbose_forward_no_next_hop_drop() {
1407        let router = SelectiveRouter::new_verbose();
1408        // Member with lateral peer but packet directed to unknown lateral
1409        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, false, 0, 1);
1410        let packet = DataPacket {
1411            packet_id: uuid::Uuid::new_v4().to_string(),
1412            source_node_id: "other".to_string(),
1413            destination_node_id: Some("lateral-peer-0".to_string()),
1414            data_type: DataType::Coordination,
1415            direction: DataDirection::Lateral,
1416            hop_count: 0,
1417            max_hops: 3,
1418            payload: vec![7, 8, 9],
1419        };
1420        // Member doesn't consume lateral (not addressed to us), but does forward
1421        let decision = router.route(&packet, &state, "member-node");
1422        assert!(matches!(decision, RoutingDecision::Forward { .. }));
1423    }
1424
1425    #[test]
1426    fn test_verbose_drop_not_for_us() {
1427        let router = SelectiveRouter::new_verbose();
1428        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1429        // Lateral packet not addressed to us, no lateral peers
1430        let packet = DataPacket {
1431            packet_id: uuid::Uuid::new_v4().to_string(),
1432            source_node_id: "other".to_string(),
1433            destination_node_id: Some("someone-else".to_string()),
1434            data_type: DataType::Coordination,
1435            direction: DataDirection::Lateral,
1436            hop_count: 0,
1437            max_hops: 3,
1438            payload: vec![7, 8, 9],
1439        };
1440        let decision = router.route(&packet, &state, "member-node");
1441        assert_eq!(decision, RoutingDecision::Drop);
1442    }
1443
1444    #[test]
1445    fn test_verbose_dedup_drop() {
1446        let config = DeduplicationConfig {
1447            enabled: true,
1448            ttl: Duration::from_secs(300),
1449            max_entries: 100,
1450        };
1451        let router = SelectiveRouter {
1452            verbose: true,
1453            dedup_config: config,
1454            seen_packets: Arc::new(RwLock::new(HashMap::new())),
1455        };
1456        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1457        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1458
1459        let _ = router.route(&packet, &state, "this-node");
1460        let decision2 = router.route(&packet, &state, "this-node");
1461        assert_eq!(decision2, RoutingDecision::Drop);
1462    }
1463
1464    #[test]
1465    fn test_forward_only_no_consume_member_downward() {
1466        // Member with children receiving non-addressed broadcast command
1467        let router = SelectiveRouter::new();
1468        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 2, 0);
1469        let packet = DataPacket {
1470            packet_id: uuid::Uuid::new_v4().to_string(),
1471            source_node_id: "hq".to_string(),
1472            destination_node_id: None,
1473            data_type: DataType::Command,
1474            direction: DataDirection::Downward,
1475            hop_count: 0,
1476            max_hops: 10,
1477            payload: vec![4, 5, 6],
1478        };
1479        let decision = router.route(&packet, &state, "member-node");
1480        // Member doesn't consume broadcast commands, just forwards
1481        assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1482    }
1483
1484    #[test]
1485    fn test_should_forward_no_next_hop_returns_drop() {
1486        let router = SelectiveRouter::new();
1487        // Member with no linked peers, no lateral peers, downward packet addressed to unknown
1488        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1489        let packet = DataPacket {
1490            packet_id: uuid::Uuid::new_v4().to_string(),
1491            source_node_id: "hq".to_string(),
1492            destination_node_id: None,
1493            data_type: DataType::Command,
1494            direction: DataDirection::Downward,
1495            hop_count: 0,
1496            max_hops: 10,
1497            payload: vec![4, 5, 6],
1498        };
1499        let decision = router.route(&packet, &state, "squad-member");
1500        assert_eq!(decision, RoutingDecision::Drop);
1501    }
1502
1503    #[test]
1504    fn test_next_hop_downward_targeted_not_found() {
1505        let router = SelectiveRouter::new();
1506        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1507        // Addressed to a peer not in linked_peers
1508        let packet = DataPacket::command("hq", "unknown-child", vec![4, 5, 6]);
1509
1510        let next_hop = router.next_hop(&packet, &state);
1511        // Should return first linked peer as fallback
1512        assert!(next_hop.is_some());
1513        assert!(next_hop.unwrap().starts_with("linked-peer-"));
1514    }
1515
1516    #[test]
1517    fn test_next_hop_lateral_unknown_peer() {
1518        let router = SelectiveRouter::new();
1519        let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1520        let packet = DataPacket::coordination("source", "unknown-lateral", vec![7, 8, 9]);
1521
1522        let next_hop = router.next_hop(&packet, &state);
1523        // Should return first lateral peer as fallback
1524        assert!(next_hop.is_some());
1525        assert!(next_hop.unwrap().starts_with("lateral-peer-"));
1526    }
1527
1528    #[test]
1529    fn test_default_router() {
1530        let router = SelectiveRouter::default();
1531        assert_eq!(router.dedup_cache_size(), 0);
1532    }
1533
1534    #[test]
1535    fn test_deduplication_max_entries_eviction() {
1536        let config = DeduplicationConfig {
1537            enabled: true,
1538            ttl: Duration::from_secs(300),
1539            max_entries: 3, // Very small for testing
1540        };
1541        let router = SelectiveRouter::new_with_deduplication(config);
1542        let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1543
1544        // Route 5 packets
1545        for i in 0..5 {
1546            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
1547            let _ = router.route(&packet, &state, "this-node");
1548        }
1549
1550        // Cache should be limited to max_entries
1551        assert!(router.dedup_cache_size() <= 3);
1552    }
1553}