1use super::packet::{DataDirection, DataPacket};
15use crate::beacon::HierarchyLevel;
16use crate::hierarchy::NodeRole;
17use crate::topology::TopologyState;
18use std::collections::HashMap;
19use std::sync::{Arc, RwLock};
20use std::time::{Duration, Instant};
21use tracing::{debug, trace, warn};
22
23#[derive(Debug, Clone, PartialEq)]
25pub enum RoutingDecision {
26 Consume,
28
29 Forward { next_hop: String },
31
32 ConsumeAndForward { next_hop: String },
34
35 ForwardMulticast { next_hops: Vec<String> },
37
38 ConsumeAndForwardMulticast { next_hops: Vec<String> },
40
41 Drop,
43}
44
45#[derive(Debug, Clone)]
47pub struct DeduplicationConfig {
48 pub enabled: bool,
50 pub ttl: Duration,
52 pub max_entries: usize,
54}
55
56impl Default for DeduplicationConfig {
57 fn default() -> Self {
58 Self {
59 enabled: true,
60 ttl: Duration::from_secs(300), max_entries: 10000,
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
68struct DeduplicationEntry {
69 first_seen: Instant,
71}
72
73pub struct SelectiveRouter {
104 verbose: bool,
106 dedup_config: DeduplicationConfig,
108 seen_packets: Arc<RwLock<HashMap<String, DeduplicationEntry>>>,
110}
111
112impl SelectiveRouter {
113 pub fn new() -> Self {
115 Self {
116 verbose: false,
117 dedup_config: DeduplicationConfig {
118 enabled: false,
119 ..Default::default()
120 },
121 seen_packets: Arc::new(RwLock::new(HashMap::new())),
122 }
123 }
124
125 pub fn new_verbose() -> Self {
127 Self {
128 verbose: true,
129 dedup_config: DeduplicationConfig {
130 enabled: false,
131 ..Default::default()
132 },
133 seen_packets: Arc::new(RwLock::new(HashMap::new())),
134 }
135 }
136
137 pub fn new_with_deduplication(config: DeduplicationConfig) -> Self {
139 Self {
140 verbose: false,
141 dedup_config: config,
142 seen_packets: Arc::new(RwLock::new(HashMap::new())),
143 }
144 }
145
146 fn is_duplicate(&self, packet_id: &str) -> bool {
151 if !self.dedup_config.enabled {
152 return false;
153 }
154
155 let now = Instant::now();
156
157 let mut cache = self.seen_packets.write().unwrap();
159
160 if let Some(entry) = cache.get(packet_id) {
162 if now.duration_since(entry.first_seen) < self.dedup_config.ttl {
163 if self.verbose {
164 debug!("Duplicate packet detected: {}", packet_id);
165 }
166 return true;
167 }
168 }
170
171 if cache.len() >= self.dedup_config.max_entries {
173 self.evict_expired(&mut cache, now);
174
175 if cache.len() >= self.dedup_config.max_entries {
177 if let Some(oldest_key) = cache
178 .iter()
179 .min_by_key(|(_, entry)| entry.first_seen)
180 .map(|(k, _)| k.clone())
181 {
182 cache.remove(&oldest_key);
183 }
184 }
185 }
186
187 cache.insert(
189 packet_id.to_string(),
190 DeduplicationEntry { first_seen: now },
191 );
192
193 false
194 }
195
196 fn evict_expired(&self, cache: &mut HashMap<String, DeduplicationEntry>, now: Instant) {
198 cache.retain(|_, entry| now.duration_since(entry.first_seen) < self.dedup_config.ttl);
199 }
200
201 pub fn dedup_cache_size(&self) -> usize {
203 self.seen_packets.read().unwrap().len()
204 }
205
206 pub fn clear_dedup_cache(&self) {
208 self.seen_packets.write().unwrap().clear();
209 }
210
211 pub fn route(
228 &self,
229 packet: &DataPacket,
230 state: &TopologyState,
231 this_node_id: &str,
232 ) -> RoutingDecision {
233 if self.is_duplicate(&packet.packet_id) {
235 if self.verbose {
236 debug!("Packet {} is a duplicate, dropping", packet.packet_id);
237 }
238 return RoutingDecision::Drop;
239 }
240
241 if packet.at_max_hops() {
243 if self.verbose {
244 warn!(
245 "Packet {} reached max hops ({}), dropping",
246 packet.packet_id, packet.max_hops
247 );
248 }
249 return RoutingDecision::Drop;
250 }
251
252 if packet.source_node_id == this_node_id {
254 if self.verbose {
255 trace!(
256 "Packet {} originated from us, not routing",
257 packet.packet_id
258 );
259 }
260 return RoutingDecision::Drop;
261 }
262
263 let should_consume = self.should_consume(packet, state, this_node_id);
264 let should_forward = self.should_forward(packet, state);
265
266 if should_consume && should_forward {
267 let next_hops = self.next_hops(packet, state);
269 if next_hops.is_empty() {
270 if self.verbose {
272 debug!("Packet {}: Consume only (no next hop)", packet.packet_id);
273 }
274 RoutingDecision::Consume
275 } else if next_hops.len() == 1 {
276 if self.verbose {
278 debug!(
279 "Packet {}: Consume and forward to {}",
280 packet.packet_id, next_hops[0]
281 );
282 }
283 RoutingDecision::ConsumeAndForward {
284 next_hop: next_hops[0].clone(),
285 }
286 } else {
287 if self.verbose {
289 debug!(
290 "Packet {}: Consume and multicast to {} peers",
291 packet.packet_id,
292 next_hops.len()
293 );
294 }
295 RoutingDecision::ConsumeAndForwardMulticast { next_hops }
296 }
297 } else if should_consume {
298 if self.verbose {
299 debug!("Packet {}: Consume only", packet.packet_id);
300 }
301 RoutingDecision::Consume
302 } else if should_forward {
303 let next_hops = self.next_hops(packet, state);
304 if next_hops.is_empty() {
305 if self.verbose {
306 warn!(
307 "Packet {}: Should forward but no next hop, dropping",
308 packet.packet_id
309 );
310 }
311 RoutingDecision::Drop
312 } else if next_hops.len() == 1 {
313 if self.verbose {
315 debug!("Packet {}: Forward to {}", packet.packet_id, next_hops[0]);
316 }
317 RoutingDecision::Forward {
318 next_hop: next_hops[0].clone(),
319 }
320 } else {
321 if self.verbose {
323 debug!(
324 "Packet {}: Multicast to {} peers",
325 packet.packet_id,
326 next_hops.len()
327 );
328 }
329 RoutingDecision::ForwardMulticast { next_hops }
330 }
331 } else {
332 if self.verbose {
333 debug!("Packet {}: Drop (not for us)", packet.packet_id);
334 }
335 RoutingDecision::Drop
336 }
337 }
338
339 pub fn should_consume(
364 &self,
365 packet: &DataPacket,
366 state: &TopologyState,
367 this_node_id: &str,
368 ) -> bool {
369 match packet.direction {
370 DataDirection::Upward => {
371 true
374 }
375
376 DataDirection::Downward => {
377 if let Some(ref dest) = packet.destination_node_id {
379 if dest == this_node_id {
380 return true;
381 }
382 }
383
384 matches!(state.role, NodeRole::Leader)
387 }
388
389 DataDirection::Lateral => {
390 if let Some(ref dest) = packet.destination_node_id {
392 dest == this_node_id
394 } else {
395 matches!(state.role, NodeRole::Leader)
397 }
398 }
399 }
400 }
401
402 pub fn should_forward(&self, packet: &DataPacket, state: &TopologyState) -> bool {
427 match packet.direction {
428 DataDirection::Upward => {
429 state.selected_peer.is_some()
431 }
432
433 DataDirection::Downward => {
434 !state.linked_peers.is_empty()
436 }
437
438 DataDirection::Lateral => {
439 if let Some(ref dest) = packet.destination_node_id {
441 state.lateral_peers.contains_key(dest)
442 } else {
443 matches!(state.role, NodeRole::Leader) && !state.lateral_peers.is_empty()
445 }
446 }
447 }
448 }
449
450 pub fn next_hop(&self, packet: &DataPacket, state: &TopologyState) -> Option<String> {
467 match packet.direction {
468 DataDirection::Upward => {
469 state
471 .selected_peer
472 .as_ref()
473 .map(|peer| peer.node_id.clone())
474 }
475
476 DataDirection::Downward => {
477 if let Some(ref dest) = packet.destination_node_id {
480 if state.linked_peers.contains_key(dest) {
481 return Some(dest.clone());
482 }
483 }
484
485 state.linked_peers.keys().next().cloned()
488 }
489
490 DataDirection::Lateral => {
491 if let Some(ref dest) = packet.destination_node_id {
493 if state.lateral_peers.contains_key(dest) {
495 return Some(dest.clone());
496 }
497 }
498
499 state.lateral_peers.keys().next().cloned()
501 }
502 }
503 }
504
505 pub fn next_hops(&self, packet: &DataPacket, state: &TopologyState) -> Vec<String> {
526 match packet.direction {
527 DataDirection::Upward => {
528 state
530 .selected_peer
531 .as_ref()
532 .map(|peer| vec![peer.node_id.clone()])
533 .unwrap_or_default()
534 }
535
536 DataDirection::Downward => {
537 if let Some(ref dest) = packet.destination_node_id {
540 if state.linked_peers.contains_key(dest) {
541 return vec![dest.clone()];
542 }
543 }
544
545 state.linked_peers.keys().cloned().collect()
547 }
548
549 DataDirection::Lateral => {
550 if let Some(ref dest) = packet.destination_node_id {
552 if state.lateral_peers.contains_key(dest) {
554 return vec![dest.clone()];
555 }
556 }
557
558 state.lateral_peers.keys().cloned().collect()
560 }
561 }
562 }
563
564 #[allow(dead_code)]
569 fn is_hq_level(&self, level: HierarchyLevel) -> bool {
570 matches!(level, HierarchyLevel::Company)
571 }
572
573 pub fn should_aggregate(
628 &self,
629 packet: &DataPacket,
630 decision: &RoutingDecision,
631 state: &TopologyState,
632 ) -> bool {
633 if !matches!(decision, RoutingDecision::ConsumeAndForward { .. }) {
635 return false;
636 }
637
638 if !packet.data_type.requires_aggregation() {
640 return false;
641 }
642
643 matches!(state.role, NodeRole::Leader)
645 }
646}
647
648impl Default for SelectiveRouter {
649 fn default() -> Self {
650 Self::new()
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657 use crate::beacon::{GeoPosition, GeographicBeacon};
658 use crate::routing::packet::{DataDirection, DataType};
659 use crate::topology::SelectedPeer;
660 use std::collections::HashMap;
661 use std::time::Instant;
662
663 fn create_test_state(
664 hierarchy_level: HierarchyLevel,
665 role: NodeRole,
666 has_selected_peer: bool,
667 num_linked_peers: usize,
668 num_lateral_peers: usize,
669 ) -> TopologyState {
670 let selected_peer = if has_selected_peer {
671 Some(SelectedPeer {
672 node_id: "parent-node".to_string(),
673 beacon: GeographicBeacon::new(
674 "parent-node".to_string(),
675 GeoPosition::new(37.7749, -122.4194),
676 HierarchyLevel::Platoon,
677 ),
678 selected_at: Instant::now(),
679 })
680 } else {
681 None
682 };
683
684 let mut linked_peers = HashMap::new();
685 for i in 0..num_linked_peers {
686 linked_peers.insert(format!("linked-peer-{}", i), Instant::now());
687 }
688
689 let mut lateral_peers = HashMap::new();
690 for i in 0..num_lateral_peers {
691 lateral_peers.insert(format!("lateral-peer-{}", i), Instant::now());
692 }
693
694 TopologyState {
695 selected_peer,
696 linked_peers,
697 lateral_peers,
698 role,
699 hierarchy_level,
700 }
701 }
702
703 #[test]
704 fn test_upward_telemetry_leaf_node() {
705 let router = SelectiveRouter::new();
706 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
707 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
708
709 assert!(router.should_consume(&packet, &state, "this-node"));
711
712 assert!(router.should_forward(&packet, &state));
714
715 let next_hop = router.next_hop(&packet, &state);
717 assert_eq!(next_hop, Some("parent-node".to_string()));
718 }
719
720 #[test]
721 fn test_upward_telemetry_hq_node() {
722 let router = SelectiveRouter::new();
723 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
725 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
726
727 assert!(router.should_consume(&packet, &state, "hq-node"));
729
730 assert!(!router.should_forward(&packet, &state));
732 }
733
734 #[test]
735 fn test_downward_command_to_leader() {
736 let router = SelectiveRouter::new();
737 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
738 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
739
740 assert!(router.should_consume(&packet, &state, "platoon-leader"));
742
743 assert!(router.should_forward(&packet, &state));
745
746 let next_hop = router.next_hop(&packet, &state);
748 assert!(next_hop.is_some());
749 assert!(next_hop.unwrap().starts_with("linked-peer-"));
750 }
751
752 #[test]
753 fn test_downward_command_to_leaf() {
754 let router = SelectiveRouter::new();
755 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
756 let packet = DataPacket::command("hq", "squad-member", vec![4, 5, 6]);
757
758 assert!(router.should_consume(&packet, &state, "squad-member"));
760
761 assert!(!router.should_forward(&packet, &state));
763 }
764
765 #[test]
766 fn test_lateral_coordination_between_leaders() {
767 let router = SelectiveRouter::new();
768 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
769 let packet = DataPacket::coordination("platoon-1", "lateral-peer-0", vec![7, 8, 9]);
770
771 assert!(!router.should_consume(&packet, &state, "platoon-3"));
773
774 let state_with_target =
776 create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
777 assert!(router.should_forward(&packet, &state_with_target));
778 }
779
780 #[test]
781 fn test_max_hops_drop() {
782 let router = SelectiveRouter::new();
783 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
784 let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
785
786 for _ in 0..10 {
788 packet.increment_hop();
789 }
790
791 let decision = router.route(&packet, &state, "this-node");
793 assert_eq!(decision, RoutingDecision::Drop);
794 }
795
796 #[test]
797 fn test_routing_decision_consume_and_forward() {
798 let router = SelectiveRouter::new();
799 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
801 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
802
803 let decision = router.route(&packet, &state, "platoon-leader");
804
805 match decision {
807 RoutingDecision::ConsumeAndForward { next_hop } => {
808 assert_eq!(next_hop, "parent-node");
809 }
810 _ => panic!("Expected ConsumeAndForward, got {:?}", decision),
811 }
812 }
813
814 #[test]
815 fn test_routing_decision_consume_only() {
816 let router = SelectiveRouter::new();
817 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
819 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
820
821 let decision = router.route(&packet, &state, "hq-node");
822
823 assert_eq!(decision, RoutingDecision::Consume);
825 }
826
827 #[test]
828 fn test_dont_route_own_packets() {
829 let router = SelectiveRouter::new();
830 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
831 let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
832
833 let decision = router.route(&packet, &state, "this-node");
835 assert_eq!(decision, RoutingDecision::Drop);
836 }
837
838 #[test]
839 fn test_should_aggregate_intermediate_leader() {
840 let router = SelectiveRouter::new();
841 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
843 let packet = DataPacket::telemetry("squad-member-1", vec![1, 2, 3]);
844
845 let decision = router.route(&packet, &state, "platoon-leader");
846
847 assert!(router.should_aggregate(&packet, &decision, &state));
849 }
850
851 #[test]
852 fn test_should_not_aggregate_hq_node() {
853 let router = SelectiveRouter::new();
854 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
856 let packet = DataPacket::telemetry("platoon-1", vec![1, 2, 3]);
857
858 let decision = router.route(&packet, &state, "hq-node");
859
860 assert!(!router.should_aggregate(&packet, &decision, &state));
862 }
863
864 #[test]
865 fn test_should_not_aggregate_non_leader() {
866 let router = SelectiveRouter::new();
867 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
869 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
870
871 let decision = router.route(&packet, &state, "squad-member");
872
873 assert!(!router.should_aggregate(&packet, &decision, &state));
875 }
876
877 #[test]
878 fn test_should_not_aggregate_command_packet() {
879 let router = SelectiveRouter::new();
880 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
882 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
883
884 let decision = router.route(&packet, &state, "platoon-leader");
885
886 assert!(!router.should_aggregate(&packet, &decision, &state));
888 }
889
890 #[test]
895 fn test_next_hops_upward_single_parent() {
896 let router = SelectiveRouter::new();
897 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
898 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
899
900 let next_hops = router.next_hops(&packet, &state);
902 assert_eq!(next_hops.len(), 1);
903 assert_eq!(next_hops[0], "parent-node");
904 }
905
906 #[test]
907 fn test_next_hops_upward_no_parent() {
908 let router = SelectiveRouter::new();
909 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
911 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
912
913 let next_hops = router.next_hops(&packet, &state);
915 assert_eq!(next_hops.len(), 0);
916 }
917
918 #[test]
919 fn test_next_hops_downward_multicast() {
920 let router = SelectiveRouter::new();
921 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 5, 0);
923 let packet = DataPacket {
925 packet_id: uuid::Uuid::new_v4().to_string(),
926 source_node_id: "hq".to_string(),
927 destination_node_id: None, data_type: DataType::Command,
929 direction: DataDirection::Downward,
930 hop_count: 0,
931 max_hops: 10,
932 payload: vec![4, 5, 6],
933 };
934
935 let next_hops = router.next_hops(&packet, &state);
937 assert_eq!(next_hops.len(), 5);
938 for i in 0..5 {
939 assert!(next_hops.contains(&format!("linked-peer-{}", i)));
940 }
941 }
942
943 #[test]
944 fn test_next_hops_downward_targeted() {
945 let router = SelectiveRouter::new();
946 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
948 let packet = DataPacket::command("hq", "linked-peer-1", vec![4, 5, 6]);
949
950 let next_hops = router.next_hops(&packet, &state);
952 assert_eq!(next_hops.len(), 1);
953 assert_eq!(next_hops[0], "linked-peer-1");
954 }
955
956 #[test]
957 fn test_next_hops_lateral_multicast() {
958 let router = SelectiveRouter::new();
959 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 4);
961 let packet = DataPacket {
963 packet_id: uuid::Uuid::new_v4().to_string(),
964 source_node_id: "platoon-1".to_string(),
965 destination_node_id: None, data_type: DataType::Coordination,
967 direction: DataDirection::Lateral,
968 hop_count: 0,
969 max_hops: 3,
970 payload: vec![7, 8, 9],
971 };
972
973 let next_hops = router.next_hops(&packet, &state);
975 assert_eq!(next_hops.len(), 4);
976 for i in 0..4 {
977 assert!(next_hops.contains(&format!("lateral-peer-{}", i)));
978 }
979 }
980
981 #[test]
982 fn test_next_hops_lateral_targeted() {
983 let router = SelectiveRouter::new();
984 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
986 let packet = DataPacket::coordination("platoon-1", "lateral-peer-2", vec![7, 8, 9]);
987
988 let next_hops = router.next_hops(&packet, &state);
990 assert_eq!(next_hops.len(), 1);
991 assert_eq!(next_hops[0], "lateral-peer-2");
992 }
993
994 #[test]
995 fn test_route_downward_multicast() {
996 let router = SelectiveRouter::new();
997 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
999 let packet = DataPacket {
1001 packet_id: uuid::Uuid::new_v4().to_string(),
1002 source_node_id: "hq".to_string(),
1003 destination_node_id: None, data_type: DataType::Command,
1005 direction: DataDirection::Downward,
1006 hop_count: 0,
1007 max_hops: 10,
1008 payload: vec![4, 5, 6],
1009 };
1010
1011 let decision = router.route(&packet, &state, "hq-node");
1012
1013 match decision {
1015 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1016 assert_eq!(next_hops.len(), 3);
1017 assert!(next_hops.contains(&"linked-peer-0".to_string()));
1018 assert!(next_hops.contains(&"linked-peer-1".to_string()));
1019 assert!(next_hops.contains(&"linked-peer-2".to_string()));
1020 }
1021 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1022 }
1023 }
1024
1025 #[test]
1026 fn test_route_downward_consume_and_multicast() {
1027 let router = SelectiveRouter::new();
1028 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1030 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1031
1032 let decision = router.route(&packet, &state, "platoon-leader");
1033
1034 match decision {
1036 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1037 assert_eq!(next_hops.len(), 4);
1038 for i in 0..4 {
1039 assert!(next_hops.contains(&format!("linked-peer-{}", i)));
1040 }
1041 }
1042 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1043 }
1044 }
1045
1046 #[test]
1047 fn test_route_lateral_multicast() {
1048 let router = SelectiveRouter::new();
1049 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1051 let packet = DataPacket {
1053 packet_id: uuid::Uuid::new_v4().to_string(),
1054 source_node_id: "platoon-1".to_string(),
1055 destination_node_id: None, data_type: DataType::Coordination,
1057 direction: DataDirection::Lateral,
1058 hop_count: 0,
1059 max_hops: 3,
1060 payload: vec![7, 8, 9],
1061 };
1062
1063 let decision = router.route(&packet, &state, "platoon-3");
1064
1065 match decision {
1067 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1068 assert_eq!(next_hops.len(), 3);
1069 assert!(next_hops.contains(&"lateral-peer-0".to_string()));
1070 assert!(next_hops.contains(&"lateral-peer-1".to_string()));
1071 assert!(next_hops.contains(&"lateral-peer-2".to_string()));
1072 }
1073 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1074 }
1075 }
1076
1077 #[test]
1078 fn test_route_downward_single_child_unicast() {
1079 let router = SelectiveRouter::new();
1080 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, false, 1, 0);
1082 let packet = DataPacket {
1084 packet_id: uuid::Uuid::new_v4().to_string(),
1085 source_node_id: "hq".to_string(),
1086 destination_node_id: None, data_type: DataType::Command,
1088 direction: DataDirection::Downward,
1089 hop_count: 0,
1090 max_hops: 10,
1091 payload: vec![4, 5, 6],
1092 };
1093
1094 let decision = router.route(&packet, &state, "platoon-leader");
1095
1096 match decision {
1098 RoutingDecision::ConsumeAndForward { next_hop } => {
1099 assert_eq!(next_hop, "linked-peer-0");
1100 }
1101 _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1102 }
1103 }
1104
1105 #[test]
1106 fn test_route_lateral_single_peer_unicast() {
1107 let router = SelectiveRouter::new();
1108 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 1);
1110 let packet = DataPacket {
1112 packet_id: uuid::Uuid::new_v4().to_string(),
1113 source_node_id: "platoon-1".to_string(),
1114 destination_node_id: None, data_type: DataType::Coordination,
1116 direction: DataDirection::Lateral,
1117 hop_count: 0,
1118 max_hops: 3,
1119 payload: vec![7, 8, 9],
1120 };
1121
1122 let decision = router.route(&packet, &state, "platoon-3");
1123
1124 match decision {
1126 RoutingDecision::ConsumeAndForward { next_hop } => {
1127 assert_eq!(next_hop, "lateral-peer-0");
1128 }
1129 _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1130 }
1131 }
1132
1133 #[test]
1134 fn test_route_downward_no_children_drop() {
1135 let router = SelectiveRouter::new();
1136 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1138 let packet = DataPacket {
1140 packet_id: uuid::Uuid::new_v4().to_string(),
1141 source_node_id: "hq".to_string(),
1142 destination_node_id: None, data_type: DataType::Command,
1144 direction: DataDirection::Downward,
1145 hop_count: 0,
1146 max_hops: 10,
1147 payload: vec![4, 5, 6],
1148 };
1149
1150 let decision = router.route(&packet, &state, "squad-member");
1151
1152 assert_eq!(decision, RoutingDecision::Drop);
1155 }
1156
1157 #[test]
1158 fn test_multicast_preserves_backward_compatibility() {
1159 let router = SelectiveRouter::new();
1160 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1162 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1163
1164 let decision = router.route(&packet, &state, "platoon-leader");
1165
1166 match decision {
1168 RoutingDecision::ConsumeAndForward { next_hop } => {
1169 assert_eq!(next_hop, "parent-node");
1170 }
1171 _ => panic!(
1172 "Expected ConsumeAndForward (backward compat), got {:?}",
1173 decision
1174 ),
1175 }
1176
1177 let next_hop = router.next_hop(&packet, &state);
1179 assert_eq!(next_hop, Some("parent-node".to_string()));
1180 }
1181
1182 #[test]
1187 fn test_deduplication_disabled_by_default() {
1188 let router = SelectiveRouter::new();
1189 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1190 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1191
1192 let decision1 = router.route(&packet, &state, "this-node");
1194 let decision2 = router.route(&packet, &state, "this-node");
1195
1196 assert!(matches!(
1198 decision1,
1199 RoutingDecision::ConsumeAndForward { .. }
1200 ));
1201 assert!(matches!(
1202 decision2,
1203 RoutingDecision::ConsumeAndForward { .. }
1204 ));
1205 assert_eq!(router.dedup_cache_size(), 0);
1206 }
1207
1208 #[test]
1209 fn test_deduplication_enabled() {
1210 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1211 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1212 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1213
1214 let decision1 = router.route(&packet, &state, "this-node");
1216 assert!(matches!(
1217 decision1,
1218 RoutingDecision::ConsumeAndForward { .. }
1219 ));
1220 assert_eq!(router.dedup_cache_size(), 1);
1221
1222 let decision2 = router.route(&packet, &state, "this-node");
1224 assert_eq!(decision2, RoutingDecision::Drop);
1225 assert_eq!(router.dedup_cache_size(), 1); }
1227
1228 #[test]
1229 fn test_deduplication_different_packets() {
1230 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1231 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1232 let packet1 = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1233 let packet2 = DataPacket::telemetry("sensor-2", vec![4, 5, 6]);
1234
1235 let decision1 = router.route(&packet1, &state, "this-node");
1237 let decision2 = router.route(&packet2, &state, "this-node");
1238
1239 assert!(matches!(
1240 decision1,
1241 RoutingDecision::ConsumeAndForward { .. }
1242 ));
1243 assert!(matches!(
1244 decision2,
1245 RoutingDecision::ConsumeAndForward { .. }
1246 ));
1247 assert_eq!(router.dedup_cache_size(), 2);
1248 }
1249
1250 #[test]
1251 fn test_deduplication_cache_clear() {
1252 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1253 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1254 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1255
1256 let _ = router.route(&packet, &state, "this-node");
1258 assert_eq!(router.dedup_cache_size(), 1);
1259
1260 router.clear_dedup_cache();
1262 assert_eq!(router.dedup_cache_size(), 0);
1263
1264 let decision = router.route(&packet, &state, "this-node");
1266 assert!(matches!(
1267 decision,
1268 RoutingDecision::ConsumeAndForward { .. }
1269 ));
1270 }
1271
1272 #[test]
1273 fn test_deduplication_config_defaults() {
1274 let config = DeduplicationConfig::default();
1275 assert!(config.enabled);
1276 assert_eq!(config.ttl, Duration::from_secs(300));
1277 assert_eq!(config.max_entries, 10000);
1278 }
1279
1280 #[test]
1281 fn test_verbose_router() {
1282 let router = SelectiveRouter::new_verbose();
1283 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1284 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1285
1286 let decision = router.route(&packet, &state, "this-node");
1288 assert!(matches!(
1289 decision,
1290 RoutingDecision::ConsumeAndForward { .. }
1291 ));
1292 }
1293
1294 #[test]
1295 fn test_verbose_max_hops_drop() {
1296 let router = SelectiveRouter::new_verbose();
1297 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1298 let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1299 for _ in 0..10 {
1300 packet.increment_hop();
1301 }
1302 let decision = router.route(&packet, &state, "this-node");
1303 assert_eq!(decision, RoutingDecision::Drop);
1304 }
1305
1306 #[test]
1307 fn test_verbose_own_packet_drop() {
1308 let router = SelectiveRouter::new_verbose();
1309 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1310 let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
1311 let decision = router.route(&packet, &state, "this-node");
1312 assert_eq!(decision, RoutingDecision::Drop);
1313 }
1314
1315 #[test]
1316 fn test_verbose_consume_only() {
1317 let router = SelectiveRouter::new_verbose();
1318 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1319 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1320 let decision = router.route(&packet, &state, "hq-node");
1321 assert_eq!(decision, RoutingDecision::Consume);
1322 }
1323
1324 #[test]
1325 fn test_verbose_consume_and_forward() {
1326 let router = SelectiveRouter::new_verbose();
1327 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1328 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1329 let decision = router.route(&packet, &state, "platoon-leader");
1330 assert!(matches!(
1331 decision,
1332 RoutingDecision::ConsumeAndForward { .. }
1333 ));
1334 }
1335
1336 #[test]
1337 fn test_verbose_consume_and_multicast() {
1338 let router = SelectiveRouter::new_verbose();
1339 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1340 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1341 let decision = router.route(&packet, &state, "platoon-leader");
1342 assert!(matches!(
1343 decision,
1344 RoutingDecision::ConsumeAndForwardMulticast { .. }
1345 ));
1346 }
1347
1348 #[test]
1349 fn test_verbose_forward_multicast() {
1350 let router = SelectiveRouter::new_verbose();
1351 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 3, 0);
1353 let packet = DataPacket {
1355 packet_id: uuid::Uuid::new_v4().to_string(),
1356 source_node_id: "hq".to_string(),
1357 destination_node_id: None,
1358 data_type: DataType::Command,
1359 direction: DataDirection::Downward,
1360 hop_count: 0,
1361 max_hops: 10,
1362 payload: vec![4, 5, 6],
1363 };
1364 let decision = router.route(&packet, &state, "member-node");
1365 assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1366 }
1367
1368 #[test]
1369 fn test_verbose_forward_unicast() {
1370 let router = SelectiveRouter::new_verbose();
1371 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 1, 0);
1372 let packet = DataPacket {
1373 packet_id: uuid::Uuid::new_v4().to_string(),
1374 source_node_id: "hq".to_string(),
1375 destination_node_id: None,
1376 data_type: DataType::Command,
1377 direction: DataDirection::Downward,
1378 hop_count: 0,
1379 max_hops: 10,
1380 payload: vec![4, 5, 6],
1381 };
1382 let decision = router.route(&packet, &state, "member-node");
1383 assert!(matches!(decision, RoutingDecision::Forward { .. }));
1384 }
1385
1386 #[test]
1387 fn test_verbose_forward_no_next_hop_drop() {
1388 let router = SelectiveRouter::new_verbose();
1389 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, false, 0, 1);
1391 let packet = DataPacket {
1392 packet_id: uuid::Uuid::new_v4().to_string(),
1393 source_node_id: "other".to_string(),
1394 destination_node_id: Some("lateral-peer-0".to_string()),
1395 data_type: DataType::Coordination,
1396 direction: DataDirection::Lateral,
1397 hop_count: 0,
1398 max_hops: 3,
1399 payload: vec![7, 8, 9],
1400 };
1401 let decision = router.route(&packet, &state, "member-node");
1403 assert!(matches!(decision, RoutingDecision::Forward { .. }));
1404 }
1405
1406 #[test]
1407 fn test_verbose_drop_not_for_us() {
1408 let router = SelectiveRouter::new_verbose();
1409 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1410 let packet = DataPacket {
1412 packet_id: uuid::Uuid::new_v4().to_string(),
1413 source_node_id: "other".to_string(),
1414 destination_node_id: Some("someone-else".to_string()),
1415 data_type: DataType::Coordination,
1416 direction: DataDirection::Lateral,
1417 hop_count: 0,
1418 max_hops: 3,
1419 payload: vec![7, 8, 9],
1420 };
1421 let decision = router.route(&packet, &state, "member-node");
1422 assert_eq!(decision, RoutingDecision::Drop);
1423 }
1424
1425 #[test]
1426 fn test_verbose_dedup_drop() {
1427 let config = DeduplicationConfig {
1428 enabled: true,
1429 ttl: Duration::from_secs(300),
1430 max_entries: 100,
1431 };
1432 let router = SelectiveRouter {
1433 verbose: true,
1434 dedup_config: config,
1435 seen_packets: Arc::new(RwLock::new(HashMap::new())),
1436 };
1437 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1438 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1439
1440 let _ = router.route(&packet, &state, "this-node");
1441 let decision2 = router.route(&packet, &state, "this-node");
1442 assert_eq!(decision2, RoutingDecision::Drop);
1443 }
1444
1445 #[test]
1446 fn test_forward_only_no_consume_member_downward() {
1447 let router = SelectiveRouter::new();
1449 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 2, 0);
1450 let packet = DataPacket {
1451 packet_id: uuid::Uuid::new_v4().to_string(),
1452 source_node_id: "hq".to_string(),
1453 destination_node_id: None,
1454 data_type: DataType::Command,
1455 direction: DataDirection::Downward,
1456 hop_count: 0,
1457 max_hops: 10,
1458 payload: vec![4, 5, 6],
1459 };
1460 let decision = router.route(&packet, &state, "member-node");
1461 assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1463 }
1464
1465 #[test]
1466 fn test_should_forward_no_next_hop_returns_drop() {
1467 let router = SelectiveRouter::new();
1468 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
1470 let packet = DataPacket {
1471 packet_id: uuid::Uuid::new_v4().to_string(),
1472 source_node_id: "hq".to_string(),
1473 destination_node_id: None,
1474 data_type: DataType::Command,
1475 direction: DataDirection::Downward,
1476 hop_count: 0,
1477 max_hops: 10,
1478 payload: vec![4, 5, 6],
1479 };
1480 let decision = router.route(&packet, &state, "squad-member");
1481 assert_eq!(decision, RoutingDecision::Drop);
1482 }
1483
1484 #[test]
1485 fn test_next_hop_downward_targeted_not_found() {
1486 let router = SelectiveRouter::new();
1487 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1488 let packet = DataPacket::command("hq", "unknown-child", vec![4, 5, 6]);
1490
1491 let next_hop = router.next_hop(&packet, &state);
1492 assert!(next_hop.is_some());
1494 assert!(next_hop.unwrap().starts_with("linked-peer-"));
1495 }
1496
1497 #[test]
1498 fn test_next_hop_lateral_unknown_peer() {
1499 let router = SelectiveRouter::new();
1500 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1501 let packet = DataPacket::coordination("source", "unknown-lateral", vec![7, 8, 9]);
1502
1503 let next_hop = router.next_hop(&packet, &state);
1504 assert!(next_hop.is_some());
1506 assert!(next_hop.unwrap().starts_with("lateral-peer-"));
1507 }
1508
1509 #[test]
1510 fn test_default_router() {
1511 let router = SelectiveRouter::default();
1512 assert_eq!(router.dedup_cache_size(), 0);
1513 }
1514
1515 #[test]
1516 fn test_deduplication_max_entries_eviction() {
1517 let config = DeduplicationConfig {
1518 enabled: true,
1519 ttl: Duration::from_secs(300),
1520 max_entries: 3, };
1522 let router = SelectiveRouter::new_with_deduplication(config);
1523 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1524
1525 for i in 0..5 {
1527 let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
1528 let _ = router.route(&packet, &state, "this-node");
1529 }
1530
1531 assert!(router.dedup_cache_size() <= 3);
1533 }
1534}