1use std::collections::HashMap;
61use std::hash::{Hash, Hasher};
62use std::net::{IpAddr, Ipv4Addr, SocketAddr};
63use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
64use std::sync::{Arc, RwLock};
65use std::time::{Duration, Instant};
66
67use chacha20poly1305::{
68 aead::{Aead, KeyInit},
69 ChaCha20Poly1305, Nonce,
70};
71use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
72use serde::{Deserialize, Serialize};
73use tokio::net::UdpSocket;
74use tokio::sync::broadcast;
75use tracing::{debug, info};
76
77use super::MessagePriority;
78
79#[derive(Debug)]
85pub enum BypassError {
86 Io(std::io::Error),
88 Encode(String),
90 Decode(String),
92 Config(String),
94 NotStarted,
96 MessageTooLarge { size: usize, max: usize },
98 InvalidHeader,
100 StaleMessage,
102 InvalidSignature,
104 DecryptionFailed,
106 UnauthorizedSource(IpAddr),
108 ReplayDetected { sequence: u8 },
110 MissingCredential(String),
112}
113
114impl std::fmt::Display for BypassError {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 BypassError::Io(e) => write!(f, "IO error: {}", e),
118 BypassError::Encode(msg) => write!(f, "Encode error: {}", msg),
119 BypassError::Decode(msg) => write!(f, "Decode error: {}", msg),
120 BypassError::Config(msg) => write!(f, "Config error: {}", msg),
121 BypassError::NotStarted => write!(f, "Bypass channel not started"),
122 BypassError::MessageTooLarge { size, max } => {
123 write!(f, "Message too large: {} bytes (max {})", size, max)
124 }
125 BypassError::InvalidHeader => write!(f, "Invalid bypass header"),
126 BypassError::StaleMessage => write!(f, "Message is stale (past TTL)"),
127 BypassError::InvalidSignature => write!(f, "Invalid message signature"),
128 BypassError::DecryptionFailed => write!(f, "Message decryption failed"),
129 BypassError::UnauthorizedSource(ip) => {
130 write!(f, "Unauthorized source IP: {}", ip)
131 }
132 BypassError::ReplayDetected { sequence } => {
133 write!(f, "Replay attack detected (sequence {})", sequence)
134 }
135 BypassError::MissingCredential(what) => {
136 write!(f, "Missing security credential: {}", what)
137 }
138 }
139 }
140}
141
142impl std::error::Error for BypassError {
143 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
144 match self {
145 BypassError::Io(e) => Some(e),
146 _ => None,
147 }
148 }
149}
150
151impl From<std::io::Error> for BypassError {
152 fn from(err: std::io::Error) -> Self {
153 BypassError::Io(err)
154 }
155}
156
157pub type Result<T> = std::result::Result<T, BypassError>;
158
159#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
165#[serde(tag = "type", rename_all = "snake_case")]
166pub enum BypassTransport {
167 #[default]
169 Unicast,
170 Multicast {
172 group: IpAddr,
174 port: u16,
176 },
177 Broadcast,
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
183#[serde(rename_all = "snake_case")]
184pub enum MessageEncoding {
185 #[default]
187 Protobuf,
188 Json,
190 Raw,
192 Cbor,
194}
195
196impl std::fmt::Display for MessageEncoding {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 match self {
199 MessageEncoding::Protobuf => write!(f, "protobuf"),
200 MessageEncoding::Json => write!(f, "json"),
201 MessageEncoding::Raw => write!(f, "raw"),
202 MessageEncoding::Cbor => write!(f, "cbor"),
203 }
204 }
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BypassCollectionConfig {
210 pub collection: String,
212 pub transport: BypassTransport,
214 pub encoding: MessageEncoding,
216 #[serde(default = "default_ttl_ms")]
218 pub ttl_ms: u64,
219 #[serde(default)]
221 pub priority: MessagePriority,
222}
223
224fn default_ttl_ms() -> u64 {
225 5000
226}
227
228impl BypassCollectionConfig {
229 pub fn ttl(&self) -> Duration {
231 Duration::from_millis(self.ttl_ms)
232 }
233}
234
235impl Default for BypassCollectionConfig {
236 fn default() -> Self {
237 Self {
238 collection: String::new(),
239 transport: BypassTransport::Unicast,
240 encoding: MessageEncoding::Protobuf,
241 ttl_ms: 5000,
242 priority: MessagePriority::Normal,
243 }
244 }
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct UdpConfig {
250 pub bind_port: u16,
252 pub buffer_size: usize,
254 pub multicast_ttl: u32,
256}
257
258impl Default for UdpConfig {
259 fn default() -> Self {
260 Self {
261 bind_port: 5150,
262 buffer_size: 65536,
263 multicast_ttl: 32,
264 }
265 }
266}
267
268#[derive(Debug, Clone, Default, Serialize, Deserialize)]
270pub struct BypassChannelConfig {
271 pub udp: UdpConfig,
273 pub collections: Vec<BypassCollectionConfig>,
275 pub multicast_enabled: bool,
277 pub max_message_size: usize,
279}
280
281impl BypassChannelConfig {
282 pub fn new() -> Self {
284 Self {
285 udp: UdpConfig::default(),
286 collections: Vec::new(),
287 multicast_enabled: true,
288 max_message_size: 65000, }
290 }
291
292 pub fn with_collection(mut self, config: BypassCollectionConfig) -> Self {
294 self.collections.push(config);
295 self
296 }
297
298 pub fn get_collection(&self, name: &str) -> Option<&BypassCollectionConfig> {
300 self.collections.iter().find(|c| c.collection == name)
301 }
302
303 pub fn is_bypass_collection(&self, name: &str) -> bool {
305 self.collections.iter().any(|c| c.collection == name)
306 }
307}
308
309#[derive(Debug, Clone, Default)]
354pub struct BypassSecurityConfig {
355 pub require_signature: bool,
362
363 pub encrypt_payload: bool,
370
371 pub source_allowlist: Option<Vec<IpAddr>>,
378
379 pub replay_protection: bool,
386
387 pub replay_window_size: usize,
392}
393
394impl BypassSecurityConfig {
395 pub fn none() -> Self {
397 Self::default()
398 }
399
400 pub fn signed() -> Self {
402 Self {
403 require_signature: true,
404 ..Default::default()
405 }
406 }
407
408 pub fn full_security() -> Self {
410 Self {
411 require_signature: true,
412 encrypt_payload: true,
413 source_allowlist: None, replay_protection: true,
415 replay_window_size: 64,
416 }
417 }
418
419 pub fn is_enabled(&self) -> bool {
421 self.require_signature
422 || self.encrypt_payload
423 || self.source_allowlist.is_some()
424 || self.replay_protection
425 }
426
427 pub fn is_source_allowed(&self, ip: &IpAddr) -> bool {
429 match &self.source_allowlist {
430 Some(list) => list.contains(ip),
431 None => true, }
433 }
434}
435
436#[derive(Clone, Default)]
440pub struct BypassSecurityCredentials {
441 signing_key: Option<SigningKey>,
443
444 peer_keys: HashMap<String, VerifyingKey>,
447
448 encryption_key: Option<[u8; 32]>,
451}
452
453impl std::fmt::Debug for BypassSecurityCredentials {
454 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 f.debug_struct("BypassSecurityCredentials")
456 .field("has_signing_key", &self.signing_key.is_some())
457 .field("peer_keys_count", &self.peer_keys.len())
458 .field("has_encryption_key", &self.encryption_key.is_some())
459 .finish()
460 }
461}
462
463impl BypassSecurityCredentials {
464 pub fn new() -> Self {
466 Self::default()
467 }
468
469 pub fn with_signing_key(mut self, key: SigningKey) -> Self {
471 self.signing_key = Some(key);
472 self
473 }
474
475 pub fn with_peer_key(mut self, peer_id: impl Into<String>, key: VerifyingKey) -> Self {
477 self.peer_keys.insert(peer_id.into(), key);
478 self
479 }
480
481 pub fn with_encryption_key(mut self, key: [u8; 32]) -> Self {
483 self.encryption_key = Some(key);
484 self
485 }
486
487 pub fn verifying_key(&self) -> Option<VerifyingKey> {
489 self.signing_key.as_ref().map(|k| k.verifying_key())
490 }
491
492 pub fn sign(&self, message: &[u8]) -> Result<Signature> {
494 let key = self
495 .signing_key
496 .as_ref()
497 .ok_or_else(|| BypassError::MissingCredential("signing key".into()))?;
498 Ok(key.sign(message))
499 }
500
501 pub fn verify(&self, peer_id: &str, message: &[u8], signature: &Signature) -> Result<()> {
503 let key = self
504 .peer_keys
505 .get(peer_id)
506 .ok_or_else(|| BypassError::MissingCredential(format!("peer key for {}", peer_id)))?;
507 key.verify(message, signature)
508 .map_err(|_| BypassError::InvalidSignature)
509 }
510
511 pub fn verify_by_ip(&self, ip: &IpAddr, message: &[u8], signature: &Signature) -> Result<()> {
513 self.verify(&ip.to_string(), message, signature)
514 }
515
516 pub fn encrypt(&self, plaintext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
518 let key = self
519 .encryption_key
520 .as_ref()
521 .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
522
523 let cipher = ChaCha20Poly1305::new_from_slice(key)
524 .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
525
526 let nonce = Nonce::from_slice(nonce);
527 cipher
528 .encrypt(nonce, plaintext)
529 .map_err(|_| BypassError::Encode("Encryption failed".into()))
530 }
531
532 pub fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
534 let key = self
535 .encryption_key
536 .as_ref()
537 .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
538
539 let cipher = ChaCha20Poly1305::new_from_slice(key)
540 .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
541
542 let nonce = Nonce::from_slice(nonce);
543 cipher
544 .decrypt(nonce, ciphertext)
545 .map_err(|_| BypassError::DecryptionFailed)
546 }
547}
548
549#[derive(Debug, Default)]
554pub struct ReplayTracker {
555 windows: RwLock<HashMap<IpAddr, ReplayWindow>>,
558
559 window_size: usize,
561}
562
563#[derive(Debug)]
565struct ReplayWindow {
566 highest_seq: u8,
568 seen: u64,
570}
571
572impl ReplayWindow {
573 fn new() -> Self {
574 Self {
575 highest_seq: 0,
576 seen: 0,
577 }
578 }
579
580 fn check_and_update(&mut self, seq: u8, window_size: usize) -> bool {
583 let window_size = window_size.min(64) as u8;
584
585 let diff = self.highest_seq.wrapping_sub(seq);
587
588 if diff == 0 && self.seen == 0 {
589 self.highest_seq = seq;
591 self.seen = 1;
592 return true;
593 }
594
595 if seq == self.highest_seq {
596 return false;
598 }
599
600 let ahead = seq.wrapping_sub(self.highest_seq);
602 if ahead > 0 && ahead < 128 {
603 let shift = ahead as u32;
605 if shift < 64 {
606 self.seen = (self.seen << shift) | 1;
607 } else {
608 self.seen = 1;
609 }
610 self.highest_seq = seq;
611 return true;
612 }
613
614 if diff < window_size && diff < 64 {
616 let bit = 1u64 << diff;
617 if self.seen & bit != 0 {
618 return false;
620 }
621 self.seen |= bit;
622 return true;
623 }
624
625 false
627 }
628}
629
630impl ReplayTracker {
631 pub fn new(window_size: usize) -> Self {
633 Self {
634 windows: RwLock::new(HashMap::new()),
635 window_size: window_size.min(64),
636 }
637 }
638
639 pub fn check(&self, source: &IpAddr, sequence: u8) -> Result<()> {
642 let mut windows = self.windows.write().unwrap_or_else(|e| e.into_inner());
643 let window = windows.entry(*source).or_insert_with(ReplayWindow::new);
644
645 if window.check_and_update(sequence, self.window_size) {
646 Ok(())
647 } else {
648 Err(BypassError::ReplayDetected { sequence })
649 }
650 }
651
652 pub fn clear_source(&self, source: &IpAddr) {
654 self.windows
655 .write()
656 .unwrap_or_else(|e| e.into_inner())
657 .remove(source);
658 }
659
660 pub fn clear_all(&self) {
662 self.windows
663 .write()
664 .unwrap_or_else(|e| e.into_inner())
665 .clear();
666 }
667}
668
669#[derive(Debug, Clone, Copy)]
682pub struct BypassHeader {
683 pub magic: [u8; 4],
685 pub collection_hash: u32,
687 pub ttl_ms: u16,
689 pub flags: u8,
691 pub sequence: u8,
693}
694
695impl BypassHeader {
696 pub const MAGIC: [u8; 4] = [0x45, 0x43, 0x48, 0x45];
698
699 pub const SIZE: usize = 12;
701
702 pub const FLAG_COMPRESSED: u8 = 0x01;
704 pub const FLAG_ENCRYPTED: u8 = 0x02;
706 pub const FLAG_SIGNED: u8 = 0x04;
708
709 pub fn new(collection: &str, ttl: Duration, sequence: u8) -> Self {
711 Self {
712 magic: Self::MAGIC,
713 collection_hash: Self::hash_collection(collection),
714 ttl_ms: ttl.as_millis().min(u16::MAX as u128) as u16,
715 flags: 0,
716 sequence,
717 }
718 }
719
720 pub fn hash_collection(name: &str) -> u32 {
722 let mut hasher = fnv::FnvHasher::default();
723 name.hash(&mut hasher);
724 hasher.finish() as u32
725 }
726
727 pub fn is_valid(&self) -> bool {
729 self.magic == Self::MAGIC
730 }
731
732 pub fn encode(&self) -> [u8; Self::SIZE] {
734 let mut buf = [0u8; Self::SIZE];
735 buf[0..4].copy_from_slice(&self.magic);
736 buf[4..8].copy_from_slice(&self.collection_hash.to_be_bytes());
737 buf[8..10].copy_from_slice(&self.ttl_ms.to_be_bytes());
738 buf[10] = self.flags;
739 buf[11] = self.sequence;
740 buf
741 }
742
743 pub fn decode(buf: &[u8]) -> Result<Self> {
745 if buf.len() < Self::SIZE {
746 return Err(BypassError::InvalidHeader);
747 }
748
749 let mut magic = [0u8; 4];
750 magic.copy_from_slice(&buf[0..4]);
751
752 if magic != Self::MAGIC {
753 return Err(BypassError::InvalidHeader);
754 }
755
756 let collection_hash = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
757 let ttl_ms = u16::from_be_bytes([buf[8], buf[9]]);
758 let flags = buf[10];
759 let sequence = buf[11];
760
761 Ok(Self {
762 magic,
763 collection_hash,
764 ttl_ms,
765 flags,
766 sequence,
767 })
768 }
769
770 pub fn is_stale(&self, received_at: Instant, sent_at: Instant) -> bool {
772 let elapsed = received_at.duration_since(sent_at);
773 elapsed > Duration::from_millis(self.ttl_ms as u64)
774 }
775}
776
777mod fnv {
782 use std::hash::Hasher;
783
784 const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
785 const FNV_PRIME: u64 = 1099511628211;
786
787 #[derive(Default)]
788 pub struct FnvHasher(u64);
789
790 impl Hasher for FnvHasher {
791 fn write(&mut self, bytes: &[u8]) {
792 for byte in bytes {
793 self.0 ^= *byte as u64;
794 self.0 = self.0.wrapping_mul(FNV_PRIME);
795 }
796 }
797
798 fn finish(&self) -> u64 {
799 self.0
800 }
801 }
802
803 impl FnvHasher {
804 pub fn default() -> Self {
805 Self(FNV_OFFSET_BASIS)
806 }
807 }
808}
809
810#[derive(Debug, Clone)]
816pub struct BypassMessage {
817 pub source: SocketAddr,
819 pub collection_hash: u32,
821 pub data: Vec<u8>,
823 pub received_at: Instant,
825 pub sequence: u8,
827 pub priority: MessagePriority,
829}
830
831#[derive(Debug, Clone)]
833pub enum BypassTarget {
834 Unicast(SocketAddr),
836 Multicast { group: IpAddr, port: u16 },
838 Broadcast { port: u16 },
840}
841
842#[derive(Debug, Default)]
848pub struct BypassMetrics {
849 pub messages_sent: AtomicU64,
851 pub messages_received: AtomicU64,
853 pub bytes_sent: AtomicU64,
855 pub bytes_received: AtomicU64,
857 pub stale_dropped: AtomicU64,
859 pub invalid_dropped: AtomicU64,
861 pub send_errors: AtomicU64,
863 pub receive_errors: AtomicU64,
865
866 pub signature_rejected: AtomicU64,
869 pub decryption_failed: AtomicU64,
871 pub unauthorized_source: AtomicU64,
873 pub replay_rejected: AtomicU64,
875}
876
877impl BypassMetrics {
878 pub fn snapshot(&self) -> BypassMetricsSnapshot {
880 BypassMetricsSnapshot {
881 messages_sent: self.messages_sent.load(Ordering::Relaxed),
882 messages_received: self.messages_received.load(Ordering::Relaxed),
883 bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
884 bytes_received: self.bytes_received.load(Ordering::Relaxed),
885 stale_dropped: self.stale_dropped.load(Ordering::Relaxed),
886 invalid_dropped: self.invalid_dropped.load(Ordering::Relaxed),
887 send_errors: self.send_errors.load(Ordering::Relaxed),
888 receive_errors: self.receive_errors.load(Ordering::Relaxed),
889 signature_rejected: self.signature_rejected.load(Ordering::Relaxed),
890 decryption_failed: self.decryption_failed.load(Ordering::Relaxed),
891 unauthorized_source: self.unauthorized_source.load(Ordering::Relaxed),
892 replay_rejected: self.replay_rejected.load(Ordering::Relaxed),
893 }
894 }
895}
896
897#[derive(Debug, Clone, Default)]
899pub struct BypassMetricsSnapshot {
900 pub messages_sent: u64,
901 pub messages_received: u64,
902 pub bytes_sent: u64,
903 pub bytes_received: u64,
904 pub stale_dropped: u64,
905 pub invalid_dropped: u64,
906 pub send_errors: u64,
907 pub receive_errors: u64,
908
909 pub signature_rejected: u64,
912 pub decryption_failed: u64,
914 pub unauthorized_source: u64,
916 pub replay_rejected: u64,
918}
919
920pub struct UdpBypassChannel {
929 config: BypassChannelConfig,
931
932 socket: Option<Arc<UdpSocket>>,
934
935 multicast_sockets: RwLock<HashMap<IpAddr, Arc<UdpSocket>>>,
937
938 collection_map: HashMap<u32, BypassCollectionConfig>,
940
941 sequence: AtomicU8,
943
944 metrics: Arc<BypassMetrics>,
946
947 incoming_tx: broadcast::Sender<BypassMessage>,
949
950 running: Arc<AtomicBool>,
952}
953
954impl UdpBypassChannel {
955 pub async fn new(config: BypassChannelConfig) -> Result<Self> {
957 let collection_map: HashMap<u32, BypassCollectionConfig> = config
959 .collections
960 .iter()
961 .map(|c| (BypassHeader::hash_collection(&c.collection), c.clone()))
962 .collect();
963
964 let (incoming_tx, _) = broadcast::channel(1024);
965
966 Ok(Self {
967 config,
968 socket: None,
969 multicast_sockets: RwLock::new(HashMap::new()),
970 collection_map,
971 sequence: AtomicU8::new(0),
972 metrics: Arc::new(BypassMetrics::default()),
973 incoming_tx,
974 running: Arc::new(AtomicBool::new(false)),
975 })
976 }
977
978 pub async fn start(&mut self) -> Result<()> {
980 if self.running.load(Ordering::SeqCst) {
981 return Ok(());
982 }
983
984 let bind_addr = format!("0.0.0.0:{}", self.config.udp.bind_port);
986 let socket = UdpSocket::bind(&bind_addr).await?;
987 socket.set_broadcast(true)?;
988
989 let socket = Arc::new(socket);
990 self.socket = Some(socket.clone());
991
992 let incoming_tx = self.incoming_tx.clone();
994 let metrics = self.metrics.clone();
995 let collection_map = self.collection_map.clone();
996 let buffer_size = self.config.udp.buffer_size;
997 let running = self.running.clone();
998
999 running.store(true, Ordering::SeqCst);
1000
1001 tokio::spawn(async move {
1002 let mut buf = vec![0u8; buffer_size];
1003
1004 while running.load(Ordering::SeqCst) {
1005 match tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf))
1006 .await
1007 {
1008 Ok(Ok((len, src))) => {
1009 let received_at = Instant::now();
1010
1011 if len < BypassHeader::SIZE {
1013 metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1014 continue;
1015 }
1016
1017 let header = match BypassHeader::decode(&buf[..BypassHeader::SIZE]) {
1018 Ok(h) => h,
1019 Err(_) => {
1020 metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1021 continue;
1022 }
1023 };
1024
1025 let payload = buf[BypassHeader::SIZE..len].to_vec();
1027
1028 let priority = collection_map
1030 .get(&header.collection_hash)
1031 .map(|c| c.priority)
1032 .unwrap_or(MessagePriority::Normal);
1033
1034 let message = BypassMessage {
1035 source: src,
1036 collection_hash: header.collection_hash,
1037 data: payload,
1038 received_at,
1039 sequence: header.sequence,
1040 priority,
1041 };
1042
1043 metrics.messages_received.fetch_add(1, Ordering::Relaxed);
1044 metrics
1045 .bytes_received
1046 .fetch_add(len as u64, Ordering::Relaxed);
1047
1048 let _ = incoming_tx.send(message);
1050 }
1051 Ok(Err(_e)) => {
1052 metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
1053 }
1054 Err(_) => {
1055 }
1057 }
1058 }
1059 });
1060
1061 info!(
1062 "Bypass channel started on port {}",
1063 self.config.udp.bind_port
1064 );
1065 Ok(())
1066 }
1067
1068 pub fn stop(&mut self) {
1070 self.running.store(false, Ordering::SeqCst);
1071 self.socket = None;
1072 self.multicast_sockets
1073 .write()
1074 .unwrap_or_else(|e| e.into_inner())
1075 .clear();
1076 info!("Bypass channel stopped");
1077 }
1078
1079 pub fn is_running(&self) -> bool {
1081 self.running.load(Ordering::SeqCst)
1082 }
1083
1084 fn next_sequence(&self) -> u8 {
1086 self.sequence.fetch_add(1, Ordering::Relaxed)
1087 }
1088
1089 pub async fn send(&self, target: BypassTarget, collection: &str, data: &[u8]) -> Result<()> {
1091 let socket = self.socket.as_ref().ok_or(BypassError::NotStarted)?;
1092
1093 if self.config.max_message_size > 0 && data.len() > self.config.max_message_size {
1095 return Err(BypassError::MessageTooLarge {
1096 size: data.len(),
1097 max: self.config.max_message_size,
1098 });
1099 }
1100
1101 let ttl = self
1103 .config
1104 .get_collection(collection)
1105 .map(|c| c.ttl())
1106 .unwrap_or(Duration::from_secs(5));
1107
1108 let header = BypassHeader::new(collection, ttl, self.next_sequence());
1110 let header_bytes = header.encode();
1111
1112 let mut frame = Vec::with_capacity(BypassHeader::SIZE + data.len());
1114 frame.extend_from_slice(&header_bytes);
1115 frame.extend_from_slice(data);
1116
1117 let bytes_sent = match target {
1119 BypassTarget::Unicast(addr) => socket.send_to(&frame, addr).await?,
1120 BypassTarget::Multicast { group, port } => {
1121 let mcast_socket = self.get_or_create_multicast(group).await?;
1122 mcast_socket.send_to(&frame, (group, port)).await?
1123 }
1124 BypassTarget::Broadcast { port } => {
1125 let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), port);
1126 socket.send_to(&frame, broadcast_addr).await?
1127 }
1128 };
1129
1130 self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
1131 self.metrics
1132 .bytes_sent
1133 .fetch_add(bytes_sent as u64, Ordering::Relaxed);
1134
1135 Ok(())
1136 }
1137
1138 pub async fn send_to_collection(
1140 &self,
1141 collection: &str,
1142 target_addr: Option<SocketAddr>,
1143 data: &[u8],
1144 ) -> Result<()> {
1145 let config = self
1146 .config
1147 .get_collection(collection)
1148 .ok_or_else(|| BypassError::Config(format!("Unknown collection: {}", collection)))?;
1149
1150 let target = match &config.transport {
1151 BypassTransport::Unicast => {
1152 let addr = target_addr.ok_or_else(|| {
1153 BypassError::Config("Unicast requires target address".to_string())
1154 })?;
1155 BypassTarget::Unicast(addr)
1156 }
1157 BypassTransport::Multicast { group, port } => BypassTarget::Multicast {
1158 group: *group,
1159 port: *port,
1160 },
1161 BypassTransport::Broadcast => BypassTarget::Broadcast {
1162 port: self.config.udp.bind_port,
1163 },
1164 };
1165
1166 self.send(target, collection, data).await
1167 }
1168
1169 pub fn subscribe(&self) -> broadcast::Receiver<BypassMessage> {
1171 self.incoming_tx.subscribe()
1172 }
1173
1174 pub fn subscribe_collection(
1176 &self,
1177 collection: &str,
1178 ) -> (u32, broadcast::Receiver<BypassMessage>) {
1179 let hash = BypassHeader::hash_collection(collection);
1180 (hash, self.incoming_tx.subscribe())
1181 }
1182
1183 async fn get_or_create_multicast(&self, group: IpAddr) -> Result<Arc<UdpSocket>> {
1185 {
1187 let sockets = self
1188 .multicast_sockets
1189 .read()
1190 .unwrap_or_else(|e| e.into_inner());
1191 if let Some(socket) = sockets.get(&group) {
1192 return Ok(socket.clone());
1193 }
1194 }
1195
1196 let socket = UdpSocket::bind("0.0.0.0:0").await?;
1198
1199 match group {
1200 IpAddr::V4(addr) => {
1201 socket.join_multicast_v4(addr, Ipv4Addr::UNSPECIFIED)?;
1202 socket.set_multicast_ttl_v4(self.config.udp.multicast_ttl)?;
1203 }
1204 IpAddr::V6(addr) => {
1205 socket.join_multicast_v6(&addr, 0)?;
1206 }
1207 }
1208
1209 let socket = Arc::new(socket);
1210 self.multicast_sockets
1211 .write()
1212 .unwrap()
1213 .insert(group, socket.clone());
1214
1215 debug!("Joined multicast group: {}", group);
1216 Ok(socket)
1217 }
1218
1219 pub fn leave_multicast(&self, group: IpAddr) -> Result<()> {
1221 if let Some(socket) = self
1222 .multicast_sockets
1223 .write()
1224 .unwrap_or_else(|e| e.into_inner())
1225 .remove(&group)
1226 {
1227 match group {
1228 IpAddr::V4(addr) => {
1229 if let Ok(socket) = Arc::try_unwrap(socket) {
1231 let _ = socket.leave_multicast_v4(addr, Ipv4Addr::UNSPECIFIED);
1232 }
1233 }
1234 IpAddr::V6(addr) => {
1235 if let Ok(socket) = Arc::try_unwrap(socket) {
1236 let _ = socket.leave_multicast_v6(&addr, 0);
1237 }
1238 }
1239 }
1240 debug!("Left multicast group: {}", group);
1241 }
1242 Ok(())
1243 }
1244
1245 pub fn metrics(&self) -> BypassMetricsSnapshot {
1247 self.metrics.snapshot()
1248 }
1249
1250 pub fn config(&self) -> &BypassChannelConfig {
1252 &self.config
1253 }
1254
1255 pub fn is_bypass_collection(&self, name: &str) -> bool {
1257 self.config.is_bypass_collection(name)
1258 }
1259
1260 pub fn get_collection_by_hash(&self, hash: u32) -> Option<&BypassCollectionConfig> {
1262 self.collection_map.get(&hash)
1263 }
1264}
1265
1266#[cfg(test)]
1271mod tests {
1272 use super::*;
1273
1274 #[test]
1275 fn test_bypass_header_encode_decode() {
1276 let header = BypassHeader::new("test_collection", Duration::from_millis(1000), 42);
1277 let encoded = header.encode();
1278 let decoded = BypassHeader::decode(&encoded).unwrap();
1279
1280 assert_eq!(decoded.magic, BypassHeader::MAGIC);
1281 assert_eq!(decoded.collection_hash, header.collection_hash);
1282 assert_eq!(decoded.ttl_ms, 1000);
1283 assert_eq!(decoded.sequence, 42);
1284 assert!(decoded.is_valid());
1285 }
1286
1287 #[test]
1288 fn test_bypass_header_invalid_magic() {
1289 let mut data = [0u8; 12];
1290 data[0..4].copy_from_slice(&[0, 0, 0, 0]);
1291 let result = BypassHeader::decode(&data);
1292 assert!(result.is_err());
1293 }
1294
1295 #[test]
1296 fn test_bypass_header_too_short() {
1297 let data = [0u8; 8];
1298 let result = BypassHeader::decode(&data);
1299 assert!(result.is_err());
1300 }
1301
1302 #[test]
1303 fn test_collection_hash_consistency() {
1304 let hash1 = BypassHeader::hash_collection("position_updates");
1305 let hash2 = BypassHeader::hash_collection("position_updates");
1306 let hash3 = BypassHeader::hash_collection("sensor_data");
1307
1308 assert_eq!(hash1, hash2);
1309 assert_ne!(hash1, hash3);
1310 }
1311
1312 #[test]
1313 fn test_bypass_config() {
1314 let config = BypassChannelConfig::new()
1315 .with_collection(BypassCollectionConfig {
1316 collection: "positions".into(),
1317 transport: BypassTransport::Multicast {
1318 group: "239.1.1.100".parse().unwrap(),
1319 port: 5150,
1320 },
1321 encoding: MessageEncoding::Protobuf,
1322 ttl_ms: 200,
1323 priority: MessagePriority::High,
1324 })
1325 .with_collection(BypassCollectionConfig {
1326 collection: "telemetry".into(),
1327 transport: BypassTransport::Unicast,
1328 encoding: MessageEncoding::Cbor,
1329 ttl_ms: 5000,
1330 priority: MessagePriority::Normal,
1331 });
1332
1333 assert!(config.is_bypass_collection("positions"));
1334 assert!(config.is_bypass_collection("telemetry"));
1335 assert!(!config.is_bypass_collection("unknown"));
1336
1337 let pos_config = config.get_collection("positions").unwrap();
1338 assert_eq!(pos_config.priority, MessagePriority::High);
1339 }
1340
1341 #[test]
1342 fn test_ttl_clamping() {
1343 let header = BypassHeader::new("test", Duration::from_secs(1000), 0);
1345 assert_eq!(header.ttl_ms, u16::MAX);
1346 }
1347
1348 #[tokio::test]
1349 async fn test_bypass_channel_creation() {
1350 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1351 collection: "test".into(),
1352 ..Default::default()
1353 });
1354
1355 let channel = UdpBypassChannel::new(config).await.unwrap();
1356 assert!(!channel.is_running());
1357 assert!(channel.is_bypass_collection("test"));
1358 }
1359
1360 #[tokio::test]
1361 async fn test_bypass_channel_start_stop() {
1362 let config = BypassChannelConfig {
1363 udp: UdpConfig {
1364 bind_port: 0, ..Default::default()
1366 },
1367 ..Default::default()
1368 };
1369
1370 let mut channel = UdpBypassChannel::new(config).await.unwrap();
1371
1372 channel.start().await.unwrap();
1373 assert!(channel.is_running());
1374
1375 channel.stop();
1376 assert!(!channel.is_running());
1377 }
1378
1379 #[tokio::test]
1380 async fn test_bypass_send_receive() {
1381 let config1 = BypassChannelConfig {
1383 udp: UdpConfig {
1384 bind_port: 0,
1385 ..Default::default()
1386 },
1387 collections: vec![BypassCollectionConfig {
1388 collection: "test".into(),
1389 ttl_ms: 5000,
1390 ..Default::default()
1391 }],
1392 ..Default::default()
1393 };
1394
1395 let config2 = BypassChannelConfig {
1396 udp: UdpConfig {
1397 bind_port: 0,
1398 ..Default::default()
1399 },
1400 collections: vec![BypassCollectionConfig {
1401 collection: "test".into(),
1402 ttl_ms: 5000,
1403 ..Default::default()
1404 }],
1405 ..Default::default()
1406 };
1407
1408 let mut channel1 = UdpBypassChannel::new(config1).await.unwrap();
1409 let mut channel2 = UdpBypassChannel::new(config2).await.unwrap();
1410
1411 channel1.start().await.unwrap();
1412 channel2.start().await.unwrap();
1413
1414 let socket2_port = channel2
1416 .socket
1417 .as_ref()
1418 .unwrap()
1419 .local_addr()
1420 .unwrap()
1421 .port();
1422 let socket2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket2_port);
1423
1424 let mut rx = channel2.subscribe();
1426
1427 let test_data = b"Hello, bypass!";
1429 channel1
1430 .send(BypassTarget::Unicast(socket2_addr), "test", test_data)
1431 .await
1432 .unwrap();
1433
1434 let msg = tokio::time::timeout(Duration::from_millis(500), rx.recv())
1436 .await
1437 .expect("timeout")
1438 .expect("receive error");
1439
1440 assert_eq!(msg.data, test_data);
1441 assert_eq!(msg.collection_hash, BypassHeader::hash_collection("test"));
1442
1443 let metrics1 = channel1.metrics();
1445 assert_eq!(metrics1.messages_sent, 1);
1446 assert!(metrics1.bytes_sent > 0);
1447
1448 let metrics2 = channel2.metrics();
1449 assert_eq!(metrics2.messages_received, 1);
1450 assert!(metrics2.bytes_received > 0);
1451
1452 channel1.stop();
1453 channel2.stop();
1454 }
1455
1456 #[test]
1457 fn test_message_too_large() {
1458 let _config = BypassChannelConfig {
1460 max_message_size: 100,
1461 ..Default::default()
1462 };
1463
1464 let err = BypassError::MessageTooLarge {
1466 size: 200,
1467 max: 100,
1468 };
1469 assert!(err.to_string().contains("200"));
1470 assert!(err.to_string().contains("100"));
1471 }
1472
1473 #[test]
1478 fn test_security_config_none() {
1479 let config = BypassSecurityConfig::none();
1480 assert!(!config.require_signature);
1481 assert!(!config.encrypt_payload);
1482 assert!(config.source_allowlist.is_none());
1483 assert!(!config.replay_protection);
1484 assert!(!config.is_enabled());
1485 }
1486
1487 #[test]
1488 fn test_security_config_signed() {
1489 let config = BypassSecurityConfig::signed();
1490 assert!(config.require_signature);
1491 assert!(!config.encrypt_payload);
1492 assert!(config.is_enabled());
1493 }
1494
1495 #[test]
1496 fn test_security_config_full() {
1497 let config = BypassSecurityConfig::full_security();
1498 assert!(config.require_signature);
1499 assert!(config.encrypt_payload);
1500 assert!(config.replay_protection);
1501 assert_eq!(config.replay_window_size, 64);
1502 assert!(config.is_enabled());
1503 }
1504
1505 #[test]
1506 fn test_security_config_allowlist() {
1507 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
1508 let ip2: IpAddr = "192.168.1.2".parse().unwrap();
1509 let ip3: IpAddr = "10.0.0.1".parse().unwrap();
1510
1511 let config = BypassSecurityConfig::none();
1513 assert!(config.is_source_allowed(&ip1));
1514 assert!(config.is_source_allowed(&ip2));
1515 assert!(config.is_source_allowed(&ip3));
1516
1517 let config = BypassSecurityConfig {
1519 source_allowlist: Some(vec![ip1, ip2]),
1520 ..Default::default()
1521 };
1522 assert!(config.is_source_allowed(&ip1));
1523 assert!(config.is_source_allowed(&ip2));
1524 assert!(!config.is_source_allowed(&ip3));
1525 }
1526
1527 #[test]
1528 fn test_credentials_signing() {
1529 use ed25519_dalek::SigningKey;
1530
1531 let seed: [u8; 32] = [1u8; 32];
1533 let signing_key = SigningKey::from_bytes(&seed);
1534 let verifying_key = signing_key.verifying_key();
1535
1536 let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1538 let creds = BypassSecurityCredentials::new()
1539 .with_signing_key(signing_key)
1540 .with_peer_key(peer_ip.to_string(), verifying_key);
1541
1542 let message = b"test message for signing";
1544 let signature = creds.sign(message).expect("signing should succeed");
1545
1546 creds
1548 .verify_by_ip(&peer_ip, message, &signature)
1549 .expect("verification should succeed");
1550 }
1551
1552 #[test]
1553 fn test_credentials_invalid_signature() {
1554 use ed25519_dalek::SigningKey;
1555
1556 let seed1: [u8; 32] = [1u8; 32];
1558 let seed2: [u8; 32] = [2u8; 32];
1559 let signing_key1 = SigningKey::from_bytes(&seed1);
1560 let signing_key2 = SigningKey::from_bytes(&seed2);
1561 let verifying_key2 = signing_key2.verifying_key();
1562
1563 let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1565 let creds = BypassSecurityCredentials::new()
1566 .with_signing_key(signing_key1)
1567 .with_peer_key(peer_ip.to_string(), verifying_key2);
1568
1569 let message = b"test message";
1571 let signature = creds.sign(message).expect("signing should succeed");
1572
1573 let result = creds.verify_by_ip(&peer_ip, message, &signature);
1575 assert!(matches!(result, Err(BypassError::InvalidSignature)));
1576 }
1577
1578 #[test]
1579 fn test_credentials_encryption() {
1580 let encryption_key = [42u8; 32];
1581 let creds = BypassSecurityCredentials::new().with_encryption_key(encryption_key);
1582
1583 let plaintext = b"secret message for encryption";
1584 let nonce = [0u8; 12];
1585
1586 let ciphertext = creds
1588 .encrypt(plaintext, &nonce)
1589 .expect("encryption should succeed");
1590 assert_ne!(ciphertext.as_slice(), plaintext);
1591 assert_eq!(ciphertext.len(), plaintext.len() + 16);
1593
1594 let decrypted = creds
1596 .decrypt(&ciphertext, &nonce)
1597 .expect("decryption should succeed");
1598 assert_eq!(decrypted, plaintext);
1599 }
1600
1601 #[test]
1602 fn test_credentials_decryption_wrong_key() {
1603 let key1 = [1u8; 32];
1604 let key2 = [2u8; 32];
1605
1606 let creds1 = BypassSecurityCredentials::new().with_encryption_key(key1);
1607 let creds2 = BypassSecurityCredentials::new().with_encryption_key(key2);
1608
1609 let plaintext = b"secret message";
1610 let nonce = [0u8; 12];
1611
1612 let ciphertext = creds1
1614 .encrypt(plaintext, &nonce)
1615 .expect("encryption should succeed");
1616
1617 let result = creds2.decrypt(&ciphertext, &nonce);
1619 assert!(matches!(result, Err(BypassError::DecryptionFailed)));
1620 }
1621
1622 #[test]
1623 fn test_credentials_missing_signing_key() {
1624 let creds = BypassSecurityCredentials::new();
1625 let result = creds.sign(b"message");
1626 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1627 }
1628
1629 #[test]
1630 fn test_credentials_missing_encryption_key() {
1631 let creds = BypassSecurityCredentials::new();
1632 let result = creds.encrypt(b"message", &[0u8; 12]);
1633 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1634 }
1635
1636 #[test]
1637 fn test_replay_tracker_first_message() {
1638 let tracker = ReplayTracker::new(64);
1639 let source: IpAddr = "192.168.1.1".parse().unwrap();
1640
1641 tracker
1643 .check(&source, 0)
1644 .expect("first message should succeed");
1645 }
1646
1647 #[test]
1648 fn test_replay_tracker_sequential() {
1649 let tracker = ReplayTracker::new(64);
1650 let source: IpAddr = "192.168.1.1".parse().unwrap();
1651
1652 for seq in 0..10u8 {
1654 tracker
1655 .check(&source, seq)
1656 .unwrap_or_else(|_| panic!("sequence {} should succeed", seq));
1657 }
1658 }
1659
1660 #[test]
1661 fn test_replay_tracker_replay_detected() {
1662 let tracker = ReplayTracker::new(64);
1663 let source: IpAddr = "192.168.1.1".parse().unwrap();
1664
1665 tracker.check(&source, 5).expect("first should succeed");
1667
1668 let result = tracker.check(&source, 5);
1670 assert!(matches!(
1671 result,
1672 Err(BypassError::ReplayDetected { sequence: 5 })
1673 ));
1674 }
1675
1676 #[test]
1677 fn test_replay_tracker_out_of_order() {
1678 let tracker = ReplayTracker::new(64);
1679 let source: IpAddr = "192.168.1.1".parse().unwrap();
1680
1681 tracker.check(&source, 10).expect("10 should succeed");
1683 tracker
1684 .check(&source, 8)
1685 .expect("8 should succeed (within window)");
1686 tracker.check(&source, 12).expect("12 should succeed");
1687 tracker
1688 .check(&source, 9)
1689 .expect("9 should succeed (within window)");
1690
1691 let result = tracker.check(&source, 8);
1693 assert!(matches!(
1694 result,
1695 Err(BypassError::ReplayDetected { sequence: 8 })
1696 ));
1697 }
1698
1699 #[test]
1700 fn test_replay_tracker_multiple_sources() {
1701 let tracker = ReplayTracker::new(64);
1702 let source1: IpAddr = "192.168.1.1".parse().unwrap();
1703 let source2: IpAddr = "192.168.1.2".parse().unwrap();
1704
1705 tracker
1707 .check(&source1, 5)
1708 .expect("source1 seq 5 should succeed");
1709 tracker
1710 .check(&source2, 5)
1711 .expect("source2 seq 5 should succeed");
1712
1713 let result = tracker.check(&source1, 5);
1715 assert!(matches!(result, Err(BypassError::ReplayDetected { .. })));
1716
1717 tracker
1719 .check(&source1, 6)
1720 .expect("source1 seq 6 should succeed");
1721 }
1722
1723 #[test]
1724 fn test_replay_tracker_window_advance() {
1725 let tracker = ReplayTracker::new(64);
1726 let source: IpAddr = "192.168.1.1".parse().unwrap();
1727
1728 tracker.check(&source, 100).expect("100 should succeed");
1730 tracker.check(&source, 110).expect("110 should succeed");
1731 tracker.check(&source, 120).expect("120 should succeed");
1732
1733 tracker.check(&source, 200).expect("200 should succeed");
1735
1736 tracker.check(&source, 201).expect("201 should succeed");
1739 }
1740
1741 #[test]
1742 fn test_replay_tracker_clear() {
1743 let tracker = ReplayTracker::new(64);
1744 let source: IpAddr = "192.168.1.1".parse().unwrap();
1745
1746 tracker.check(&source, 5).expect("initial should succeed");
1747
1748 tracker.clear_source(&source);
1750 tracker
1751 .check(&source, 5)
1752 .expect("after clear should succeed");
1753 }
1754
1755 #[test]
1756 fn test_metrics_snapshot_includes_security() {
1757 let metrics = BypassMetrics::default();
1758
1759 metrics.signature_rejected.fetch_add(1, Ordering::Relaxed);
1761 metrics.decryption_failed.fetch_add(2, Ordering::Relaxed);
1762 metrics.unauthorized_source.fetch_add(3, Ordering::Relaxed);
1763 metrics.replay_rejected.fetch_add(4, Ordering::Relaxed);
1764
1765 let snapshot = metrics.snapshot();
1767 assert_eq!(snapshot.signature_rejected, 1);
1768 assert_eq!(snapshot.decryption_failed, 2);
1769 assert_eq!(snapshot.unauthorized_source, 3);
1770 assert_eq!(snapshot.replay_rejected, 4);
1771 }
1772
1773 #[test]
1774 fn test_bypass_error_display_all_variants() {
1775 let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test io"));
1776 assert!(io_err.to_string().contains("IO error"));
1777 assert!(io_err.to_string().contains("test io"));
1778
1779 let encode_err = BypassError::Encode("bad data".to_string());
1780 assert!(encode_err.to_string().contains("Encode error"));
1781
1782 let decode_err = BypassError::Decode("corrupt".to_string());
1783 assert!(decode_err.to_string().contains("Decode error"));
1784
1785 let config_err = BypassError::Config("bad config".to_string());
1786 assert!(config_err.to_string().contains("Config error"));
1787
1788 let not_started = BypassError::NotStarted;
1789 assert!(not_started.to_string().contains("not started"));
1790
1791 let invalid_header = BypassError::InvalidHeader;
1792 assert!(invalid_header.to_string().contains("Invalid bypass header"));
1793
1794 let stale = BypassError::StaleMessage;
1795 assert!(stale.to_string().contains("stale"));
1796
1797 let sig_err = BypassError::InvalidSignature;
1798 assert!(sig_err.to_string().contains("Invalid message signature"));
1799
1800 let decrypt_err = BypassError::DecryptionFailed;
1801 assert!(decrypt_err.to_string().contains("decryption failed"));
1802
1803 let unauth = BypassError::UnauthorizedSource("10.0.0.1".parse().unwrap());
1804 assert!(unauth.to_string().contains("10.0.0.1"));
1805
1806 let replay = BypassError::ReplayDetected { sequence: 42 };
1807 assert!(replay.to_string().contains("42"));
1808
1809 let missing = BypassError::MissingCredential("encryption key".to_string());
1810 assert!(missing.to_string().contains("encryption key"));
1811 }
1812
1813 #[test]
1814 fn test_bypass_error_source() {
1815 let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test"));
1816 assert!(std::error::Error::source(&io_err).is_some());
1817
1818 let encode_err = BypassError::Encode("test".into());
1819 assert!(std::error::Error::source(&encode_err).is_none());
1820
1821 let not_started = BypassError::NotStarted;
1822 assert!(std::error::Error::source(¬_started).is_none());
1823 }
1824
1825 #[test]
1826 fn test_bypass_error_from_io() {
1827 let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
1828 let bypass_err: BypassError = io_err.into();
1829 assert!(bypass_err.to_string().contains("gone"));
1830 }
1831
1832 #[test]
1833 fn test_message_encoding_display() {
1834 assert_eq!(MessageEncoding::Protobuf.to_string(), "protobuf");
1835 assert_eq!(MessageEncoding::Json.to_string(), "json");
1836 assert_eq!(MessageEncoding::Raw.to_string(), "raw");
1837 assert_eq!(MessageEncoding::Cbor.to_string(), "cbor");
1838 }
1839
1840 #[test]
1841 fn test_bypass_collection_config_ttl() {
1842 let config = BypassCollectionConfig {
1843 ttl_ms: 200,
1844 ..Default::default()
1845 };
1846 assert_eq!(config.ttl(), Duration::from_millis(200));
1847 }
1848
1849 #[test]
1850 fn test_bypass_collection_config_default() {
1851 let config = BypassCollectionConfig::default();
1852 assert!(config.collection.is_empty());
1853 assert_eq!(config.transport, BypassTransport::Unicast);
1854 assert_eq!(config.encoding, MessageEncoding::Protobuf);
1855 assert_eq!(config.ttl_ms, 5000);
1856 assert_eq!(config.priority, MessagePriority::Normal);
1857 }
1858
1859 #[test]
1860 fn test_udp_config_default() {
1861 let config = UdpConfig::default();
1862 assert_eq!(config.bind_port, 5150);
1863 assert_eq!(config.buffer_size, 65536);
1864 assert_eq!(config.multicast_ttl, 32);
1865 }
1866
1867 #[test]
1868 fn test_bypass_channel_config_default() {
1869 let config = BypassChannelConfig::default();
1870 assert!(config.collections.is_empty());
1871 assert!(!config.multicast_enabled);
1872 assert_eq!(config.max_message_size, 0);
1873 }
1874
1875 #[test]
1876 fn test_bypass_channel_config_new() {
1877 let config = BypassChannelConfig::new();
1878 assert!(config.multicast_enabled);
1879 assert_eq!(config.max_message_size, 65000);
1880 }
1881
1882 #[test]
1883 fn test_bypass_security_config_is_enabled_allowlist() {
1884 let config = BypassSecurityConfig {
1885 source_allowlist: Some(vec!["10.0.0.1".parse().unwrap()]),
1886 ..Default::default()
1887 };
1888 assert!(config.is_enabled());
1889 }
1890
1891 #[test]
1892 fn test_bypass_security_config_is_enabled_replay() {
1893 let config = BypassSecurityConfig {
1894 replay_protection: true,
1895 ..Default::default()
1896 };
1897 assert!(config.is_enabled());
1898 }
1899
1900 #[test]
1901 fn test_bypass_security_credentials_debug() {
1902 let creds = BypassSecurityCredentials::new();
1903 let debug = format!("{:?}", creds);
1904 assert!(debug.contains("has_signing_key"));
1905 assert!(debug.contains("false"));
1906 assert!(debug.contains("peer_keys_count"));
1907 }
1908
1909 #[test]
1910 fn test_bypass_security_credentials_verifying_key() {
1911 let creds = BypassSecurityCredentials::new();
1913 assert!(creds.verifying_key().is_none());
1914
1915 let seed: [u8; 32] = [42u8; 32];
1917 let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed);
1918 let creds = BypassSecurityCredentials::new().with_signing_key(signing_key.clone());
1919 let vk = creds.verifying_key();
1920 assert!(vk.is_some());
1921 assert_eq!(vk.unwrap(), signing_key.verifying_key());
1922 }
1923
1924 #[test]
1925 fn test_bypass_security_credentials_verify_unknown_peer() {
1926 let creds = BypassSecurityCredentials::new();
1927 let sig = ed25519_dalek::Signature::from_bytes(&[0u8; 64]);
1928 let result = creds.verify("unknown-peer", b"message", &sig);
1929 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1930 }
1931
1932 #[test]
1933 fn test_bypass_security_credentials_decrypt_no_key() {
1934 let creds = BypassSecurityCredentials::new();
1935 let result = creds.decrypt(b"ciphertext", &[0u8; 12]);
1936 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1937 }
1938
1939 #[test]
1940 fn test_replay_tracker_clear_all() {
1941 let tracker = ReplayTracker::new(64);
1942 let source1: IpAddr = "192.168.1.1".parse().unwrap();
1943 let source2: IpAddr = "192.168.1.2".parse().unwrap();
1944
1945 tracker.check(&source1, 1).unwrap();
1946 tracker.check(&source2, 1).unwrap();
1947
1948 tracker.clear_all();
1949
1950 tracker.check(&source1, 1).unwrap();
1952 tracker.check(&source2, 1).unwrap();
1953 }
1954
1955 #[test]
1956 fn test_bypass_header_is_stale() {
1957 let header = BypassHeader::new("test", Duration::from_millis(100), 0);
1958 let now = Instant::now();
1959
1960 assert!(!header.is_stale(now, now));
1962
1963 let sent_at = now - Duration::from_secs(1);
1965 assert!(header.is_stale(now, sent_at));
1966 }
1967
1968 #[test]
1969 fn test_bypass_metrics_snapshot_default() {
1970 let snapshot = BypassMetricsSnapshot::default();
1971 assert_eq!(snapshot.messages_sent, 0);
1972 assert_eq!(snapshot.messages_received, 0);
1973 assert_eq!(snapshot.bytes_sent, 0);
1974 assert_eq!(snapshot.bytes_received, 0);
1975 assert_eq!(snapshot.stale_dropped, 0);
1976 assert_eq!(snapshot.invalid_dropped, 0);
1977 assert_eq!(snapshot.send_errors, 0);
1978 assert_eq!(snapshot.receive_errors, 0);
1979 }
1980
1981 #[tokio::test]
1982 async fn test_bypass_channel_subscribe_collection() {
1983 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1984 collection: "positions".into(),
1985 ..Default::default()
1986 });
1987
1988 let channel = UdpBypassChannel::new(config).await.unwrap();
1989 let (hash, _rx) = channel.subscribe_collection("positions");
1990 assert_eq!(hash, BypassHeader::hash_collection("positions"));
1991 }
1992
1993 #[tokio::test]
1994 async fn test_bypass_channel_get_collection_by_hash() {
1995 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1996 collection: "telemetry".into(),
1997 ttl_ms: 200,
1998 ..Default::default()
1999 });
2000
2001 let channel = UdpBypassChannel::new(config).await.unwrap();
2002 let hash = BypassHeader::hash_collection("telemetry");
2003 let col_config = channel.get_collection_by_hash(hash);
2004 assert!(col_config.is_some());
2005 assert_eq!(col_config.unwrap().ttl_ms, 200);
2006
2007 assert!(channel.get_collection_by_hash(12345).is_none());
2009 }
2010
2011 #[tokio::test]
2012 async fn test_bypass_channel_config_accessor() {
2013 let config = BypassChannelConfig {
2014 max_message_size: 1024,
2015 ..BypassChannelConfig::new()
2016 };
2017 let channel = UdpBypassChannel::new(config).await.unwrap();
2018 assert_eq!(channel.config().max_message_size, 1024);
2019 }
2020
2021 #[tokio::test]
2022 async fn test_bypass_send_not_started() {
2023 let config = BypassChannelConfig::new();
2024 let channel = UdpBypassChannel::new(config).await.unwrap();
2025
2026 let result = channel
2027 .send(
2028 BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2029 "test",
2030 b"data",
2031 )
2032 .await;
2033 assert!(matches!(result, Err(BypassError::NotStarted)));
2034 }
2035
2036 #[tokio::test]
2037 async fn test_bypass_send_message_too_large() {
2038 let config = BypassChannelConfig {
2039 max_message_size: 10,
2040 udp: UdpConfig {
2041 bind_port: 0,
2042 ..Default::default()
2043 },
2044 ..BypassChannelConfig::new()
2045 };
2046 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2047 channel.start().await.unwrap();
2048
2049 let big_data = vec![0u8; 100];
2050 let result = channel
2051 .send(
2052 BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2053 "test",
2054 &big_data,
2055 )
2056 .await;
2057 assert!(matches!(
2058 result,
2059 Err(BypassError::MessageTooLarge { size: 100, max: 10 })
2060 ));
2061
2062 channel.stop();
2063 }
2064
2065 #[tokio::test]
2066 async fn test_bypass_send_to_collection_unknown() {
2067 let config = BypassChannelConfig {
2068 udp: UdpConfig {
2069 bind_port: 0,
2070 ..Default::default()
2071 },
2072 ..BypassChannelConfig::new()
2073 };
2074 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2075 channel.start().await.unwrap();
2076
2077 let result = channel.send_to_collection("unknown", None, b"data").await;
2078 assert!(matches!(result, Err(BypassError::Config(_))));
2079
2080 channel.stop();
2081 }
2082
2083 #[tokio::test]
2084 async fn test_bypass_send_to_collection_unicast_no_addr() {
2085 let config = BypassChannelConfig {
2086 udp: UdpConfig {
2087 bind_port: 0,
2088 ..Default::default()
2089 },
2090 collections: vec![BypassCollectionConfig {
2091 collection: "test".into(),
2092 transport: BypassTransport::Unicast,
2093 ..Default::default()
2094 }],
2095 ..BypassChannelConfig::new()
2096 };
2097 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2098 channel.start().await.unwrap();
2099
2100 let result = channel.send_to_collection("test", None, b"data").await;
2102 assert!(matches!(result, Err(BypassError::Config(_))));
2103
2104 channel.stop();
2105 }
2106
2107 #[tokio::test]
2108 async fn test_bypass_send_to_collection_broadcast() {
2109 let config = BypassChannelConfig {
2110 udp: UdpConfig {
2111 bind_port: 0,
2112 ..Default::default()
2113 },
2114 collections: vec![BypassCollectionConfig {
2115 collection: "bcast".into(),
2116 transport: BypassTransport::Broadcast,
2117 ..Default::default()
2118 }],
2119 ..BypassChannelConfig::new()
2120 };
2121 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2122 channel.start().await.unwrap();
2123
2124 let _result = channel.send_to_collection("bcast", None, b"data").await;
2127
2128 channel.stop();
2129 }
2130
2131 #[tokio::test]
2132 async fn test_bypass_leave_multicast_no_socket() {
2133 let config = BypassChannelConfig::new();
2134 let channel = UdpBypassChannel::new(config).await.unwrap();
2135
2136 let result = channel.leave_multicast("239.1.1.1".parse().unwrap());
2138 assert!(result.is_ok());
2139 }
2140
2141 #[test]
2142 fn test_bypass_transport_serde() {
2143 let unicast = BypassTransport::Unicast;
2144 let json = serde_json::to_string(&unicast).unwrap();
2145 let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2146 assert_eq!(parsed, BypassTransport::Unicast);
2147
2148 let multicast = BypassTransport::Multicast {
2149 group: "239.1.1.100".parse().unwrap(),
2150 port: 5150,
2151 };
2152 let json = serde_json::to_string(&multicast).unwrap();
2153 let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2154 assert_eq!(parsed, multicast);
2155 }
2156
2157 #[test]
2158 fn test_message_encoding_serde() {
2159 for encoding in &[
2160 MessageEncoding::Protobuf,
2161 MessageEncoding::Json,
2162 MessageEncoding::Raw,
2163 MessageEncoding::Cbor,
2164 ] {
2165 let json = serde_json::to_string(encoding).unwrap();
2166 let parsed: MessageEncoding = serde_json::from_str(&json).unwrap();
2167 assert_eq!(parsed, *encoding);
2168 }
2169 }
2170
2171 #[test]
2172 fn test_bypass_header_flags() {
2173 let mut header = BypassHeader::new("test", Duration::from_millis(1000), 0);
2174
2175 assert_eq!(header.flags, 0);
2177 assert_eq!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2178 assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2179
2180 header.flags |= BypassHeader::FLAG_SIGNED;
2182 assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2183 assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2184
2185 header.flags |= BypassHeader::FLAG_ENCRYPTED;
2187 assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2188 assert_ne!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2189
2190 let encoded = header.encode();
2192 let decoded = BypassHeader::decode(&encoded).unwrap();
2193 assert_eq!(decoded.flags, header.flags);
2194 }
2195}