1use super::packet::{DataDirection, DataPacket};
28use crate::beacon::HierarchyLevel;
29use crate::hierarchy::NodeRole;
30use crate::topology::TopologyState;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34use tracing::{debug, trace, warn};
35
/// Outcome of routing a single packet at this node.
///
/// Produced by `SelectiveRouter::route`; the caller is expected to act on it
/// (process the payload locally, send it on, both, or neither).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingDecision {
    /// Process the packet locally and do not send it any further.
    Consume,

    /// Relay the packet to exactly one peer without processing it locally.
    Forward { next_hop: String },

    /// Process the packet locally and also relay it to exactly one peer.
    ConsumeAndForward { next_hop: String },

    /// Relay the packet to several peers without processing it locally.
    ForwardMulticast { next_hops: Vec<String> },

    /// Process the packet locally and also relay it to several peers.
    ConsumeAndForwardMulticast { next_hops: Vec<String> },

    /// Discard the packet: it is neither processed locally nor relayed.
    Drop,
}
57
/// Settings for the router's packet-id deduplication cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeduplicationConfig {
    /// Master switch; when `false` no packet is ever reported as a duplicate.
    pub enabled: bool,
    /// How long after its first sighting a packet id still counts as "seen".
    pub ttl: Duration,
    /// Cache size at which eviction kicks in (expired entries first, then the
    /// oldest remaining entry).
    pub max_entries: usize,
}
68
69impl Default for DeduplicationConfig {
70 fn default() -> Self {
71 Self {
72 enabled: true,
73 ttl: Duration::from_secs(300), max_entries: 10000,
75 }
76 }
77}
78
/// Bookkeeping stored per packet id in the deduplication cache.
#[derive(Clone, Debug)]
struct DeduplicationEntry {
    /// Moment the packet id was (most recently) inserted into the cache;
    /// compared against the configured TTL to decide whether it has expired.
    first_seen: Instant,
}
85
86pub struct SelectiveRouter {
117 verbose: bool,
119 dedup_config: DeduplicationConfig,
121 seen_packets: Arc<RwLock<HashMap<String, DeduplicationEntry>>>,
123}
124
125impl SelectiveRouter {
126 pub fn new() -> Self {
128 Self {
129 verbose: false,
130 dedup_config: DeduplicationConfig {
131 enabled: false,
132 ..Default::default()
133 },
134 seen_packets: Arc::new(RwLock::new(HashMap::new())),
135 }
136 }
137
138 pub fn new_verbose() -> Self {
140 Self {
141 verbose: true,
142 dedup_config: DeduplicationConfig {
143 enabled: false,
144 ..Default::default()
145 },
146 seen_packets: Arc::new(RwLock::new(HashMap::new())),
147 }
148 }
149
150 pub fn new_with_deduplication(config: DeduplicationConfig) -> Self {
152 Self {
153 verbose: false,
154 dedup_config: config,
155 seen_packets: Arc::new(RwLock::new(HashMap::new())),
156 }
157 }
158
159 fn is_duplicate(&self, packet_id: &str) -> bool {
164 if !self.dedup_config.enabled {
165 return false;
166 }
167
168 let now = Instant::now();
169
170 let mut cache = self.seen_packets.write().unwrap_or_else(|e| e.into_inner());
172
173 if let Some(entry) = cache.get(packet_id) {
175 if now.duration_since(entry.first_seen) < self.dedup_config.ttl {
176 if self.verbose {
177 debug!("Duplicate packet detected: {}", packet_id);
178 }
179 return true;
180 }
181 }
183
184 if cache.len() >= self.dedup_config.max_entries {
186 self.evict_expired(&mut cache, now);
187
188 if cache.len() >= self.dedup_config.max_entries {
190 if let Some(oldest_key) = cache
191 .iter()
192 .min_by_key(|(_, entry)| entry.first_seen)
193 .map(|(k, _)| k.clone())
194 {
195 cache.remove(&oldest_key);
196 }
197 }
198 }
199
200 cache.insert(
202 packet_id.to_string(),
203 DeduplicationEntry { first_seen: now },
204 );
205
206 false
207 }
208
209 fn evict_expired(&self, cache: &mut HashMap<String, DeduplicationEntry>, now: Instant) {
211 cache.retain(|_, entry| now.duration_since(entry.first_seen) < self.dedup_config.ttl);
212 }
213
214 pub fn dedup_cache_size(&self) -> usize {
216 self.seen_packets
217 .read()
218 .unwrap_or_else(|e| e.into_inner())
219 .len()
220 }
221
222 pub fn clear_dedup_cache(&self) {
224 self.seen_packets
225 .write()
226 .unwrap_or_else(|e| e.into_inner())
227 .clear();
228 }
229
230 pub fn route(
247 &self,
248 packet: &DataPacket,
249 state: &TopologyState,
250 this_node_id: &str,
251 ) -> RoutingDecision {
252 if self.is_duplicate(&packet.packet_id) {
254 if self.verbose {
255 debug!("Packet {} is a duplicate, dropping", packet.packet_id);
256 }
257 return RoutingDecision::Drop;
258 }
259
260 if packet.at_max_hops() {
262 if self.verbose {
263 warn!(
264 "Packet {} reached max hops ({}), dropping",
265 packet.packet_id, packet.max_hops
266 );
267 }
268 return RoutingDecision::Drop;
269 }
270
271 if packet.source_node_id == this_node_id {
273 if self.verbose {
274 trace!(
275 "Packet {} originated from us, not routing",
276 packet.packet_id
277 );
278 }
279 return RoutingDecision::Drop;
280 }
281
282 let should_consume = self.should_consume(packet, state, this_node_id);
283 let should_forward = self.should_forward(packet, state);
284
285 if should_consume && should_forward {
286 let next_hops = self.next_hops(packet, state);
288 if next_hops.is_empty() {
289 if self.verbose {
291 debug!("Packet {}: Consume only (no next hop)", packet.packet_id);
292 }
293 RoutingDecision::Consume
294 } else if next_hops.len() == 1 {
295 if self.verbose {
297 debug!(
298 "Packet {}: Consume and forward to {}",
299 packet.packet_id, next_hops[0]
300 );
301 }
302 RoutingDecision::ConsumeAndForward {
303 next_hop: next_hops[0].clone(),
304 }
305 } else {
306 if self.verbose {
308 debug!(
309 "Packet {}: Consume and multicast to {} peers",
310 packet.packet_id,
311 next_hops.len()
312 );
313 }
314 RoutingDecision::ConsumeAndForwardMulticast { next_hops }
315 }
316 } else if should_consume {
317 if self.verbose {
318 debug!("Packet {}: Consume only", packet.packet_id);
319 }
320 RoutingDecision::Consume
321 } else if should_forward {
322 let next_hops = self.next_hops(packet, state);
323 if next_hops.is_empty() {
324 if self.verbose {
325 warn!(
326 "Packet {}: Should forward but no next hop, dropping",
327 packet.packet_id
328 );
329 }
330 RoutingDecision::Drop
331 } else if next_hops.len() == 1 {
332 if self.verbose {
334 debug!("Packet {}: Forward to {}", packet.packet_id, next_hops[0]);
335 }
336 RoutingDecision::Forward {
337 next_hop: next_hops[0].clone(),
338 }
339 } else {
340 if self.verbose {
342 debug!(
343 "Packet {}: Multicast to {} peers",
344 packet.packet_id,
345 next_hops.len()
346 );
347 }
348 RoutingDecision::ForwardMulticast { next_hops }
349 }
350 } else {
351 if self.verbose {
352 debug!("Packet {}: Drop (not for us)", packet.packet_id);
353 }
354 RoutingDecision::Drop
355 }
356 }
357
358 pub fn should_consume(
383 &self,
384 packet: &DataPacket,
385 state: &TopologyState,
386 this_node_id: &str,
387 ) -> bool {
388 match packet.direction {
389 DataDirection::Upward => {
390 true
393 }
394
395 DataDirection::Downward => {
396 if let Some(ref dest) = packet.destination_node_id {
398 if dest == this_node_id {
399 return true;
400 }
401 }
402
403 matches!(state.role, NodeRole::Leader)
406 }
407
408 DataDirection::Lateral => {
409 if let Some(ref dest) = packet.destination_node_id {
411 dest == this_node_id
413 } else {
414 matches!(state.role, NodeRole::Leader)
416 }
417 }
418 }
419 }
420
421 pub fn should_forward(&self, packet: &DataPacket, state: &TopologyState) -> bool {
446 match packet.direction {
447 DataDirection::Upward => {
448 state.selected_peer.is_some()
450 }
451
452 DataDirection::Downward => {
453 !state.linked_peers.is_empty()
455 }
456
457 DataDirection::Lateral => {
458 if let Some(ref dest) = packet.destination_node_id {
460 state.lateral_peers.contains_key(dest)
461 } else {
462 matches!(state.role, NodeRole::Leader) && !state.lateral_peers.is_empty()
464 }
465 }
466 }
467 }
468
469 pub fn next_hop(&self, packet: &DataPacket, state: &TopologyState) -> Option<String> {
486 match packet.direction {
487 DataDirection::Upward => {
488 state
490 .selected_peer
491 .as_ref()
492 .map(|peer| peer.node_id.clone())
493 }
494
495 DataDirection::Downward => {
496 if let Some(ref dest) = packet.destination_node_id {
499 if state.linked_peers.contains_key(dest) {
500 return Some(dest.clone());
501 }
502 }
503
504 state.linked_peers.keys().next().cloned()
507 }
508
509 DataDirection::Lateral => {
510 if let Some(ref dest) = packet.destination_node_id {
512 if state.lateral_peers.contains_key(dest) {
514 return Some(dest.clone());
515 }
516 }
517
518 state.lateral_peers.keys().next().cloned()
520 }
521 }
522 }
523
524 pub fn next_hops(&self, packet: &DataPacket, state: &TopologyState) -> Vec<String> {
545 match packet.direction {
546 DataDirection::Upward => {
547 state
549 .selected_peer
550 .as_ref()
551 .map(|peer| vec![peer.node_id.clone()])
552 .unwrap_or_default()
553 }
554
555 DataDirection::Downward => {
556 if let Some(ref dest) = packet.destination_node_id {
559 if state.linked_peers.contains_key(dest) {
560 return vec![dest.clone()];
561 }
562 }
563
564 state.linked_peers.keys().cloned().collect()
566 }
567
568 DataDirection::Lateral => {
569 if let Some(ref dest) = packet.destination_node_id {
571 if state.lateral_peers.contains_key(dest) {
573 return vec![dest.clone()];
574 }
575 }
576
577 state.lateral_peers.keys().cloned().collect()
579 }
580 }
581 }
582
583 #[allow(dead_code)]
588 fn is_hq_level(&self, level: HierarchyLevel) -> bool {
589 matches!(level, HierarchyLevel::Company)
590 }
591
592 pub fn should_aggregate(
647 &self,
648 packet: &DataPacket,
649 decision: &RoutingDecision,
650 state: &TopologyState,
651 ) -> bool {
652 if !matches!(decision, RoutingDecision::ConsumeAndForward { .. }) {
654 return false;
655 }
656
657 if !packet.data_type.requires_aggregation() {
659 return false;
660 }
661
662 matches!(state.role, NodeRole::Leader)
664 }
665}
666
667impl Default for SelectiveRouter {
668 fn default() -> Self {
669 Self::new()
670 }
671}
672
673#[cfg(test)]
674mod tests {
675 use super::*;
676 use crate::beacon::{GeoPosition, GeographicBeacon};
677 use crate::routing::packet::{DataDirection, DataType};
678 use crate::topology::SelectedPeer;
679 use std::collections::HashMap;
680 use std::time::Instant;
681
682 fn create_test_state(
683 hierarchy_level: HierarchyLevel,
684 role: NodeRole,
685 has_selected_peer: bool,
686 num_linked_peers: usize,
687 num_lateral_peers: usize,
688 ) -> TopologyState {
689 let selected_peer = if has_selected_peer {
690 Some(SelectedPeer {
691 node_id: "parent-node".to_string(),
692 beacon: GeographicBeacon::new(
693 "parent-node".to_string(),
694 GeoPosition::new(37.7749, -122.4194),
695 HierarchyLevel::Platoon,
696 ),
697 selected_at: Instant::now(),
698 })
699 } else {
700 None
701 };
702
703 let mut linked_peers = HashMap::new();
704 for i in 0..num_linked_peers {
705 linked_peers.insert(format!("linked-peer-{}", i), Instant::now());
706 }
707
708 let mut lateral_peers = HashMap::new();
709 for i in 0..num_lateral_peers {
710 lateral_peers.insert(format!("lateral-peer-{}", i), Instant::now());
711 }
712
713 TopologyState {
714 selected_peer,
715 linked_peers,
716 lateral_peers,
717 role,
718 hierarchy_level,
719 }
720 }
721
722 #[test]
723 fn test_upward_telemetry_leaf_node() {
724 let router = SelectiveRouter::new();
725 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
726 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
727
728 assert!(router.should_consume(&packet, &state, "this-node"));
730
731 assert!(router.should_forward(&packet, &state));
733
734 let next_hop = router.next_hop(&packet, &state);
736 assert_eq!(next_hop, Some("parent-node".to_string()));
737 }
738
739 #[test]
740 fn test_upward_telemetry_hq_node() {
741 let router = SelectiveRouter::new();
742 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
744 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
745
746 assert!(router.should_consume(&packet, &state, "hq-node"));
748
749 assert!(!router.should_forward(&packet, &state));
751 }
752
753 #[test]
754 fn test_downward_command_to_leader() {
755 let router = SelectiveRouter::new();
756 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
757 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
758
759 assert!(router.should_consume(&packet, &state, "platoon-leader"));
761
762 assert!(router.should_forward(&packet, &state));
764
765 let next_hop = router.next_hop(&packet, &state);
767 assert!(next_hop.is_some());
768 assert!(next_hop.unwrap().starts_with("linked-peer-"));
769 }
770
771 #[test]
772 fn test_downward_command_to_leaf() {
773 let router = SelectiveRouter::new();
774 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
775 let packet = DataPacket::command("hq", "squad-member", vec![4, 5, 6]);
776
777 assert!(router.should_consume(&packet, &state, "squad-member"));
779
780 assert!(!router.should_forward(&packet, &state));
782 }
783
784 #[test]
785 fn test_lateral_coordination_between_leaders() {
786 let router = SelectiveRouter::new();
787 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
788 let packet = DataPacket::coordination("platoon-1", "lateral-peer-0", vec![7, 8, 9]);
789
790 assert!(!router.should_consume(&packet, &state, "platoon-3"));
792
793 let state_with_target =
795 create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
796 assert!(router.should_forward(&packet, &state_with_target));
797 }
798
799 #[test]
800 fn test_max_hops_drop() {
801 let router = SelectiveRouter::new();
802 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
803 let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
804
805 for _ in 0..10 {
807 packet.increment_hop();
808 }
809
810 let decision = router.route(&packet, &state, "this-node");
812 assert_eq!(decision, RoutingDecision::Drop);
813 }
814
815 #[test]
816 fn test_routing_decision_consume_and_forward() {
817 let router = SelectiveRouter::new();
818 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
820 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
821
822 let decision = router.route(&packet, &state, "platoon-leader");
823
824 match decision {
826 RoutingDecision::ConsumeAndForward { next_hop } => {
827 assert_eq!(next_hop, "parent-node");
828 }
829 _ => panic!("Expected ConsumeAndForward, got {:?}", decision),
830 }
831 }
832
833 #[test]
834 fn test_routing_decision_consume_only() {
835 let router = SelectiveRouter::new();
836 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
838 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
839
840 let decision = router.route(&packet, &state, "hq-node");
841
842 assert_eq!(decision, RoutingDecision::Consume);
844 }
845
846 #[test]
847 fn test_dont_route_own_packets() {
848 let router = SelectiveRouter::new();
849 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
850 let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
851
852 let decision = router.route(&packet, &state, "this-node");
854 assert_eq!(decision, RoutingDecision::Drop);
855 }
856
857 #[test]
858 fn test_should_aggregate_intermediate_leader() {
859 let router = SelectiveRouter::new();
860 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
862 let packet = DataPacket::telemetry("squad-member-1", vec![1, 2, 3]);
863
864 let decision = router.route(&packet, &state, "platoon-leader");
865
866 assert!(router.should_aggregate(&packet, &decision, &state));
868 }
869
870 #[test]
871 fn test_should_not_aggregate_hq_node() {
872 let router = SelectiveRouter::new();
873 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
875 let packet = DataPacket::telemetry("platoon-1", vec![1, 2, 3]);
876
877 let decision = router.route(&packet, &state, "hq-node");
878
879 assert!(!router.should_aggregate(&packet, &decision, &state));
881 }
882
883 #[test]
884 fn test_should_not_aggregate_non_leader() {
885 let router = SelectiveRouter::new();
886 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
888 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
889
890 let decision = router.route(&packet, &state, "squad-member");
891
892 assert!(!router.should_aggregate(&packet, &decision, &state));
894 }
895
896 #[test]
897 fn test_should_not_aggregate_command_packet() {
898 let router = SelectiveRouter::new();
899 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
901 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
902
903 let decision = router.route(&packet, &state, "platoon-leader");
904
905 assert!(!router.should_aggregate(&packet, &decision, &state));
907 }
908
909 #[test]
914 fn test_next_hops_upward_single_parent() {
915 let router = SelectiveRouter::new();
916 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
917 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
918
919 let next_hops = router.next_hops(&packet, &state);
921 assert_eq!(next_hops.len(), 1);
922 assert_eq!(next_hops[0], "parent-node");
923 }
924
925 #[test]
926 fn test_next_hops_upward_no_parent() {
927 let router = SelectiveRouter::new();
928 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
930 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
931
932 let next_hops = router.next_hops(&packet, &state);
934 assert_eq!(next_hops.len(), 0);
935 }
936
937 #[test]
938 fn test_next_hops_downward_multicast() {
939 let router = SelectiveRouter::new();
940 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 5, 0);
942 let packet = DataPacket {
944 packet_id: uuid::Uuid::new_v4().to_string(),
945 source_node_id: "hq".to_string(),
946 destination_node_id: None, data_type: DataType::Command,
948 direction: DataDirection::Downward,
949 hop_count: 0,
950 max_hops: 10,
951 payload: vec![4, 5, 6],
952 };
953
954 let next_hops = router.next_hops(&packet, &state);
956 assert_eq!(next_hops.len(), 5);
957 for i in 0..5 {
958 assert!(next_hops.contains(&format!("linked-peer-{}", i)));
959 }
960 }
961
962 #[test]
963 fn test_next_hops_downward_targeted() {
964 let router = SelectiveRouter::new();
965 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
967 let packet = DataPacket::command("hq", "linked-peer-1", vec![4, 5, 6]);
968
969 let next_hops = router.next_hops(&packet, &state);
971 assert_eq!(next_hops.len(), 1);
972 assert_eq!(next_hops[0], "linked-peer-1");
973 }
974
975 #[test]
976 fn test_next_hops_lateral_multicast() {
977 let router = SelectiveRouter::new();
978 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 4);
980 let packet = DataPacket {
982 packet_id: uuid::Uuid::new_v4().to_string(),
983 source_node_id: "platoon-1".to_string(),
984 destination_node_id: None, data_type: DataType::Coordination,
986 direction: DataDirection::Lateral,
987 hop_count: 0,
988 max_hops: 3,
989 payload: vec![7, 8, 9],
990 };
991
992 let next_hops = router.next_hops(&packet, &state);
994 assert_eq!(next_hops.len(), 4);
995 for i in 0..4 {
996 assert!(next_hops.contains(&format!("lateral-peer-{}", i)));
997 }
998 }
999
1000 #[test]
1001 fn test_next_hops_lateral_targeted() {
1002 let router = SelectiveRouter::new();
1003 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 2, 3);
1005 let packet = DataPacket::coordination("platoon-1", "lateral-peer-2", vec![7, 8, 9]);
1006
1007 let next_hops = router.next_hops(&packet, &state);
1009 assert_eq!(next_hops.len(), 1);
1010 assert_eq!(next_hops[0], "lateral-peer-2");
1011 }
1012
1013 #[test]
1014 fn test_route_downward_multicast() {
1015 let router = SelectiveRouter::new();
1016 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1018 let packet = DataPacket {
1020 packet_id: uuid::Uuid::new_v4().to_string(),
1021 source_node_id: "hq".to_string(),
1022 destination_node_id: None, data_type: DataType::Command,
1024 direction: DataDirection::Downward,
1025 hop_count: 0,
1026 max_hops: 10,
1027 payload: vec![4, 5, 6],
1028 };
1029
1030 let decision = router.route(&packet, &state, "hq-node");
1031
1032 match decision {
1034 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1035 assert_eq!(next_hops.len(), 3);
1036 assert!(next_hops.contains(&"linked-peer-0".to_string()));
1037 assert!(next_hops.contains(&"linked-peer-1".to_string()));
1038 assert!(next_hops.contains(&"linked-peer-2".to_string()));
1039 }
1040 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1041 }
1042 }
1043
1044 #[test]
1045 fn test_route_downward_consume_and_multicast() {
1046 let router = SelectiveRouter::new();
1047 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1049 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1050
1051 let decision = router.route(&packet, &state, "platoon-leader");
1052
1053 match decision {
1055 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1056 assert_eq!(next_hops.len(), 4);
1057 for i in 0..4 {
1058 assert!(next_hops.contains(&format!("linked-peer-{}", i)));
1059 }
1060 }
1061 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1062 }
1063 }
1064
1065 #[test]
1066 fn test_route_lateral_multicast() {
1067 let router = SelectiveRouter::new();
1068 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
1070 let packet = DataPacket {
1072 packet_id: uuid::Uuid::new_v4().to_string(),
1073 source_node_id: "platoon-1".to_string(),
1074 destination_node_id: None, data_type: DataType::Coordination,
1076 direction: DataDirection::Lateral,
1077 hop_count: 0,
1078 max_hops: 3,
1079 payload: vec![7, 8, 9],
1080 };
1081
1082 let decision = router.route(&packet, &state, "platoon-3");
1083
1084 match decision {
1086 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
1087 assert_eq!(next_hops.len(), 3);
1088 assert!(next_hops.contains(&"lateral-peer-0".to_string()));
1089 assert!(next_hops.contains(&"lateral-peer-1".to_string()));
1090 assert!(next_hops.contains(&"lateral-peer-2".to_string()));
1091 }
1092 _ => panic!("Expected ConsumeAndForwardMulticast, got {:?}", decision),
1093 }
1094 }
1095
1096 #[test]
1097 fn test_route_downward_single_child_unicast() {
1098 let router = SelectiveRouter::new();
1099 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, false, 1, 0);
1101 let packet = DataPacket {
1103 packet_id: uuid::Uuid::new_v4().to_string(),
1104 source_node_id: "hq".to_string(),
1105 destination_node_id: None, data_type: DataType::Command,
1107 direction: DataDirection::Downward,
1108 hop_count: 0,
1109 max_hops: 10,
1110 payload: vec![4, 5, 6],
1111 };
1112
1113 let decision = router.route(&packet, &state, "platoon-leader");
1114
1115 match decision {
1117 RoutingDecision::ConsumeAndForward { next_hop } => {
1118 assert_eq!(next_hop, "linked-peer-0");
1119 }
1120 _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1121 }
1122 }
1123
1124 #[test]
1125 fn test_route_lateral_single_peer_unicast() {
1126 let router = SelectiveRouter::new();
1127 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 1);
1129 let packet = DataPacket {
1131 packet_id: uuid::Uuid::new_v4().to_string(),
1132 source_node_id: "platoon-1".to_string(),
1133 destination_node_id: None, data_type: DataType::Coordination,
1135 direction: DataDirection::Lateral,
1136 hop_count: 0,
1137 max_hops: 3,
1138 payload: vec![7, 8, 9],
1139 };
1140
1141 let decision = router.route(&packet, &state, "platoon-3");
1142
1143 match decision {
1145 RoutingDecision::ConsumeAndForward { next_hop } => {
1146 assert_eq!(next_hop, "lateral-peer-0");
1147 }
1148 _ => panic!("Expected ConsumeAndForward (unicast), got {:?}", decision),
1149 }
1150 }
1151
1152 #[test]
1153 fn test_route_downward_no_children_drop() {
1154 let router = SelectiveRouter::new();
1155 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1157 let packet = DataPacket {
1159 packet_id: uuid::Uuid::new_v4().to_string(),
1160 source_node_id: "hq".to_string(),
1161 destination_node_id: None, data_type: DataType::Command,
1163 direction: DataDirection::Downward,
1164 hop_count: 0,
1165 max_hops: 10,
1166 payload: vec![4, 5, 6],
1167 };
1168
1169 let decision = router.route(&packet, &state, "squad-member");
1170
1171 assert_eq!(decision, RoutingDecision::Drop);
1174 }
1175
1176 #[test]
1177 fn test_multicast_preserves_backward_compatibility() {
1178 let router = SelectiveRouter::new();
1179 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1181 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1182
1183 let decision = router.route(&packet, &state, "platoon-leader");
1184
1185 match decision {
1187 RoutingDecision::ConsumeAndForward { next_hop } => {
1188 assert_eq!(next_hop, "parent-node");
1189 }
1190 _ => panic!(
1191 "Expected ConsumeAndForward (backward compat), got {:?}",
1192 decision
1193 ),
1194 }
1195
1196 let next_hop = router.next_hop(&packet, &state);
1198 assert_eq!(next_hop, Some("parent-node".to_string()));
1199 }
1200
1201 #[test]
1206 fn test_deduplication_disabled_by_default() {
1207 let router = SelectiveRouter::new();
1208 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1209 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1210
1211 let decision1 = router.route(&packet, &state, "this-node");
1213 let decision2 = router.route(&packet, &state, "this-node");
1214
1215 assert!(matches!(
1217 decision1,
1218 RoutingDecision::ConsumeAndForward { .. }
1219 ));
1220 assert!(matches!(
1221 decision2,
1222 RoutingDecision::ConsumeAndForward { .. }
1223 ));
1224 assert_eq!(router.dedup_cache_size(), 0);
1225 }
1226
1227 #[test]
1228 fn test_deduplication_enabled() {
1229 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1230 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1231 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1232
1233 let decision1 = router.route(&packet, &state, "this-node");
1235 assert!(matches!(
1236 decision1,
1237 RoutingDecision::ConsumeAndForward { .. }
1238 ));
1239 assert_eq!(router.dedup_cache_size(), 1);
1240
1241 let decision2 = router.route(&packet, &state, "this-node");
1243 assert_eq!(decision2, RoutingDecision::Drop);
1244 assert_eq!(router.dedup_cache_size(), 1); }
1246
1247 #[test]
1248 fn test_deduplication_different_packets() {
1249 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1250 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1251 let packet1 = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1252 let packet2 = DataPacket::telemetry("sensor-2", vec![4, 5, 6]);
1253
1254 let decision1 = router.route(&packet1, &state, "this-node");
1256 let decision2 = router.route(&packet2, &state, "this-node");
1257
1258 assert!(matches!(
1259 decision1,
1260 RoutingDecision::ConsumeAndForward { .. }
1261 ));
1262 assert!(matches!(
1263 decision2,
1264 RoutingDecision::ConsumeAndForward { .. }
1265 ));
1266 assert_eq!(router.dedup_cache_size(), 2);
1267 }
1268
1269 #[test]
1270 fn test_deduplication_cache_clear() {
1271 let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig::default());
1272 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1273 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1274
1275 let _ = router.route(&packet, &state, "this-node");
1277 assert_eq!(router.dedup_cache_size(), 1);
1278
1279 router.clear_dedup_cache();
1281 assert_eq!(router.dedup_cache_size(), 0);
1282
1283 let decision = router.route(&packet, &state, "this-node");
1285 assert!(matches!(
1286 decision,
1287 RoutingDecision::ConsumeAndForward { .. }
1288 ));
1289 }
1290
1291 #[test]
1292 fn test_deduplication_config_defaults() {
1293 let config = DeduplicationConfig::default();
1294 assert!(config.enabled);
1295 assert_eq!(config.ttl, Duration::from_secs(300));
1296 assert_eq!(config.max_entries, 10000);
1297 }
1298
1299 #[test]
1300 fn test_verbose_router() {
1301 let router = SelectiveRouter::new_verbose();
1302 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1303 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1304
1305 let decision = router.route(&packet, &state, "this-node");
1307 assert!(matches!(
1308 decision,
1309 RoutingDecision::ConsumeAndForward { .. }
1310 ));
1311 }
1312
1313 #[test]
1314 fn test_verbose_max_hops_drop() {
1315 let router = SelectiveRouter::new_verbose();
1316 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1317 let mut packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1318 for _ in 0..10 {
1319 packet.increment_hop();
1320 }
1321 let decision = router.route(&packet, &state, "this-node");
1322 assert_eq!(decision, RoutingDecision::Drop);
1323 }
1324
1325 #[test]
1326 fn test_verbose_own_packet_drop() {
1327 let router = SelectiveRouter::new_verbose();
1328 let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
1329 let packet = DataPacket::telemetry("this-node", vec![1, 2, 3]);
1330 let decision = router.route(&packet, &state, "this-node");
1331 assert_eq!(decision, RoutingDecision::Drop);
1332 }
1333
1334 #[test]
1335 fn test_verbose_consume_only() {
1336 let router = SelectiveRouter::new_verbose();
1337 let state = create_test_state(HierarchyLevel::Company, NodeRole::Leader, false, 3, 0);
1338 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1339 let decision = router.route(&packet, &state, "hq-node");
1340 assert_eq!(decision, RoutingDecision::Consume);
1341 }
1342
1343 #[test]
1344 fn test_verbose_consume_and_forward() {
1345 let router = SelectiveRouter::new_verbose();
1346 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
1347 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
1348 let decision = router.route(&packet, &state, "platoon-leader");
1349 assert!(matches!(
1350 decision,
1351 RoutingDecision::ConsumeAndForward { .. }
1352 ));
1353 }
1354
1355 #[test]
1356 fn test_verbose_consume_and_multicast() {
1357 let router = SelectiveRouter::new_verbose();
1358 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 4, 0);
1359 let packet = DataPacket::command("hq", "platoon-leader", vec![4, 5, 6]);
1360 let decision = router.route(&packet, &state, "platoon-leader");
1361 assert!(matches!(
1362 decision,
1363 RoutingDecision::ConsumeAndForwardMulticast { .. }
1364 ));
1365 }
1366
1367 #[test]
1368 fn test_verbose_forward_multicast() {
1369 let router = SelectiveRouter::new_verbose();
1370 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 3, 0);
1372 let packet = DataPacket {
1374 packet_id: uuid::Uuid::new_v4().to_string(),
1375 source_node_id: "hq".to_string(),
1376 destination_node_id: None,
1377 data_type: DataType::Command,
1378 direction: DataDirection::Downward,
1379 hop_count: 0,
1380 max_hops: 10,
1381 payload: vec![4, 5, 6],
1382 };
1383 let decision = router.route(&packet, &state, "member-node");
1384 assert!(matches!(decision, RoutingDecision::ForwardMulticast { .. }));
1385 }
1386
1387 #[test]
1388 fn test_verbose_forward_unicast() {
1389 let router = SelectiveRouter::new_verbose();
1390 let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 1, 0);
1391 let packet = DataPacket {
1392 packet_id: uuid::Uuid::new_v4().to_string(),
1393 source_node_id: "hq".to_string(),
1394 destination_node_id: None,
1395 data_type: DataType::Command,
1396 direction: DataDirection::Downward,
1397 hop_count: 0,
1398 max_hops: 10,
1399 payload: vec![4, 5, 6],
1400 };
1401 let decision = router.route(&packet, &state, "member-node");
1402 assert!(matches!(decision, RoutingDecision::Forward { .. }));
1403 }
1404
/// Verbose router: a lateral `Coordination` packet addressed to
/// `lateral-peer-0` must yield `Forward`.
///
/// Note that, despite the `no_next_hop_drop` suffix in the test name, the
/// assertion below expects a `Forward` decision, not `Drop`.
#[test]
fn test_verbose_forward_no_next_hop_drop() {
    let pkt = DataPacket {
        packet_id: uuid::Uuid::new_v4().to_string(),
        source_node_id: String::from("other"),
        destination_node_id: Some(String::from("lateral-peer-0")),
        data_type: DataType::Coordination,
        direction: DataDirection::Lateral,
        hop_count: 0,
        max_hops: 3,
        payload: vec![7, 8, 9],
    };
    // NOTE(review): `false, 0, 1` presumably means no selected peer, no linked
    // peers and one lateral peer (likely named `lateral-peer-0`) — confirm
    // against `create_test_state`.
    let topology = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, false, 0, 1);
    let verbose_router = SelectiveRouter::new_verbose();

    let outcome = verbose_router.route(&pkt, &topology, "member-node");
    assert!(matches!(outcome, RoutingDecision::Forward { .. }));
}
1424
/// Verbose router: a lateral `Coordination` packet addressed to
/// `someone-else`, routed on `member-node`, must yield `Drop`.
#[test]
fn test_verbose_drop_not_for_us() {
    let pkt = DataPacket {
        packet_id: uuid::Uuid::new_v4().to_string(),
        source_node_id: String::from("other"),
        destination_node_id: Some(String::from("someone-else")),
        data_type: DataType::Coordination,
        direction: DataDirection::Lateral,
        hop_count: 0,
        max_hops: 3,
        payload: vec![7, 8, 9],
    };
    // NOTE(review): `false, 0, 0` presumably builds a state with no peers at
    // all — confirm against `create_test_state`.
    let topology = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
    let verbose_router = SelectiveRouter::new_verbose();

    let outcome = verbose_router.route(&pkt, &topology, "member-node");
    assert_eq!(outcome, RoutingDecision::Drop);
}
1443
/// Verbose router with deduplication enabled: routing the same packet twice
/// must drop the second occurrence.
///
/// The first routing result is also checked so that the test cannot pass
/// vacuously (i.e. because the router drops the packet for an unrelated
/// reason on both calls).
#[test]
fn test_verbose_dedup_drop() {
    let config = DeduplicationConfig {
        enabled: true,
        ttl: Duration::from_secs(300),
        max_entries: 100,
    };
    // Built as a struct literal: of the constructors visible at the top of the
    // file, `new_verbose` disables deduplication and `new_with_deduplication`
    // is not verbose, so neither gives verbose + dedup together.
    let router = SelectiveRouter {
        verbose: true,
        dedup_config: config,
        seen_packets: Arc::new(RwLock::new(HashMap::new())),
    };
    let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
    let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);

    // First sighting: must be routed normally.
    let first = router.route(&packet, &state, "this-node");
    assert_ne!(
        first,
        RoutingDecision::Drop,
        "first sighting of a packet must not be dropped"
    );

    // Second sighting of the very same packet: must be treated as a duplicate.
    let second = router.route(&packet, &state, "this-node");
    assert_eq!(
        second,
        RoutingDecision::Drop,
        "repeated packet must be dropped as a duplicate"
    );
}
1463
/// Non-verbose router: a downward `Command` with no explicit destination,
/// routed on a platoon member, must yield `ForwardMulticast` (forward only,
/// no consume variant).
#[test]
fn test_forward_only_no_consume_member_downward() {
    let pkt = DataPacket {
        packet_id: uuid::Uuid::new_v4().to_string(),
        source_node_id: String::from("hq"),
        destination_node_id: None,
        data_type: DataType::Command,
        direction: DataDirection::Downward,
        hop_count: 0,
        max_hops: 10,
        payload: vec![4, 5, 6],
    };
    // NOTE(review): `2, 0` presumably means two linked peers and no lateral
    // peers — confirm against `create_test_state`.
    let topology = create_test_state(HierarchyLevel::Platoon, NodeRole::Member, true, 2, 0);
    let plain_router = SelectiveRouter::new();

    let outcome = plain_router.route(&pkt, &topology, "member-node");
    assert!(matches!(
        outcome,
        RoutingDecision::ForwardMulticast { .. }
    ));
}
1483
/// Non-verbose router: a downward `Command` with no explicit destination,
/// routed on a squad member, must yield `Drop`.
#[test]
fn test_should_forward_no_next_hop_returns_drop() {
    let pkt = DataPacket {
        packet_id: uuid::Uuid::new_v4().to_string(),
        source_node_id: String::from("hq"),
        destination_node_id: None,
        data_type: DataType::Command,
        direction: DataDirection::Downward,
        hop_count: 0,
        max_hops: 10,
        payload: vec![4, 5, 6],
    };
    // NOTE(review): `false, 0, 0` presumably builds a state with no peers to
    // forward to — confirm against `create_test_state`.
    let topology = create_test_state(HierarchyLevel::Squad, NodeRole::Member, false, 0, 0);
    let plain_router = SelectiveRouter::new();

    let outcome = plain_router.route(&pkt, &topology, "squad-member");
    assert_eq!(outcome, RoutingDecision::Drop);
}
1502
/// `next_hop` for a command addressed to `unknown-child` on a platoon leader
/// must still return a hop, and that hop's id must start with `linked-peer-`.
#[test]
fn test_next_hop_downward_targeted_not_found() {
    let router = SelectiveRouter::new();
    // NOTE(review): `3, 0` presumably means three linked peers and no lateral
    // peers — confirm against `create_test_state`.
    let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 3, 0);
    let packet = DataPacket::command("hq", "unknown-child", vec![4, 5, 6]);

    // A single `expect` replaces the separate `is_some()` check + `unwrap()`
    // and reports why the value was required if it is missing.
    let hop = router
        .next_hop(&packet, &state)
        .expect("next_hop should return a hop even when the downward target is unknown");
    assert!(
        hop.starts_with("linked-peer-"),
        "expected a linked peer as next hop, got {:?}",
        hop
    );
}
1515
/// `next_hop` for a coordination packet addressed to `unknown-lateral` on a
/// platoon leader must still return a hop, and that hop's id must start with
/// `lateral-peer-`.
#[test]
fn test_next_hop_lateral_unknown_peer() {
    let router = SelectiveRouter::new();
    // NOTE(review): `0, 3` presumably means no linked peers and three lateral
    // peers — confirm against `create_test_state`.
    let state = create_test_state(HierarchyLevel::Platoon, NodeRole::Leader, true, 0, 3);
    let packet = DataPacket::coordination("source", "unknown-lateral", vec![7, 8, 9]);

    // A single `expect` replaces the separate `is_some()` check + `unwrap()`
    // and reports why the value was required if it is missing.
    let hop = router
        .next_hop(&packet, &state)
        .expect("next_hop should return a hop even when the lateral target is unknown");
    assert!(
        hop.starts_with("lateral-peer-"),
        "expected a lateral peer as next hop, got {:?}",
        hop
    );
}
1527
/// A router obtained through `Default` starts with an empty dedup cache.
#[test]
fn test_default_router() {
    let cached_entries = SelectiveRouter::default().dedup_cache_size();
    assert_eq!(cached_entries, 0);
}
1533
/// Routes five telemetry packets (one per distinct source id) through a
/// router whose dedup cache is capped at `max_entries: 3`, then checks that
/// the cache size does not exceed that cap.
#[test]
fn test_deduplication_max_entries_eviction() {
    let state = create_test_state(HierarchyLevel::Squad, NodeRole::Member, true, 0, 0);
    let router = SelectiveRouter::new_with_deduplication(DeduplicationConfig {
        enabled: true,
        ttl: Duration::from_secs(300),
        max_entries: 3,
    });

    // More packets than `max_entries`, so the cap has to come into play.
    for idx in 0u8..5 {
        let pkt = DataPacket::telemetry(format!("sensor-{}", idx), vec![idx]);
        let _ = router.route(&pkt, &state, "this-node");
    }

    let cached_entries = router.dedup_cache_size();
    assert!(cached_entries <= 3);
}
1553}