1use std::collections::HashMap;
61use std::hash::{Hash, Hasher};
62use std::net::{IpAddr, Ipv4Addr, SocketAddr};
63use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
64use std::sync::{Arc, RwLock};
65use std::time::{Duration, Instant};
66
67use chacha20poly1305::{
68 aead::{Aead, KeyInit},
69 ChaCha20Poly1305, Nonce,
70};
71use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
72use serde::{Deserialize, Serialize};
73use tokio::net::UdpSocket;
74use tokio::sync::broadcast;
75use tracing::{debug, info};
76
77use super::MessagePriority;
78
/// Errors reported by the bypass channel and its security helpers.
#[derive(Debug)]
pub enum BypassError {
    /// An underlying I/O operation failed.
    Io(std::io::Error),
    /// Encoding (including payload encryption) failed; carries a description.
    Encode(String),
    /// Decoding failed; carries a description.
    Decode(String),
    /// The configuration or the supplied arguments are unusable.
    Config(String),
    /// An operation needed the socket, but the channel has not been started.
    NotStarted,
    /// The payload exceeds the configured maximum message size.
    MessageTooLarge { size: usize, max: usize },
    /// The datagram is too short or does not start with the expected magic.
    InvalidHeader,
    /// The message is older than its TTL.
    StaleMessage,
    /// Signature verification failed.
    InvalidSignature,
    /// Authenticated decryption failed.
    DecryptionFailed,
    /// The sender's IP address is not permitted.
    UnauthorizedSource(IpAddr),
    /// The sequence number was already seen or lies outside the replay window.
    ReplayDetected { sequence: u8 },
    /// A required key is missing; carries the name of what is missing.
    MissingCredential(String),
}

impl std::fmt::Display for BypassError {
    /// Renders a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without data are fixed strings.
            Self::NotStarted => f.write_str("Bypass channel not started"),
            Self::InvalidHeader => f.write_str("Invalid bypass header"),
            Self::StaleMessage => f.write_str("Message is stale (past TTL)"),
            Self::InvalidSignature => f.write_str("Invalid message signature"),
            Self::DecryptionFailed => f.write_str("Message decryption failed"),
            // Variants carrying a payload interpolate it.
            Self::Io(cause) => write!(f, "IO error: {}", cause),
            Self::Encode(detail) => write!(f, "Encode error: {}", detail),
            Self::Decode(detail) => write!(f, "Decode error: {}", detail),
            Self::Config(detail) => write!(f, "Config error: {}", detail),
            Self::MessageTooLarge { size, max } => {
                write!(f, "Message too large: {} bytes (max {})", size, max)
            }
            Self::UnauthorizedSource(addr) => {
                write!(f, "Unauthorized source IP: {}", addr)
            }
            Self::ReplayDetected { sequence } => {
                write!(f, "Replay attack detected (sequence {})", sequence)
            }
            Self::MissingCredential(missing) => {
                write!(f, "Missing security credential: {}", missing)
            }
        }
    }
}

impl std::error::Error for BypassError {
    /// Only the `Io` variant wraps another error; every other variant is a root cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<std::io::Error> for BypassError {
    /// Lets `?` convert I/O failures into [`BypassError::Io`].
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}
156
/// Result alias used throughout this module; the error type is always [`BypassError`].
pub type Result<T> = std::result::Result<T, BypassError>;
158
/// How messages of a bypass collection are addressed when sent.
///
/// Serialized by serde as an internally tagged enum (tag field `"type"`)
/// with snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BypassTransport {
    /// Send to a single socket address supplied by the caller (the default).
    #[default]
    Unicast,
    /// Send to a multicast group.
    Multicast {
        /// Multicast group address.
        group: IpAddr,
        /// Destination UDP port.
        port: u16,
    },
    /// Send to the IPv4 broadcast address.
    Broadcast,
}
180
/// Payload encoding declared for a bypass collection.
///
/// Serialized by serde using snake_case variant names.
// NOTE(review): the channel code visible in this file forwards payload bytes
// untouched and never inspects this value — presumably callers use it to pick
// a codec; confirm against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageEncoding {
    /// Protobuf payloads (the default).
    #[default]
    Protobuf,
    /// JSON payloads.
    Json,
    /// Raw bytes.
    Raw,
    /// CBOR payloads.
    Cbor,
}
195
196impl std::fmt::Display for MessageEncoding {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 match self {
199 MessageEncoding::Protobuf => write!(f, "protobuf"),
200 MessageEncoding::Json => write!(f, "json"),
201 MessageEncoding::Raw => write!(f, "raw"),
202 MessageEncoding::Cbor => write!(f, "cbor"),
203 }
204 }
205}
206
/// Per-collection settings for the bypass channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BypassCollectionConfig {
    /// Collection name; its hash is what travels in the wire header.
    pub collection: String,
    /// How messages of this collection are addressed.
    pub transport: BypassTransport,
    /// Declared payload encoding.
    pub encoding: MessageEncoding,
    /// Time-to-live in milliseconds; filled from `default_ttl_ms` when absent
    /// during deserialization.
    #[serde(default = "default_ttl_ms")]
    pub ttl_ms: u64,
    /// Priority attached to received messages of this collection.
    #[serde(default)]
    pub priority: MessagePriority,
}
223
/// Serde default for `BypassCollectionConfig::ttl_ms`: five seconds, in milliseconds.
fn default_ttl_ms() -> u64 {
    5_000
}
227
228impl BypassCollectionConfig {
229 pub fn ttl(&self) -> Duration {
231 Duration::from_millis(self.ttl_ms)
232 }
233}
234
235impl Default for BypassCollectionConfig {
236 fn default() -> Self {
237 Self {
238 collection: String::new(),
239 transport: BypassTransport::Unicast,
240 encoding: MessageEncoding::Protobuf,
241 ttl_ms: 5000,
242 priority: MessagePriority::Normal,
243 }
244 }
245}
246
/// UDP socket settings for the bypass channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpConfig {
    /// Local port the main socket binds to; also used as the destination port
    /// for broadcast collections.
    pub bind_port: u16,
    /// Size in bytes of the receive buffer allocated by the receive task.
    pub buffer_size: usize,
    /// TTL applied to IPv4 multicast sockets.
    pub multicast_ttl: u32,
}
257
258impl Default for UdpConfig {
259 fn default() -> Self {
260 Self {
261 bind_port: 5150,
262 buffer_size: 65536,
263 multicast_ttl: 32,
264 }
265 }
266}
267
/// Top-level configuration of a bypass channel.
///
/// Note that the derived `Default` leaves `multicast_enabled` as `false` and
/// `max_message_size` as `0`, which differs from [`BypassChannelConfig::new`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BypassChannelConfig {
    /// UDP socket settings.
    pub udp: UdpConfig,
    /// Collections routed through the bypass channel.
    pub collections: Vec<BypassCollectionConfig>,
    /// Whether multicast is enabled.
    // NOTE(review): not read by any code visible in this file — confirm where it is enforced.
    pub multicast_enabled: bool,
    /// Largest payload accepted by `send`, in bytes; `0` disables the check.
    pub max_message_size: usize,
}
280
281impl BypassChannelConfig {
282 pub fn new() -> Self {
284 Self {
285 udp: UdpConfig::default(),
286 collections: Vec::new(),
287 multicast_enabled: true,
288 max_message_size: 65000, }
290 }
291
292 pub fn with_collection(mut self, config: BypassCollectionConfig) -> Self {
294 self.collections.push(config);
295 self
296 }
297
298 pub fn get_collection(&self, name: &str) -> Option<&BypassCollectionConfig> {
300 self.collections.iter().find(|c| c.collection == name)
301 }
302
303 pub fn is_bypass_collection(&self, name: &str) -> bool {
305 self.collections.iter().any(|c| c.collection == name)
306 }
307}
308
/// Switches selecting which security checks apply to bypass traffic.
///
/// The derived `Default` disables everything (and leaves the window size at 0).
#[derive(Debug, Clone, Default)]
pub struct BypassSecurityConfig {
    /// Require messages to carry a valid signature.
    pub require_signature: bool,

    /// Encrypt payloads.
    pub encrypt_payload: bool,

    /// When `Some`, only the listed source IPs are accepted; `None` accepts all.
    pub source_allowlist: Option<Vec<IpAddr>>,

    /// Reject replayed sequence numbers.
    pub replay_protection: bool,

    /// Number of sequence numbers tracked for replay detection.
    pub replay_window_size: usize,
}

impl BypassSecurityConfig {
    /// All protections disabled; identical to `Default::default()`.
    pub fn none() -> Self {
        Default::default()
    }

    /// Signatures required, everything else disabled.
    pub fn signed() -> Self {
        let mut cfg = Self::none();
        cfg.require_signature = true;
        cfg
    }

    /// Signatures, payload encryption and replay protection (64-entry window);
    /// no source allowlist.
    pub fn full_security() -> Self {
        let mut cfg = Self::none();
        cfg.require_signature = true;
        cfg.encrypt_payload = true;
        cfg.source_allowlist = None;
        cfg.replay_protection = true;
        cfg.replay_window_size = 64;
        cfg
    }

    /// Returns `true` if at least one protection is switched on.
    pub fn is_enabled(&self) -> bool {
        let has_allowlist = self.source_allowlist.is_some();
        [
            self.require_signature,
            self.encrypt_payload,
            has_allowlist,
            self.replay_protection,
        ]
        .contains(&true)
    }

    /// Returns `true` if `ip` may send: always when no allowlist is set,
    /// otherwise only when the allowlist contains it.
    pub fn is_source_allowed(&self, ip: &IpAddr) -> bool {
        self.source_allowlist
            .as_ref()
            .map_or(true, |permitted| permitted.contains(ip))
    }
}
435
/// Key material used to sign, verify, encrypt and decrypt bypass messages.
///
/// `Debug` is implemented by hand so that key bytes are never printed.
#[derive(Clone, Default)]
pub struct BypassSecurityCredentials {
    /// Ed25519 key used by `sign`; `None` means signing is unavailable.
    signing_key: Option<SigningKey>,

    /// Verification keys indexed by peer id (`verify_by_ip` uses the textual
    /// form of the peer's IP address as the id).
    peer_keys: HashMap<String, VerifyingKey>,

    /// 32-byte ChaCha20-Poly1305 key used by `encrypt` and `decrypt`.
    encryption_key: Option<[u8; 32]>,
}
452
453impl std::fmt::Debug for BypassSecurityCredentials {
454 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 f.debug_struct("BypassSecurityCredentials")
456 .field("has_signing_key", &self.signing_key.is_some())
457 .field("peer_keys_count", &self.peer_keys.len())
458 .field("has_encryption_key", &self.encryption_key.is_some())
459 .finish()
460 }
461}
462
463impl BypassSecurityCredentials {
464 pub fn new() -> Self {
466 Self::default()
467 }
468
469 pub fn with_signing_key(mut self, key: SigningKey) -> Self {
471 self.signing_key = Some(key);
472 self
473 }
474
475 pub fn with_peer_key(mut self, peer_id: impl Into<String>, key: VerifyingKey) -> Self {
477 self.peer_keys.insert(peer_id.into(), key);
478 self
479 }
480
481 pub fn with_encryption_key(mut self, key: [u8; 32]) -> Self {
483 self.encryption_key = Some(key);
484 self
485 }
486
487 pub fn verifying_key(&self) -> Option<VerifyingKey> {
489 self.signing_key.as_ref().map(|k| k.verifying_key())
490 }
491
492 pub fn sign(&self, message: &[u8]) -> Result<Signature> {
494 let key = self
495 .signing_key
496 .as_ref()
497 .ok_or_else(|| BypassError::MissingCredential("signing key".into()))?;
498 Ok(key.sign(message))
499 }
500
501 pub fn verify(&self, peer_id: &str, message: &[u8], signature: &Signature) -> Result<()> {
503 let key = self
504 .peer_keys
505 .get(peer_id)
506 .ok_or_else(|| BypassError::MissingCredential(format!("peer key for {}", peer_id)))?;
507 key.verify(message, signature)
508 .map_err(|_| BypassError::InvalidSignature)
509 }
510
511 pub fn verify_by_ip(&self, ip: &IpAddr, message: &[u8], signature: &Signature) -> Result<()> {
513 self.verify(&ip.to_string(), message, signature)
514 }
515
516 pub fn encrypt(&self, plaintext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
518 let key = self
519 .encryption_key
520 .as_ref()
521 .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
522
523 let cipher = ChaCha20Poly1305::new_from_slice(key)
524 .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
525
526 let nonce = Nonce::from_slice(nonce);
527 cipher
528 .encrypt(nonce, plaintext)
529 .map_err(|_| BypassError::Encode("Encryption failed".into()))
530 }
531
532 pub fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
534 let key = self
535 .encryption_key
536 .as_ref()
537 .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
538
539 let cipher = ChaCha20Poly1305::new_from_slice(key)
540 .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
541
542 let nonce = Nonce::from_slice(nonce);
543 cipher
544 .decrypt(nonce, ciphertext)
545 .map_err(|_| BypassError::DecryptionFailed)
546 }
547}
548
/// Tracks, per source IP address, which sequence numbers have been accepted.
///
/// Note that the derived `Default` yields a `window_size` of 0; use
/// `ReplayTracker::new` to choose a window.
#[derive(Debug, Default)]
pub struct ReplayTracker {
    /// One replay window per source address, created on first use.
    windows: RwLock<HashMap<IpAddr, ReplayWindow>>,

    /// Number of sequence numbers behind the newest that are still accepted
    /// (`new` clamps it to at most 64).
    window_size: usize,
}
562
/// Sliding replay window over wrapping 8-bit sequence numbers for one source.
#[derive(Debug)]
struct ReplayWindow {
    /// Newest sequence number accepted so far (meaningful once `seen != 0`).
    highest_seq: u8,
    /// Bitmap of accepted sequence numbers: bit `i` stands for
    /// `highest_seq - i`. Zero means nothing has been accepted yet.
    seen: u64,
}

impl ReplayWindow {
    /// Creates an empty window that has not accepted any sequence number.
    fn new() -> Self {
        Self {
            highest_seq: 0,
            seen: 0,
        }
    }

    /// Checks `seq` against the window and records it when accepted.
    ///
    /// Returns `true` if the packet is fresh, `false` if it is a duplicate or
    /// lies further behind the newest accepted sequence than `window_size`
    /// (clamped to 64).
    ///
    /// The first packet seen by an empty window is always accepted and becomes
    /// the anchor, whatever its sequence number: a receiver may start listening
    /// when the sender's wrapping counter is anywhere in `0..=255`.
    fn check_and_update(&mut self, seq: u8, window_size: usize) -> bool {
        let window_size = window_size.min(64) as u8;

        // Bit 0 is set on every accept, so `seen == 0` only for a fresh window.
        if self.seen == 0 {
            self.highest_seq = seq;
            self.seen = 1;
            return true;
        }

        // Bit 0 (the newest sequence) is always set: an exact repeat is a replay.
        if seq == self.highest_seq {
            return false;
        }

        // Distances are taken modulo 256; anything less than half the sequence
        // space ahead of the newest is treated as a newer packet.
        let ahead = seq.wrapping_sub(self.highest_seq);
        if ahead < 128 {
            let shift = u32::from(ahead);
            self.seen = if shift < 64 {
                (self.seen << shift) | 1
            } else {
                // Jumped past the whole bitmap: only the new packet is known.
                1
            };
            self.highest_seq = seq;
            return true;
        }

        // Otherwise the packet is older than the newest one; accept it only if
        // it is still inside the window and has not been seen.
        let behind = self.highest_seq.wrapping_sub(seq);
        if behind < window_size {
            // `window_size <= 64`, so the shift is always in range.
            let bit = 1u64 << behind;
            if self.seen & bit != 0 {
                return false;
            }
            self.seen |= bit;
            return true;
        }

        false
    }
}
629
630impl ReplayTracker {
631 pub fn new(window_size: usize) -> Self {
633 Self {
634 windows: RwLock::new(HashMap::new()),
635 window_size: window_size.min(64),
636 }
637 }
638
639 pub fn check(&self, source: &IpAddr, sequence: u8) -> Result<()> {
642 let mut windows = self.windows.write().unwrap();
643 let window = windows.entry(*source).or_insert_with(ReplayWindow::new);
644
645 if window.check_and_update(sequence, self.window_size) {
646 Ok(())
647 } else {
648 Err(BypassError::ReplayDetected { sequence })
649 }
650 }
651
652 pub fn clear_source(&self, source: &IpAddr) {
654 self.windows.write().unwrap().remove(source);
655 }
656
657 pub fn clear_all(&self) {
659 self.windows.write().unwrap().clear();
660 }
661}
662
/// Fixed 12-byte header that precedes every bypass payload.
///
/// Wire layout (multi-byte fields big-endian): bytes 0..4 magic,
/// 4..8 collection hash, 8..10 TTL in milliseconds, 10 flags, 11 sequence.
#[derive(Debug, Clone, Copy)]
pub struct BypassHeader {
    /// Magic bytes identifying a bypass datagram; must equal `BypassHeader::MAGIC`.
    pub magic: [u8; 4],
    /// 32-bit hash of the collection name.
    pub collection_hash: u32,
    /// Time-to-live in milliseconds (clamped to `u16::MAX` on construction).
    pub ttl_ms: u16,
    /// Bit flags; see the `FLAG_*` constants.
    pub flags: u8,
    /// Wrapping 8-bit sequence number assigned by the sender.
    pub sequence: u8,
}
688
689impl BypassHeader {
690 pub const MAGIC: [u8; 4] = [0x45, 0x43, 0x48, 0x45];
692
693 pub const SIZE: usize = 12;
695
696 pub const FLAG_COMPRESSED: u8 = 0x01;
698 pub const FLAG_ENCRYPTED: u8 = 0x02;
700 pub const FLAG_SIGNED: u8 = 0x04;
702
703 pub fn new(collection: &str, ttl: Duration, sequence: u8) -> Self {
705 Self {
706 magic: Self::MAGIC,
707 collection_hash: Self::hash_collection(collection),
708 ttl_ms: ttl.as_millis().min(u16::MAX as u128) as u16,
709 flags: 0,
710 sequence,
711 }
712 }
713
714 pub fn hash_collection(name: &str) -> u32 {
716 let mut hasher = fnv::FnvHasher::default();
717 name.hash(&mut hasher);
718 hasher.finish() as u32
719 }
720
721 pub fn is_valid(&self) -> bool {
723 self.magic == Self::MAGIC
724 }
725
726 pub fn encode(&self) -> [u8; Self::SIZE] {
728 let mut buf = [0u8; Self::SIZE];
729 buf[0..4].copy_from_slice(&self.magic);
730 buf[4..8].copy_from_slice(&self.collection_hash.to_be_bytes());
731 buf[8..10].copy_from_slice(&self.ttl_ms.to_be_bytes());
732 buf[10] = self.flags;
733 buf[11] = self.sequence;
734 buf
735 }
736
737 pub fn decode(buf: &[u8]) -> Result<Self> {
739 if buf.len() < Self::SIZE {
740 return Err(BypassError::InvalidHeader);
741 }
742
743 let mut magic = [0u8; 4];
744 magic.copy_from_slice(&buf[0..4]);
745
746 if magic != Self::MAGIC {
747 return Err(BypassError::InvalidHeader);
748 }
749
750 let collection_hash = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
751 let ttl_ms = u16::from_be_bytes([buf[8], buf[9]]);
752 let flags = buf[10];
753 let sequence = buf[11];
754
755 Ok(Self {
756 magic,
757 collection_hash,
758 ttl_ms,
759 flags,
760 sequence,
761 })
762 }
763
764 pub fn is_stale(&self, received_at: Instant, sent_at: Instant) -> bool {
766 let elapsed = received_at.duration_since(sent_at);
767 elapsed > Duration::from_millis(self.ttl_ms as u64)
768 }
769}
770
/// Minimal 64-bit FNV-1a hasher (xor each byte, then multiply by the prime).
mod fnv {
    use std::hash::Hasher;

    /// Initial state of the 64-bit FNV hash.
    const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
    /// Multiplier of the 64-bit FNV hash.
    const FNV_PRIME: u64 = 1099511628211;

    /// Hasher state; always starts at [`FNV_OFFSET_BASIS`].
    pub struct FnvHasher(u64);

    // Implemented by hand: a derived `Default` would start from 0 instead of
    // the offset basis, so `Default::default()` (and anything built on it,
    // such as `BuildHasherDefault`) would compute a different hash than
    // `FnvHasher::default()` called as an associated function.
    impl Default for FnvHasher {
        fn default() -> Self {
            Self(FNV_OFFSET_BASIS)
        }
    }

    impl Hasher for FnvHasher {
        /// Folds `bytes` into the state, one byte at a time.
        fn write(&mut self, bytes: &[u8]) {
            for byte in bytes {
                self.0 ^= u64::from(*byte);
                self.0 = self.0.wrapping_mul(FNV_PRIME);
            }
        }

        /// Returns the current 64-bit hash value.
        fn finish(&self) -> u64 {
            self.0
        }
    }
}
803
/// A datagram received on the bypass channel, after header parsing.
#[derive(Debug, Clone)]
pub struct BypassMessage {
    /// Address the datagram was received from.
    pub source: SocketAddr,
    /// Collection hash taken from the header.
    pub collection_hash: u32,
    /// Payload bytes following the header.
    pub data: Vec<u8>,
    /// Local time at which the datagram was read from the socket.
    pub received_at: Instant,
    /// Sequence number taken from the header.
    pub sequence: u8,
    /// Priority of the matching configured collection, or `Normal` if the hash is unknown.
    pub priority: MessagePriority,
}
824
/// Destination of an outgoing bypass message.
#[derive(Debug, Clone)]
pub enum BypassTarget {
    /// A single socket address.
    Unicast(SocketAddr),
    /// A multicast group and port.
    Multicast { group: IpAddr, port: u16 },
    /// The IPv4 broadcast address on the given port.
    Broadcast { port: u16 },
}
835
/// Live counters of the bypass channel, updated atomically.
#[derive(Debug, Default)]
pub struct BypassMetrics {
    /// Messages sent successfully.
    pub messages_sent: AtomicU64,
    /// Messages received and forwarded to subscribers.
    pub messages_received: AtomicU64,
    /// Bytes sent, headers included.
    pub bytes_sent: AtomicU64,
    /// Bytes received, headers included.
    pub bytes_received: AtomicU64,
    /// Messages dropped for exceeding their TTL.
    pub stale_dropped: AtomicU64,
    /// Datagrams dropped for a short or invalid header.
    pub invalid_dropped: AtomicU64,
    /// Failed send attempts.
    pub send_errors: AtomicU64,
    /// Socket receive errors.
    pub receive_errors: AtomicU64,

    /// Messages rejected for a bad signature.
    pub signature_rejected: AtomicU64,
    /// Messages whose decryption failed.
    pub decryption_failed: AtomicU64,
    /// Messages rejected for an unauthorized source.
    pub unauthorized_source: AtomicU64,
    /// Messages rejected as replays.
    pub replay_rejected: AtomicU64,
}

impl BypassMetrics {
    /// Copies every counter into a plain-value snapshot.
    ///
    /// Each counter is read with relaxed ordering, one after another, so the
    /// snapshot is not taken atomically across counters.
    pub fn snapshot(&self) -> BypassMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        BypassMetricsSnapshot {
            messages_sent: read(&self.messages_sent),
            messages_received: read(&self.messages_received),
            bytes_sent: read(&self.bytes_sent),
            bytes_received: read(&self.bytes_received),
            stale_dropped: read(&self.stale_dropped),
            invalid_dropped: read(&self.invalid_dropped),
            send_errors: read(&self.send_errors),
            receive_errors: read(&self.receive_errors),
            signature_rejected: read(&self.signature_rejected),
            decryption_failed: read(&self.decryption_failed),
            unauthorized_source: read(&self.unauthorized_source),
            replay_rejected: read(&self.replay_rejected),
        }
    }
}

/// Point-in-time copy of [`BypassMetrics`] with plain integer fields.
#[derive(Debug, Clone, Default)]
pub struct BypassMetricsSnapshot {
    pub messages_sent: u64,
    pub messages_received: u64,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub stale_dropped: u64,
    pub invalid_dropped: u64,
    pub send_errors: u64,
    pub receive_errors: u64,

    pub signature_rejected: u64,
    pub decryption_failed: u64,
    pub unauthorized_source: u64,
    pub replay_rejected: u64,
}
913
/// UDP channel that sends and receives header-framed bypass messages.
pub struct UdpBypassChannel {
    /// Channel configuration supplied at construction.
    config: BypassChannelConfig,

    /// Main socket; `None` until `start` succeeds and again after `stop`.
    socket: Option<Arc<UdpSocket>>,

    /// Sockets created on demand for multicast groups, keyed by group address.
    multicast_sockets: RwLock<HashMap<IpAddr, Arc<UdpSocket>>>,

    /// Configured collections keyed by the hash of their name.
    collection_map: HashMap<u32, BypassCollectionConfig>,

    /// Wrapping counter that supplies the sequence number of outgoing headers.
    sequence: AtomicU8,

    /// Counters shared with the receive task.
    metrics: Arc<BypassMetrics>,

    /// Broadcast sender through which received messages reach subscribers.
    incoming_tx: broadcast::Sender<BypassMessage>,

    /// Run flag shared with the receive task; cleared by `stop`.
    running: Arc<AtomicBool>,
}
947
948impl UdpBypassChannel {
949 pub async fn new(config: BypassChannelConfig) -> Result<Self> {
951 let collection_map: HashMap<u32, BypassCollectionConfig> = config
953 .collections
954 .iter()
955 .map(|c| (BypassHeader::hash_collection(&c.collection), c.clone()))
956 .collect();
957
958 let (incoming_tx, _) = broadcast::channel(1024);
959
960 Ok(Self {
961 config,
962 socket: None,
963 multicast_sockets: RwLock::new(HashMap::new()),
964 collection_map,
965 sequence: AtomicU8::new(0),
966 metrics: Arc::new(BypassMetrics::default()),
967 incoming_tx,
968 running: Arc::new(AtomicBool::new(false)),
969 })
970 }
971
972 pub async fn start(&mut self) -> Result<()> {
974 if self.running.load(Ordering::SeqCst) {
975 return Ok(());
976 }
977
978 let bind_addr = format!("0.0.0.0:{}", self.config.udp.bind_port);
980 let socket = UdpSocket::bind(&bind_addr).await?;
981 socket.set_broadcast(true)?;
982
983 let socket = Arc::new(socket);
984 self.socket = Some(socket.clone());
985
986 let incoming_tx = self.incoming_tx.clone();
988 let metrics = self.metrics.clone();
989 let collection_map = self.collection_map.clone();
990 let buffer_size = self.config.udp.buffer_size;
991 let running = self.running.clone();
992
993 running.store(true, Ordering::SeqCst);
994
995 tokio::spawn(async move {
996 let mut buf = vec![0u8; buffer_size];
997
998 while running.load(Ordering::SeqCst) {
999 match tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf))
1000 .await
1001 {
1002 Ok(Ok((len, src))) => {
1003 let received_at = Instant::now();
1004
1005 if len < BypassHeader::SIZE {
1007 metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1008 continue;
1009 }
1010
1011 let header = match BypassHeader::decode(&buf[..BypassHeader::SIZE]) {
1012 Ok(h) => h,
1013 Err(_) => {
1014 metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1015 continue;
1016 }
1017 };
1018
1019 let payload = buf[BypassHeader::SIZE..len].to_vec();
1021
1022 let priority = collection_map
1024 .get(&header.collection_hash)
1025 .map(|c| c.priority)
1026 .unwrap_or(MessagePriority::Normal);
1027
1028 let message = BypassMessage {
1029 source: src,
1030 collection_hash: header.collection_hash,
1031 data: payload,
1032 received_at,
1033 sequence: header.sequence,
1034 priority,
1035 };
1036
1037 metrics.messages_received.fetch_add(1, Ordering::Relaxed);
1038 metrics
1039 .bytes_received
1040 .fetch_add(len as u64, Ordering::Relaxed);
1041
1042 let _ = incoming_tx.send(message);
1044 }
1045 Ok(Err(_e)) => {
1046 metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
1047 }
1048 Err(_) => {
1049 }
1051 }
1052 }
1053 });
1054
1055 info!(
1056 "Bypass channel started on port {}",
1057 self.config.udp.bind_port
1058 );
1059 Ok(())
1060 }
1061
1062 pub fn stop(&mut self) {
1064 self.running.store(false, Ordering::SeqCst);
1065 self.socket = None;
1066 self.multicast_sockets.write().unwrap().clear();
1067 info!("Bypass channel stopped");
1068 }
1069
1070 pub fn is_running(&self) -> bool {
1072 self.running.load(Ordering::SeqCst)
1073 }
1074
1075 fn next_sequence(&self) -> u8 {
1077 self.sequence.fetch_add(1, Ordering::Relaxed)
1078 }
1079
1080 pub async fn send(&self, target: BypassTarget, collection: &str, data: &[u8]) -> Result<()> {
1082 let socket = self.socket.as_ref().ok_or(BypassError::NotStarted)?;
1083
1084 if self.config.max_message_size > 0 && data.len() > self.config.max_message_size {
1086 return Err(BypassError::MessageTooLarge {
1087 size: data.len(),
1088 max: self.config.max_message_size,
1089 });
1090 }
1091
1092 let ttl = self
1094 .config
1095 .get_collection(collection)
1096 .map(|c| c.ttl())
1097 .unwrap_or(Duration::from_secs(5));
1098
1099 let header = BypassHeader::new(collection, ttl, self.next_sequence());
1101 let header_bytes = header.encode();
1102
1103 let mut frame = Vec::with_capacity(BypassHeader::SIZE + data.len());
1105 frame.extend_from_slice(&header_bytes);
1106 frame.extend_from_slice(data);
1107
1108 let bytes_sent = match target {
1110 BypassTarget::Unicast(addr) => socket.send_to(&frame, addr).await?,
1111 BypassTarget::Multicast { group, port } => {
1112 let mcast_socket = self.get_or_create_multicast(group).await?;
1113 mcast_socket.send_to(&frame, (group, port)).await?
1114 }
1115 BypassTarget::Broadcast { port } => {
1116 let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), port);
1117 socket.send_to(&frame, broadcast_addr).await?
1118 }
1119 };
1120
1121 self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
1122 self.metrics
1123 .bytes_sent
1124 .fetch_add(bytes_sent as u64, Ordering::Relaxed);
1125
1126 Ok(())
1127 }
1128
1129 pub async fn send_to_collection(
1131 &self,
1132 collection: &str,
1133 target_addr: Option<SocketAddr>,
1134 data: &[u8],
1135 ) -> Result<()> {
1136 let config = self
1137 .config
1138 .get_collection(collection)
1139 .ok_or_else(|| BypassError::Config(format!("Unknown collection: {}", collection)))?;
1140
1141 let target = match &config.transport {
1142 BypassTransport::Unicast => {
1143 let addr = target_addr.ok_or_else(|| {
1144 BypassError::Config("Unicast requires target address".to_string())
1145 })?;
1146 BypassTarget::Unicast(addr)
1147 }
1148 BypassTransport::Multicast { group, port } => BypassTarget::Multicast {
1149 group: *group,
1150 port: *port,
1151 },
1152 BypassTransport::Broadcast => BypassTarget::Broadcast {
1153 port: self.config.udp.bind_port,
1154 },
1155 };
1156
1157 self.send(target, collection, data).await
1158 }
1159
1160 pub fn subscribe(&self) -> broadcast::Receiver<BypassMessage> {
1162 self.incoming_tx.subscribe()
1163 }
1164
1165 pub fn subscribe_collection(
1167 &self,
1168 collection: &str,
1169 ) -> (u32, broadcast::Receiver<BypassMessage>) {
1170 let hash = BypassHeader::hash_collection(collection);
1171 (hash, self.incoming_tx.subscribe())
1172 }
1173
1174 async fn get_or_create_multicast(&self, group: IpAddr) -> Result<Arc<UdpSocket>> {
1176 {
1178 let sockets = self.multicast_sockets.read().unwrap();
1179 if let Some(socket) = sockets.get(&group) {
1180 return Ok(socket.clone());
1181 }
1182 }
1183
1184 let socket = UdpSocket::bind("0.0.0.0:0").await?;
1186
1187 match group {
1188 IpAddr::V4(addr) => {
1189 socket.join_multicast_v4(addr, Ipv4Addr::UNSPECIFIED)?;
1190 socket.set_multicast_ttl_v4(self.config.udp.multicast_ttl)?;
1191 }
1192 IpAddr::V6(addr) => {
1193 socket.join_multicast_v6(&addr, 0)?;
1194 }
1195 }
1196
1197 let socket = Arc::new(socket);
1198 self.multicast_sockets
1199 .write()
1200 .unwrap()
1201 .insert(group, socket.clone());
1202
1203 debug!("Joined multicast group: {}", group);
1204 Ok(socket)
1205 }
1206
1207 pub fn leave_multicast(&self, group: IpAddr) -> Result<()> {
1209 if let Some(socket) = self.multicast_sockets.write().unwrap().remove(&group) {
1210 match group {
1211 IpAddr::V4(addr) => {
1212 if let Ok(socket) = Arc::try_unwrap(socket) {
1214 let _ = socket.leave_multicast_v4(addr, Ipv4Addr::UNSPECIFIED);
1215 }
1216 }
1217 IpAddr::V6(addr) => {
1218 if let Ok(socket) = Arc::try_unwrap(socket) {
1219 let _ = socket.leave_multicast_v6(&addr, 0);
1220 }
1221 }
1222 }
1223 debug!("Left multicast group: {}", group);
1224 }
1225 Ok(())
1226 }
1227
1228 pub fn metrics(&self) -> BypassMetricsSnapshot {
1230 self.metrics.snapshot()
1231 }
1232
1233 pub fn config(&self) -> &BypassChannelConfig {
1235 &self.config
1236 }
1237
1238 pub fn is_bypass_collection(&self, name: &str) -> bool {
1240 self.config.is_bypass_collection(name)
1241 }
1242
1243 pub fn get_collection_by_hash(&self, hash: u32) -> Option<&BypassCollectionConfig> {
1245 self.collection_map.get(&hash)
1246 }
1247}
1248
1249#[cfg(test)]
1254mod tests {
1255 use super::*;
1256
1257 #[test]
1258 fn test_bypass_header_encode_decode() {
1259 let header = BypassHeader::new("test_collection", Duration::from_millis(1000), 42);
1260 let encoded = header.encode();
1261 let decoded = BypassHeader::decode(&encoded).unwrap();
1262
1263 assert_eq!(decoded.magic, BypassHeader::MAGIC);
1264 assert_eq!(decoded.collection_hash, header.collection_hash);
1265 assert_eq!(decoded.ttl_ms, 1000);
1266 assert_eq!(decoded.sequence, 42);
1267 assert!(decoded.is_valid());
1268 }
1269
1270 #[test]
1271 fn test_bypass_header_invalid_magic() {
1272 let mut data = [0u8; 12];
1273 data[0..4].copy_from_slice(&[0, 0, 0, 0]);
1274 let result = BypassHeader::decode(&data);
1275 assert!(result.is_err());
1276 }
1277
1278 #[test]
1279 fn test_bypass_header_too_short() {
1280 let data = [0u8; 8];
1281 let result = BypassHeader::decode(&data);
1282 assert!(result.is_err());
1283 }
1284
1285 #[test]
1286 fn test_collection_hash_consistency() {
1287 let hash1 = BypassHeader::hash_collection("position_updates");
1288 let hash2 = BypassHeader::hash_collection("position_updates");
1289 let hash3 = BypassHeader::hash_collection("sensor_data");
1290
1291 assert_eq!(hash1, hash2);
1292 assert_ne!(hash1, hash3);
1293 }
1294
1295 #[test]
1296 fn test_bypass_config() {
1297 let config = BypassChannelConfig::new()
1298 .with_collection(BypassCollectionConfig {
1299 collection: "positions".into(),
1300 transport: BypassTransport::Multicast {
1301 group: "239.1.1.100".parse().unwrap(),
1302 port: 5150,
1303 },
1304 encoding: MessageEncoding::Protobuf,
1305 ttl_ms: 200,
1306 priority: MessagePriority::High,
1307 })
1308 .with_collection(BypassCollectionConfig {
1309 collection: "telemetry".into(),
1310 transport: BypassTransport::Unicast,
1311 encoding: MessageEncoding::Cbor,
1312 ttl_ms: 5000,
1313 priority: MessagePriority::Normal,
1314 });
1315
1316 assert!(config.is_bypass_collection("positions"));
1317 assert!(config.is_bypass_collection("telemetry"));
1318 assert!(!config.is_bypass_collection("unknown"));
1319
1320 let pos_config = config.get_collection("positions").unwrap();
1321 assert_eq!(pos_config.priority, MessagePriority::High);
1322 }
1323
1324 #[test]
1325 fn test_ttl_clamping() {
1326 let header = BypassHeader::new("test", Duration::from_secs(1000), 0);
1328 assert_eq!(header.ttl_ms, u16::MAX);
1329 }
1330
1331 #[tokio::test]
1332 async fn test_bypass_channel_creation() {
1333 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1334 collection: "test".into(),
1335 ..Default::default()
1336 });
1337
1338 let channel = UdpBypassChannel::new(config).await.unwrap();
1339 assert!(!channel.is_running());
1340 assert!(channel.is_bypass_collection("test"));
1341 }
1342
1343 #[tokio::test]
1344 async fn test_bypass_channel_start_stop() {
1345 let config = BypassChannelConfig {
1346 udp: UdpConfig {
1347 bind_port: 0, ..Default::default()
1349 },
1350 ..Default::default()
1351 };
1352
1353 let mut channel = UdpBypassChannel::new(config).await.unwrap();
1354
1355 channel.start().await.unwrap();
1356 assert!(channel.is_running());
1357
1358 channel.stop();
1359 assert!(!channel.is_running());
1360 }
1361
1362 #[tokio::test]
1363 async fn test_bypass_send_receive() {
1364 let config1 = BypassChannelConfig {
1366 udp: UdpConfig {
1367 bind_port: 0,
1368 ..Default::default()
1369 },
1370 collections: vec![BypassCollectionConfig {
1371 collection: "test".into(),
1372 ttl_ms: 5000,
1373 ..Default::default()
1374 }],
1375 ..Default::default()
1376 };
1377
1378 let config2 = BypassChannelConfig {
1379 udp: UdpConfig {
1380 bind_port: 0,
1381 ..Default::default()
1382 },
1383 collections: vec![BypassCollectionConfig {
1384 collection: "test".into(),
1385 ttl_ms: 5000,
1386 ..Default::default()
1387 }],
1388 ..Default::default()
1389 };
1390
1391 let mut channel1 = UdpBypassChannel::new(config1).await.unwrap();
1392 let mut channel2 = UdpBypassChannel::new(config2).await.unwrap();
1393
1394 channel1.start().await.unwrap();
1395 channel2.start().await.unwrap();
1396
1397 let socket2_port = channel2
1399 .socket
1400 .as_ref()
1401 .unwrap()
1402 .local_addr()
1403 .unwrap()
1404 .port();
1405 let socket2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket2_port);
1406
1407 let mut rx = channel2.subscribe();
1409
1410 let test_data = b"Hello, bypass!";
1412 channel1
1413 .send(BypassTarget::Unicast(socket2_addr), "test", test_data)
1414 .await
1415 .unwrap();
1416
1417 let msg = tokio::time::timeout(Duration::from_millis(500), rx.recv())
1419 .await
1420 .expect("timeout")
1421 .expect("receive error");
1422
1423 assert_eq!(msg.data, test_data);
1424 assert_eq!(msg.collection_hash, BypassHeader::hash_collection("test"));
1425
1426 let metrics1 = channel1.metrics();
1428 assert_eq!(metrics1.messages_sent, 1);
1429 assert!(metrics1.bytes_sent > 0);
1430
1431 let metrics2 = channel2.metrics();
1432 assert_eq!(metrics2.messages_received, 1);
1433 assert!(metrics2.bytes_received > 0);
1434
1435 channel1.stop();
1436 channel2.stop();
1437 }
1438
1439 #[test]
1440 fn test_message_too_large() {
1441 let _config = BypassChannelConfig {
1443 max_message_size: 100,
1444 ..Default::default()
1445 };
1446
1447 let err = BypassError::MessageTooLarge {
1449 size: 200,
1450 max: 100,
1451 };
1452 assert!(err.to_string().contains("200"));
1453 assert!(err.to_string().contains("100"));
1454 }
1455
1456 #[test]
1461 fn test_security_config_none() {
1462 let config = BypassSecurityConfig::none();
1463 assert!(!config.require_signature);
1464 assert!(!config.encrypt_payload);
1465 assert!(config.source_allowlist.is_none());
1466 assert!(!config.replay_protection);
1467 assert!(!config.is_enabled());
1468 }
1469
1470 #[test]
1471 fn test_security_config_signed() {
1472 let config = BypassSecurityConfig::signed();
1473 assert!(config.require_signature);
1474 assert!(!config.encrypt_payload);
1475 assert!(config.is_enabled());
1476 }
1477
1478 #[test]
1479 fn test_security_config_full() {
1480 let config = BypassSecurityConfig::full_security();
1481 assert!(config.require_signature);
1482 assert!(config.encrypt_payload);
1483 assert!(config.replay_protection);
1484 assert_eq!(config.replay_window_size, 64);
1485 assert!(config.is_enabled());
1486 }
1487
1488 #[test]
1489 fn test_security_config_allowlist() {
1490 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
1491 let ip2: IpAddr = "192.168.1.2".parse().unwrap();
1492 let ip3: IpAddr = "10.0.0.1".parse().unwrap();
1493
1494 let config = BypassSecurityConfig::none();
1496 assert!(config.is_source_allowed(&ip1));
1497 assert!(config.is_source_allowed(&ip2));
1498 assert!(config.is_source_allowed(&ip3));
1499
1500 let config = BypassSecurityConfig {
1502 source_allowlist: Some(vec![ip1, ip2]),
1503 ..Default::default()
1504 };
1505 assert!(config.is_source_allowed(&ip1));
1506 assert!(config.is_source_allowed(&ip2));
1507 assert!(!config.is_source_allowed(&ip3));
1508 }
1509
1510 #[test]
1511 fn test_credentials_signing() {
1512 use ed25519_dalek::SigningKey;
1513
1514 let seed: [u8; 32] = [1u8; 32];
1516 let signing_key = SigningKey::from_bytes(&seed);
1517 let verifying_key = signing_key.verifying_key();
1518
1519 let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1521 let creds = BypassSecurityCredentials::new()
1522 .with_signing_key(signing_key)
1523 .with_peer_key(peer_ip.to_string(), verifying_key);
1524
1525 let message = b"test message for signing";
1527 let signature = creds.sign(message).expect("signing should succeed");
1528
1529 creds
1531 .verify_by_ip(&peer_ip, message, &signature)
1532 .expect("verification should succeed");
1533 }
1534
1535 #[test]
1536 fn test_credentials_invalid_signature() {
1537 use ed25519_dalek::SigningKey;
1538
1539 let seed1: [u8; 32] = [1u8; 32];
1541 let seed2: [u8; 32] = [2u8; 32];
1542 let signing_key1 = SigningKey::from_bytes(&seed1);
1543 let signing_key2 = SigningKey::from_bytes(&seed2);
1544 let verifying_key2 = signing_key2.verifying_key();
1545
1546 let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1548 let creds = BypassSecurityCredentials::new()
1549 .with_signing_key(signing_key1)
1550 .with_peer_key(peer_ip.to_string(), verifying_key2);
1551
1552 let message = b"test message";
1554 let signature = creds.sign(message).expect("signing should succeed");
1555
1556 let result = creds.verify_by_ip(&peer_ip, message, &signature);
1558 assert!(matches!(result, Err(BypassError::InvalidSignature)));
1559 }
1560
1561 #[test]
1562 fn test_credentials_encryption() {
1563 let encryption_key = [42u8; 32];
1564 let creds = BypassSecurityCredentials::new().with_encryption_key(encryption_key);
1565
1566 let plaintext = b"secret message for encryption";
1567 let nonce = [0u8; 12];
1568
1569 let ciphertext = creds
1571 .encrypt(plaintext, &nonce)
1572 .expect("encryption should succeed");
1573 assert_ne!(ciphertext.as_slice(), plaintext);
1574 assert_eq!(ciphertext.len(), plaintext.len() + 16);
1576
1577 let decrypted = creds
1579 .decrypt(&ciphertext, &nonce)
1580 .expect("decryption should succeed");
1581 assert_eq!(decrypted, plaintext);
1582 }
1583
1584 #[test]
1585 fn test_credentials_decryption_wrong_key() {
1586 let key1 = [1u8; 32];
1587 let key2 = [2u8; 32];
1588
1589 let creds1 = BypassSecurityCredentials::new().with_encryption_key(key1);
1590 let creds2 = BypassSecurityCredentials::new().with_encryption_key(key2);
1591
1592 let plaintext = b"secret message";
1593 let nonce = [0u8; 12];
1594
1595 let ciphertext = creds1
1597 .encrypt(plaintext, &nonce)
1598 .expect("encryption should succeed");
1599
1600 let result = creds2.decrypt(&ciphertext, &nonce);
1602 assert!(matches!(result, Err(BypassError::DecryptionFailed)));
1603 }
1604
1605 #[test]
1606 fn test_credentials_missing_signing_key() {
1607 let creds = BypassSecurityCredentials::new();
1608 let result = creds.sign(b"message");
1609 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1610 }
1611
1612 #[test]
1613 fn test_credentials_missing_encryption_key() {
1614 let creds = BypassSecurityCredentials::new();
1615 let result = creds.encrypt(b"message", &[0u8; 12]);
1616 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1617 }
1618
1619 #[test]
1620 fn test_replay_tracker_first_message() {
1621 let tracker = ReplayTracker::new(64);
1622 let source: IpAddr = "192.168.1.1".parse().unwrap();
1623
1624 tracker
1626 .check(&source, 0)
1627 .expect("first message should succeed");
1628 }
1629
1630 #[test]
1631 fn test_replay_tracker_sequential() {
1632 let tracker = ReplayTracker::new(64);
1633 let source: IpAddr = "192.168.1.1".parse().unwrap();
1634
1635 for seq in 0..10u8 {
1637 tracker
1638 .check(&source, seq)
1639 .unwrap_or_else(|_| panic!("sequence {} should succeed", seq));
1640 }
1641 }
1642
1643 #[test]
1644 fn test_replay_tracker_replay_detected() {
1645 let tracker = ReplayTracker::new(64);
1646 let source: IpAddr = "192.168.1.1".parse().unwrap();
1647
1648 tracker.check(&source, 5).expect("first should succeed");
1650
1651 let result = tracker.check(&source, 5);
1653 assert!(matches!(
1654 result,
1655 Err(BypassError::ReplayDetected { sequence: 5 })
1656 ));
1657 }
1658
1659 #[test]
1660 fn test_replay_tracker_out_of_order() {
1661 let tracker = ReplayTracker::new(64);
1662 let source: IpAddr = "192.168.1.1".parse().unwrap();
1663
1664 tracker.check(&source, 10).expect("10 should succeed");
1666 tracker
1667 .check(&source, 8)
1668 .expect("8 should succeed (within window)");
1669 tracker.check(&source, 12).expect("12 should succeed");
1670 tracker
1671 .check(&source, 9)
1672 .expect("9 should succeed (within window)");
1673
1674 let result = tracker.check(&source, 8);
1676 assert!(matches!(
1677 result,
1678 Err(BypassError::ReplayDetected { sequence: 8 })
1679 ));
1680 }
1681
1682 #[test]
1683 fn test_replay_tracker_multiple_sources() {
1684 let tracker = ReplayTracker::new(64);
1685 let source1: IpAddr = "192.168.1.1".parse().unwrap();
1686 let source2: IpAddr = "192.168.1.2".parse().unwrap();
1687
1688 tracker
1690 .check(&source1, 5)
1691 .expect("source1 seq 5 should succeed");
1692 tracker
1693 .check(&source2, 5)
1694 .expect("source2 seq 5 should succeed");
1695
1696 let result = tracker.check(&source1, 5);
1698 assert!(matches!(result, Err(BypassError::ReplayDetected { .. })));
1699
1700 tracker
1702 .check(&source1, 6)
1703 .expect("source1 seq 6 should succeed");
1704 }
1705
1706 #[test]
1707 fn test_replay_tracker_window_advance() {
1708 let tracker = ReplayTracker::new(64);
1709 let source: IpAddr = "192.168.1.1".parse().unwrap();
1710
1711 tracker.check(&source, 100).expect("100 should succeed");
1713 tracker.check(&source, 110).expect("110 should succeed");
1714 tracker.check(&source, 120).expect("120 should succeed");
1715
1716 tracker.check(&source, 200).expect("200 should succeed");
1718
1719 tracker.check(&source, 201).expect("201 should succeed");
1722 }
1723
1724 #[test]
1725 fn test_replay_tracker_clear() {
1726 let tracker = ReplayTracker::new(64);
1727 let source: IpAddr = "192.168.1.1".parse().unwrap();
1728
1729 tracker.check(&source, 5).expect("initial should succeed");
1730
1731 tracker.clear_source(&source);
1733 tracker
1734 .check(&source, 5)
1735 .expect("after clear should succeed");
1736 }
1737
1738 #[test]
1739 fn test_metrics_snapshot_includes_security() {
1740 let metrics = BypassMetrics::default();
1741
1742 metrics.signature_rejected.fetch_add(1, Ordering::Relaxed);
1744 metrics.decryption_failed.fetch_add(2, Ordering::Relaxed);
1745 metrics.unauthorized_source.fetch_add(3, Ordering::Relaxed);
1746 metrics.replay_rejected.fetch_add(4, Ordering::Relaxed);
1747
1748 let snapshot = metrics.snapshot();
1750 assert_eq!(snapshot.signature_rejected, 1);
1751 assert_eq!(snapshot.decryption_failed, 2);
1752 assert_eq!(snapshot.unauthorized_source, 3);
1753 assert_eq!(snapshot.replay_rejected, 4);
1754 }
1755
1756 #[test]
1757 fn test_bypass_error_display_all_variants() {
1758 let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test io"));
1759 assert!(io_err.to_string().contains("IO error"));
1760 assert!(io_err.to_string().contains("test io"));
1761
1762 let encode_err = BypassError::Encode("bad data".to_string());
1763 assert!(encode_err.to_string().contains("Encode error"));
1764
1765 let decode_err = BypassError::Decode("corrupt".to_string());
1766 assert!(decode_err.to_string().contains("Decode error"));
1767
1768 let config_err = BypassError::Config("bad config".to_string());
1769 assert!(config_err.to_string().contains("Config error"));
1770
1771 let not_started = BypassError::NotStarted;
1772 assert!(not_started.to_string().contains("not started"));
1773
1774 let invalid_header = BypassError::InvalidHeader;
1775 assert!(invalid_header.to_string().contains("Invalid bypass header"));
1776
1777 let stale = BypassError::StaleMessage;
1778 assert!(stale.to_string().contains("stale"));
1779
1780 let sig_err = BypassError::InvalidSignature;
1781 assert!(sig_err.to_string().contains("Invalid message signature"));
1782
1783 let decrypt_err = BypassError::DecryptionFailed;
1784 assert!(decrypt_err.to_string().contains("decryption failed"));
1785
1786 let unauth = BypassError::UnauthorizedSource("10.0.0.1".parse().unwrap());
1787 assert!(unauth.to_string().contains("10.0.0.1"));
1788
1789 let replay = BypassError::ReplayDetected { sequence: 42 };
1790 assert!(replay.to_string().contains("42"));
1791
1792 let missing = BypassError::MissingCredential("encryption key".to_string());
1793 assert!(missing.to_string().contains("encryption key"));
1794 }
1795
1796 #[test]
1797 fn test_bypass_error_source() {
1798 let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test"));
1799 assert!(std::error::Error::source(&io_err).is_some());
1800
1801 let encode_err = BypassError::Encode("test".into());
1802 assert!(std::error::Error::source(&encode_err).is_none());
1803
1804 let not_started = BypassError::NotStarted;
1805 assert!(std::error::Error::source(¬_started).is_none());
1806 }
1807
1808 #[test]
1809 fn test_bypass_error_from_io() {
1810 let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
1811 let bypass_err: BypassError = io_err.into();
1812 assert!(bypass_err.to_string().contains("gone"));
1813 }
1814
1815 #[test]
1816 fn test_message_encoding_display() {
1817 assert_eq!(MessageEncoding::Protobuf.to_string(), "protobuf");
1818 assert_eq!(MessageEncoding::Json.to_string(), "json");
1819 assert_eq!(MessageEncoding::Raw.to_string(), "raw");
1820 assert_eq!(MessageEncoding::Cbor.to_string(), "cbor");
1821 }
1822
1823 #[test]
1824 fn test_bypass_collection_config_ttl() {
1825 let config = BypassCollectionConfig {
1826 ttl_ms: 200,
1827 ..Default::default()
1828 };
1829 assert_eq!(config.ttl(), Duration::from_millis(200));
1830 }
1831
1832 #[test]
1833 fn test_bypass_collection_config_default() {
1834 let config = BypassCollectionConfig::default();
1835 assert!(config.collection.is_empty());
1836 assert_eq!(config.transport, BypassTransport::Unicast);
1837 assert_eq!(config.encoding, MessageEncoding::Protobuf);
1838 assert_eq!(config.ttl_ms, 5000);
1839 assert_eq!(config.priority, MessagePriority::Normal);
1840 }
1841
1842 #[test]
1843 fn test_udp_config_default() {
1844 let config = UdpConfig::default();
1845 assert_eq!(config.bind_port, 5150);
1846 assert_eq!(config.buffer_size, 65536);
1847 assert_eq!(config.multicast_ttl, 32);
1848 }
1849
1850 #[test]
1851 fn test_bypass_channel_config_default() {
1852 let config = BypassChannelConfig::default();
1853 assert!(config.collections.is_empty());
1854 assert!(!config.multicast_enabled);
1855 assert_eq!(config.max_message_size, 0);
1856 }
1857
1858 #[test]
1859 fn test_bypass_channel_config_new() {
1860 let config = BypassChannelConfig::new();
1861 assert!(config.multicast_enabled);
1862 assert_eq!(config.max_message_size, 65000);
1863 }
1864
1865 #[test]
1866 fn test_bypass_security_config_is_enabled_allowlist() {
1867 let config = BypassSecurityConfig {
1868 source_allowlist: Some(vec!["10.0.0.1".parse().unwrap()]),
1869 ..Default::default()
1870 };
1871 assert!(config.is_enabled());
1872 }
1873
1874 #[test]
1875 fn test_bypass_security_config_is_enabled_replay() {
1876 let config = BypassSecurityConfig {
1877 replay_protection: true,
1878 ..Default::default()
1879 };
1880 assert!(config.is_enabled());
1881 }
1882
1883 #[test]
1884 fn test_bypass_security_credentials_debug() {
1885 let creds = BypassSecurityCredentials::new();
1886 let debug = format!("{:?}", creds);
1887 assert!(debug.contains("has_signing_key"));
1888 assert!(debug.contains("false"));
1889 assert!(debug.contains("peer_keys_count"));
1890 }
1891
1892 #[test]
1893 fn test_bypass_security_credentials_verifying_key() {
1894 let creds = BypassSecurityCredentials::new();
1896 assert!(creds.verifying_key().is_none());
1897
1898 let seed: [u8; 32] = [42u8; 32];
1900 let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed);
1901 let creds = BypassSecurityCredentials::new().with_signing_key(signing_key.clone());
1902 let vk = creds.verifying_key();
1903 assert!(vk.is_some());
1904 assert_eq!(vk.unwrap(), signing_key.verifying_key());
1905 }
1906
1907 #[test]
1908 fn test_bypass_security_credentials_verify_unknown_peer() {
1909 let creds = BypassSecurityCredentials::new();
1910 let sig = ed25519_dalek::Signature::from_bytes(&[0u8; 64]);
1911 let result = creds.verify("unknown-peer", b"message", &sig);
1912 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1913 }
1914
1915 #[test]
1916 fn test_bypass_security_credentials_decrypt_no_key() {
1917 let creds = BypassSecurityCredentials::new();
1918 let result = creds.decrypt(b"ciphertext", &[0u8; 12]);
1919 assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1920 }
1921
1922 #[test]
1923 fn test_replay_tracker_clear_all() {
1924 let tracker = ReplayTracker::new(64);
1925 let source1: IpAddr = "192.168.1.1".parse().unwrap();
1926 let source2: IpAddr = "192.168.1.2".parse().unwrap();
1927
1928 tracker.check(&source1, 1).unwrap();
1929 tracker.check(&source2, 1).unwrap();
1930
1931 tracker.clear_all();
1932
1933 tracker.check(&source1, 1).unwrap();
1935 tracker.check(&source2, 1).unwrap();
1936 }
1937
1938 #[test]
1939 fn test_bypass_header_is_stale() {
1940 let header = BypassHeader::new("test", Duration::from_millis(100), 0);
1941 let now = Instant::now();
1942
1943 assert!(!header.is_stale(now, now));
1945
1946 let sent_at = now - Duration::from_secs(1);
1948 assert!(header.is_stale(now, sent_at));
1949 }
1950
1951 #[test]
1952 fn test_bypass_metrics_snapshot_default() {
1953 let snapshot = BypassMetricsSnapshot::default();
1954 assert_eq!(snapshot.messages_sent, 0);
1955 assert_eq!(snapshot.messages_received, 0);
1956 assert_eq!(snapshot.bytes_sent, 0);
1957 assert_eq!(snapshot.bytes_received, 0);
1958 assert_eq!(snapshot.stale_dropped, 0);
1959 assert_eq!(snapshot.invalid_dropped, 0);
1960 assert_eq!(snapshot.send_errors, 0);
1961 assert_eq!(snapshot.receive_errors, 0);
1962 }
1963
1964 #[tokio::test]
1965 async fn test_bypass_channel_subscribe_collection() {
1966 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1967 collection: "positions".into(),
1968 ..Default::default()
1969 });
1970
1971 let channel = UdpBypassChannel::new(config).await.unwrap();
1972 let (hash, _rx) = channel.subscribe_collection("positions");
1973 assert_eq!(hash, BypassHeader::hash_collection("positions"));
1974 }
1975
1976 #[tokio::test]
1977 async fn test_bypass_channel_get_collection_by_hash() {
1978 let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1979 collection: "telemetry".into(),
1980 ttl_ms: 200,
1981 ..Default::default()
1982 });
1983
1984 let channel = UdpBypassChannel::new(config).await.unwrap();
1985 let hash = BypassHeader::hash_collection("telemetry");
1986 let col_config = channel.get_collection_by_hash(hash);
1987 assert!(col_config.is_some());
1988 assert_eq!(col_config.unwrap().ttl_ms, 200);
1989
1990 assert!(channel.get_collection_by_hash(12345).is_none());
1992 }
1993
1994 #[tokio::test]
1995 async fn test_bypass_channel_config_accessor() {
1996 let config = BypassChannelConfig {
1997 max_message_size: 1024,
1998 ..BypassChannelConfig::new()
1999 };
2000 let channel = UdpBypassChannel::new(config).await.unwrap();
2001 assert_eq!(channel.config().max_message_size, 1024);
2002 }
2003
2004 #[tokio::test]
2005 async fn test_bypass_send_not_started() {
2006 let config = BypassChannelConfig::new();
2007 let channel = UdpBypassChannel::new(config).await.unwrap();
2008
2009 let result = channel
2010 .send(
2011 BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2012 "test",
2013 b"data",
2014 )
2015 .await;
2016 assert!(matches!(result, Err(BypassError::NotStarted)));
2017 }
2018
2019 #[tokio::test]
2020 async fn test_bypass_send_message_too_large() {
2021 let config = BypassChannelConfig {
2022 max_message_size: 10,
2023 udp: UdpConfig {
2024 bind_port: 0,
2025 ..Default::default()
2026 },
2027 ..BypassChannelConfig::new()
2028 };
2029 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2030 channel.start().await.unwrap();
2031
2032 let big_data = vec![0u8; 100];
2033 let result = channel
2034 .send(
2035 BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2036 "test",
2037 &big_data,
2038 )
2039 .await;
2040 assert!(matches!(
2041 result,
2042 Err(BypassError::MessageTooLarge { size: 100, max: 10 })
2043 ));
2044
2045 channel.stop();
2046 }
2047
2048 #[tokio::test]
2049 async fn test_bypass_send_to_collection_unknown() {
2050 let config = BypassChannelConfig {
2051 udp: UdpConfig {
2052 bind_port: 0,
2053 ..Default::default()
2054 },
2055 ..BypassChannelConfig::new()
2056 };
2057 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2058 channel.start().await.unwrap();
2059
2060 let result = channel.send_to_collection("unknown", None, b"data").await;
2061 assert!(matches!(result, Err(BypassError::Config(_))));
2062
2063 channel.stop();
2064 }
2065
2066 #[tokio::test]
2067 async fn test_bypass_send_to_collection_unicast_no_addr() {
2068 let config = BypassChannelConfig {
2069 udp: UdpConfig {
2070 bind_port: 0,
2071 ..Default::default()
2072 },
2073 collections: vec![BypassCollectionConfig {
2074 collection: "test".into(),
2075 transport: BypassTransport::Unicast,
2076 ..Default::default()
2077 }],
2078 ..BypassChannelConfig::new()
2079 };
2080 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2081 channel.start().await.unwrap();
2082
2083 let result = channel.send_to_collection("test", None, b"data").await;
2085 assert!(matches!(result, Err(BypassError::Config(_))));
2086
2087 channel.stop();
2088 }
2089
2090 #[tokio::test]
2091 async fn test_bypass_send_to_collection_broadcast() {
2092 let config = BypassChannelConfig {
2093 udp: UdpConfig {
2094 bind_port: 0,
2095 ..Default::default()
2096 },
2097 collections: vec![BypassCollectionConfig {
2098 collection: "bcast".into(),
2099 transport: BypassTransport::Broadcast,
2100 ..Default::default()
2101 }],
2102 ..BypassChannelConfig::new()
2103 };
2104 let mut channel = UdpBypassChannel::new(config).await.unwrap();
2105 channel.start().await.unwrap();
2106
2107 let _result = channel.send_to_collection("bcast", None, b"data").await;
2110
2111 channel.stop();
2112 }
2113
2114 #[tokio::test]
2115 async fn test_bypass_leave_multicast_no_socket() {
2116 let config = BypassChannelConfig::new();
2117 let channel = UdpBypassChannel::new(config).await.unwrap();
2118
2119 let result = channel.leave_multicast("239.1.1.1".parse().unwrap());
2121 assert!(result.is_ok());
2122 }
2123
2124 #[test]
2125 fn test_bypass_transport_serde() {
2126 let unicast = BypassTransport::Unicast;
2127 let json = serde_json::to_string(&unicast).unwrap();
2128 let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2129 assert_eq!(parsed, BypassTransport::Unicast);
2130
2131 let multicast = BypassTransport::Multicast {
2132 group: "239.1.1.100".parse().unwrap(),
2133 port: 5150,
2134 };
2135 let json = serde_json::to_string(&multicast).unwrap();
2136 let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2137 assert_eq!(parsed, multicast);
2138 }
2139
2140 #[test]
2141 fn test_message_encoding_serde() {
2142 for encoding in &[
2143 MessageEncoding::Protobuf,
2144 MessageEncoding::Json,
2145 MessageEncoding::Raw,
2146 MessageEncoding::Cbor,
2147 ] {
2148 let json = serde_json::to_string(encoding).unwrap();
2149 let parsed: MessageEncoding = serde_json::from_str(&json).unwrap();
2150 assert_eq!(parsed, *encoding);
2151 }
2152 }
2153
2154 #[test]
2155 fn test_bypass_header_flags() {
2156 let mut header = BypassHeader::new("test", Duration::from_millis(1000), 0);
2157
2158 assert_eq!(header.flags, 0);
2160 assert_eq!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2161 assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2162
2163 header.flags |= BypassHeader::FLAG_SIGNED;
2165 assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2166 assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2167
2168 header.flags |= BypassHeader::FLAG_ENCRYPTED;
2170 assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2171 assert_ne!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2172
2173 let encoded = header.encode();
2175 let decoded = BypassHeader::decode(&encoded).unwrap();
2176 assert_eq!(decoded.flags, header.flags);
2177 }
2178}