Skip to main content

peat_mesh/transport/
bypass.rs

1//! UDP Bypass Channel for ephemeral data
2//!
3//! Provides a direct UDP pathway that bypasses the CRDT sync engine for
4//! high-frequency, low-latency, or bandwidth-constrained scenarios.
5//!
6//! ## Use Cases
7//!
8//! - High-frequency telemetry (10-100 Hz position updates)
9//! - Low-latency commands (<50ms delivery)
10//! - Bandwidth-constrained links (9.6kbps tactical radio)
11//! - Multicast/broadcast to cell members
12//!
13//! ## Architecture
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────────┐
17//! │                   Data Flow with Bypass                          │
18//! │                                                                  │
19//! │   Application                                                    │
20//! │       │                                                          │
21//! │       ├──────────────────────────┐                               │
22//! │       ▼                          ▼                               │
23//! │   ┌─────────────┐          ┌─────────────┐                       │
24//! │   │ CRDT Store  │          │  UDP Bypass │ ◄── Ephemeral data    │
25//! │   │ (Automerge) │          │   Channel   │                       │
26//! │   └──────┬──────┘          └──────┬──────┘                       │
27//! │          │                        │                              │
28//! │          ▼                        ▼                              │
29//! │   ┌─────────────┐          ┌─────────────┐                       │
30//! │   │   Iroh      │          │    Raw      │                       │
31//! │   │  Transport  │          │    UDP      │                       │
32//! │   └─────────────┘          └─────────────┘                       │
33//! │                                                                  │
34//! └─────────────────────────────────────────────────────────────────┘
35//! ```
36//!
37//! ## Example
38//!
39//! ```ignore
//! use peat_mesh::transport::bypass::{BypassChannelConfig, BypassTarget, UdpBypassChannel};
41//!
42//! // Create bypass channel
43//! let config = BypassChannelConfig::default();
44//! let channel = UdpBypassChannel::new(config).await?;
45//!
46//! // Send position update via bypass (no CRDT overhead)
47//! channel.send(
48//!     BypassTarget::Multicast { group: "239.1.1.100".parse()?, port: 5150 },
49//!     "position_updates",
50//!     &position_bytes,
51//! ).await?;
52//!
53//! // Subscribe to incoming bypass messages
54//! let mut rx = channel.subscribe("position_updates");
55//! while let Some(msg) = rx.recv().await {
56//!     println!("Received from {}: {:?}", msg.source, msg.data);
57//! }
58//! ```
59
60use std::collections::HashMap;
61use std::hash::{Hash, Hasher};
62use std::net::{IpAddr, Ipv4Addr, SocketAddr};
63use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
64use std::sync::{Arc, RwLock};
65use std::time::{Duration, Instant};
66
67use chacha20poly1305::{
68    aead::{Aead, KeyInit},
69    ChaCha20Poly1305, Nonce,
70};
71use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
72use serde::{Deserialize, Serialize};
73use tokio::net::UdpSocket;
74use tokio::sync::broadcast;
75use tracing::{debug, info};
76
77use super::MessagePriority;
78
79// =============================================================================
80// Error Types
81// =============================================================================
82
/// Errors produced by bypass channel operations.
///
/// Only [`BypassError::Io`] wraps an underlying error and exposes it through
/// [`std::error::Error::source`]; every other variant is self-describing.
#[derive(Debug)]
pub enum BypassError {
    /// Socket-level IO failure
    Io(std::io::Error),
    /// Payload could not be encoded
    Encode(String),
    /// Payload could not be decoded
    Decode(String),
    /// Configuration is invalid
    Config(String),
    /// The channel has not been started yet
    NotStarted,
    /// Payload exceeds the configured maximum (`size` vs. `max`, in bytes)
    MessageTooLarge { size: usize, max: usize },
    /// Header is truncated or carries the wrong magic
    InvalidHeader,
    /// Message outlived its TTL
    StaleMessage,
    /// Signature did not verify
    InvalidSignature,
    /// Payload could not be decrypted
    DecryptionFailed,
    /// Sender IP is not on the allowlist
    UnauthorizedSource(IpAddr),
    /// Sequence number was already seen (or is too old) for this sender
    ReplayDetected { sequence: u8 },
    /// A required key has not been configured; the string names which one
    MissingCredential(String),
}

impl std::fmt::Display for BypassError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants carrying data
            Self::Io(e) => write!(f, "IO error: {e}"),
            Self::Encode(msg) => write!(f, "Encode error: {msg}"),
            Self::Decode(msg) => write!(f, "Decode error: {msg}"),
            Self::Config(msg) => write!(f, "Config error: {msg}"),
            Self::MessageTooLarge { size, max } => {
                write!(f, "Message too large: {size} bytes (max {max})")
            }
            Self::UnauthorizedSource(ip) => write!(f, "Unauthorized source IP: {ip}"),
            Self::ReplayDetected { sequence } => {
                write!(f, "Replay attack detected (sequence {sequence})")
            }
            Self::MissingCredential(what) => {
                write!(f, "Missing security credential: {what}")
            }
            // Fixed-text variants
            Self::NotStarted => f.write_str("Bypass channel not started"),
            Self::InvalidHeader => f.write_str("Invalid bypass header"),
            Self::StaleMessage => f.write_str("Message is stale (past TTL)"),
            Self::InvalidSignature => f.write_str("Invalid message signature"),
            Self::DecryptionFailed => f.write_str("Message decryption failed"),
        }
    }
}

impl std::error::Error for BypassError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<std::io::Error> for BypassError {
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}

/// Result alias used throughout the bypass module.
pub type Result<T> = std::result::Result<T, BypassError>;
158
159// =============================================================================
160// Configuration Types
161// =============================================================================
162
163/// Transport mode for bypass messages
164#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
165#[serde(tag = "type", rename_all = "snake_case")]
166pub enum BypassTransport {
167    /// UDP unicast to specific peer
168    #[default]
169    Unicast,
170    /// UDP multicast to group
171    Multicast {
172        /// Multicast group address
173        group: IpAddr,
174        /// Port number
175        port: u16,
176    },
177    /// UDP broadcast on subnet
178    Broadcast,
179}
180
181/// Message encoding format
182#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
183#[serde(rename_all = "snake_case")]
184pub enum MessageEncoding {
185    /// Protobuf (recommended - compact)
186    #[default]
187    Protobuf,
188    /// JSON (debugging)
189    Json,
190    /// Raw bytes (minimal overhead)
191    Raw,
192    /// CBOR (compact binary)
193    Cbor,
194}
195
196impl std::fmt::Display for MessageEncoding {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        match self {
199            MessageEncoding::Protobuf => write!(f, "protobuf"),
200            MessageEncoding::Json => write!(f, "json"),
201            MessageEncoding::Raw => write!(f, "raw"),
202            MessageEncoding::Cbor => write!(f, "cbor"),
203        }
204    }
205}
206
207/// Configuration for a bypass collection
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BypassCollectionConfig {
210    /// Collection name
211    pub collection: String,
212    /// Transport mode for this collection
213    pub transport: BypassTransport,
214    /// Message encoding format
215    pub encoding: MessageEncoding,
216    /// Time-to-live for messages in milliseconds
217    #[serde(default = "default_ttl_ms")]
218    pub ttl_ms: u64,
219    /// QoS priority for bandwidth allocation
220    #[serde(default)]
221    pub priority: MessagePriority,
222}
223
224fn default_ttl_ms() -> u64 {
225    5000
226}
227
228impl BypassCollectionConfig {
229    /// Get TTL as Duration
230    pub fn ttl(&self) -> Duration {
231        Duration::from_millis(self.ttl_ms)
232    }
233}
234
235impl Default for BypassCollectionConfig {
236    fn default() -> Self {
237        Self {
238            collection: String::new(),
239            transport: BypassTransport::Unicast,
240            encoding: MessageEncoding::Protobuf,
241            ttl_ms: 5000,
242            priority: MessagePriority::Normal,
243        }
244    }
245}
246
247/// UDP configuration
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct UdpConfig {
250    /// Bind port (0 = ephemeral)
251    pub bind_port: u16,
252    /// Buffer size for receiving
253    pub buffer_size: usize,
254    /// Multicast TTL (hop count)
255    pub multicast_ttl: u32,
256}
257
258impl Default for UdpConfig {
259    fn default() -> Self {
260        Self {
261            bind_port: 5150,
262            buffer_size: 65536,
263            multicast_ttl: 32,
264        }
265    }
266}
267
268/// Configuration for bypass channel
269#[derive(Debug, Clone, Default, Serialize, Deserialize)]
270pub struct BypassChannelConfig {
271    /// UDP configuration
272    pub udp: UdpConfig,
273    /// Collections that use bypass
274    pub collections: Vec<BypassCollectionConfig>,
275    /// Enable multicast support
276    pub multicast_enabled: bool,
277    /// Maximum message size
278    pub max_message_size: usize,
279}
280
281impl BypassChannelConfig {
282    /// Create new configuration with defaults
283    pub fn new() -> Self {
284        Self {
285            udp: UdpConfig::default(),
286            collections: Vec::new(),
287            multicast_enabled: true,
288            max_message_size: 65000, // Leave room for header
289        }
290    }
291
292    /// Add a collection to bypass
293    pub fn with_collection(mut self, config: BypassCollectionConfig) -> Self {
294        self.collections.push(config);
295        self
296    }
297
298    /// Get configuration for a collection
299    pub fn get_collection(&self, name: &str) -> Option<&BypassCollectionConfig> {
300        self.collections.iter().find(|c| c.collection == name)
301    }
302
303    /// Check if a collection uses bypass
304    pub fn is_bypass_collection(&self, name: &str) -> bool {
305        self.collections.iter().any(|c| c.collection == name)
306    }
307}
308
309// =============================================================================
310// Security Configuration (ADR-042 Phase 5)
311// =============================================================================
312
/// Security configuration for bypass channel
///
/// Since bypass messages don't go through Iroh's authenticated QUIC transport,
/// optional security features can be enabled to protect against:
/// - Message forgery (signing)
/// - Eavesdropping (encryption)
/// - Unauthorized sources (allowlisting)
/// - Replay attacks (sequence window)
///
/// ## Security Tradeoffs
///
/// | Feature | Overhead | Latency Impact | Protection |
/// |---------|----------|----------------|------------|
/// | Signing | +64 bytes | +0.1-0.5ms | Authenticity |
/// | Encryption | +16 bytes | +0.1-0.3ms | Confidentiality |
/// | Allowlist | 0 bytes | ~0ms | Source filtering |
/// | Replay | 0 bytes | ~0ms | Freshness |
///
/// ## When to Use
///
/// - **High-frequency telemetry**: Signing only (authenticity without encryption overhead)
/// - **Sensitive commands**: Full encryption + signing
/// - **Trusted network**: Allowlist only (minimal overhead)
/// - **Untrusted network**: All features enabled
///
/// ## Defaults
///
/// `Default::default()` disables every feature and sets
/// `replay_window_size` to 64, so enabling `replay_protection` through
/// struct-update syntax gets the documented window instead of a zero-width one.
///
/// ## Example
///
/// ```rust
/// use peat_mesh::transport::bypass::BypassSecurityConfig;
///
/// // Minimal security for high-frequency telemetry
/// let config = BypassSecurityConfig {
///     require_signature: true,
///     encrypt_payload: false,
///     ..Default::default()
/// };
///
/// // Full security for sensitive commands
/// let config = BypassSecurityConfig::full_security();
/// ```
#[derive(Debug, Clone)]
pub struct BypassSecurityConfig {
    /// Require Ed25519 signature on all messages
    ///
    /// When enabled:
    /// - Outgoing messages are signed with the node's signing key
    /// - Incoming messages without valid signatures are rejected
    /// - Adds ~64 bytes overhead per message
    pub require_signature: bool,

    /// Encrypt payload with formation key (ChaCha20-Poly1305)
    ///
    /// When enabled:
    /// - Payload is encrypted before sending
    /// - Header remains unencrypted for routing
    /// - Adds ~16 bytes overhead (Poly1305 auth tag)
    pub encrypt_payload: bool,

    /// Filter sources by known peer addresses
    ///
    /// When Some:
    /// - Only accept messages from listed IP addresses
    /// - Messages from unknown IPs are rejected
    /// - Useful for trusted network segments
    pub source_allowlist: Option<Vec<IpAddr>>,

    /// Enable replay protection
    ///
    /// When enabled:
    /// - Track seen sequence numbers per source
    /// - Reject duplicate sequence numbers within window
    /// - Window size: 64 sequences (covers ~1 minute at 1 Hz)
    pub replay_protection: bool,

    /// Replay window size (default: 64)
    ///
    /// Number of sequence numbers to track per source.
    /// Larger windows use more memory but handle more reordering.
    pub replay_window_size: usize,
}

impl Default for BypassSecurityConfig {
    /// All features off, no allowlist, replay window of 64.
    fn default() -> Self {
        Self {
            require_signature: false,
            encrypt_payload: false,
            source_allowlist: None,
            replay_protection: false,
            replay_window_size: Self::DEFAULT_REPLAY_WINDOW_SIZE,
        }
    }
}

impl BypassSecurityConfig {
    /// Replay window used when none is configured explicitly.
    const DEFAULT_REPLAY_WINDOW_SIZE: usize = 64;

    /// Create configuration with no security (fastest)
    pub fn none() -> Self {
        Self::default()
    }

    /// Create configuration with signature only (authenticity)
    pub fn signed() -> Self {
        Self {
            require_signature: true,
            ..Default::default()
        }
    }

    /// Create configuration with full security (all features)
    ///
    /// The allowlist stays `None`; set it separately from the known peers.
    pub fn full_security() -> Self {
        Self {
            require_signature: true,
            encrypt_payload: true,
            replay_protection: true,
            ..Default::default()
        }
    }

    /// Check if any security features are enabled
    pub fn is_enabled(&self) -> bool {
        self.require_signature
            || self.encrypt_payload
            || self.source_allowlist.is_some()
            || self.replay_protection
    }

    /// Check if source IP is allowed
    ///
    /// With no allowlist configured every source is allowed.
    pub fn is_source_allowed(&self, ip: &IpAddr) -> bool {
        match &self.source_allowlist {
            Some(list) => list.contains(ip),
            None => true,
        }
    }
}
435
436/// Security credentials for bypass channel
437///
438/// Contains the cryptographic keys needed for signing and encryption.
439#[derive(Clone, Default)]
440pub struct BypassSecurityCredentials {
441    /// Ed25519 signing key (private)
442    signing_key: Option<SigningKey>,
443
444    /// Ed25519 verifying keys for known peers (public)
445    /// Maps peer ID or IP to their public key
446    peer_keys: HashMap<String, VerifyingKey>,
447
448    /// ChaCha20-Poly1305 encryption key (shared formation key)
449    /// 256-bit key shared among formation members
450    encryption_key: Option<[u8; 32]>,
451}
452
453impl std::fmt::Debug for BypassSecurityCredentials {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        f.debug_struct("BypassSecurityCredentials")
456            .field("has_signing_key", &self.signing_key.is_some())
457            .field("peer_keys_count", &self.peer_keys.len())
458            .field("has_encryption_key", &self.encryption_key.is_some())
459            .finish()
460    }
461}
462
463impl BypassSecurityCredentials {
464    /// Create new empty credentials
465    pub fn new() -> Self {
466        Self::default()
467    }
468
469    /// Set the signing key
470    pub fn with_signing_key(mut self, key: SigningKey) -> Self {
471        self.signing_key = Some(key);
472        self
473    }
474
475    /// Add a peer's verifying key
476    pub fn with_peer_key(mut self, peer_id: impl Into<String>, key: VerifyingKey) -> Self {
477        self.peer_keys.insert(peer_id.into(), key);
478        self
479    }
480
481    /// Set the encryption key
482    pub fn with_encryption_key(mut self, key: [u8; 32]) -> Self {
483        self.encryption_key = Some(key);
484        self
485    }
486
487    /// Get the verifying key for our signing key
488    pub fn verifying_key(&self) -> Option<VerifyingKey> {
489        self.signing_key.as_ref().map(|k| k.verifying_key())
490    }
491
492    /// Sign a message
493    pub fn sign(&self, message: &[u8]) -> Result<Signature> {
494        let key = self
495            .signing_key
496            .as_ref()
497            .ok_or_else(|| BypassError::MissingCredential("signing key".into()))?;
498        Ok(key.sign(message))
499    }
500
501    /// Verify a signature from a known peer
502    pub fn verify(&self, peer_id: &str, message: &[u8], signature: &Signature) -> Result<()> {
503        let key = self
504            .peer_keys
505            .get(peer_id)
506            .ok_or_else(|| BypassError::MissingCredential(format!("peer key for {}", peer_id)))?;
507        key.verify(message, signature)
508            .map_err(|_| BypassError::InvalidSignature)
509    }
510
511    /// Verify a signature using IP address as peer identifier
512    pub fn verify_by_ip(&self, ip: &IpAddr, message: &[u8], signature: &Signature) -> Result<()> {
513        self.verify(&ip.to_string(), message, signature)
514    }
515
516    /// Encrypt payload
517    pub fn encrypt(&self, plaintext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
518        let key = self
519            .encryption_key
520            .as_ref()
521            .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
522
523        let cipher = ChaCha20Poly1305::new_from_slice(key)
524            .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
525
526        let nonce = Nonce::from_slice(nonce);
527        cipher
528            .encrypt(nonce, plaintext)
529            .map_err(|_| BypassError::Encode("Encryption failed".into()))
530    }
531
532    /// Decrypt payload
533    pub fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
534        let key = self
535            .encryption_key
536            .as_ref()
537            .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
538
539        let cipher = ChaCha20Poly1305::new_from_slice(key)
540            .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
541
542        let nonce = Nonce::from_slice(nonce);
543        cipher
544            .decrypt(nonce, ciphertext)
545            .map_err(|_| BypassError::DecryptionFailed)
546    }
547}
548
549/// Replay protection tracker
550///
551/// Tracks seen sequence numbers per source to detect replay attacks.
552/// Uses a sliding window to handle out-of-order delivery.
553#[derive(Debug, Default)]
554pub struct ReplayTracker {
555    /// Seen sequences per source IP
556    /// Each entry is a bitset representing seen sequences
557    windows: RwLock<HashMap<IpAddr, ReplayWindow>>,
558
559    /// Window size
560    window_size: usize,
561}
562
/// Sliding window for replay detection
///
/// Sequence numbers are 8-bit wrapping counters. Bit `n` of `seen` records
/// whether the sequence `highest_seq - n` (wrapping) has been accepted, so
/// bit 0 always refers to `highest_seq` itself.
#[derive(Debug)]
struct ReplayWindow {
    /// Highest sequence seen
    highest_seq: u8,
    /// Bitmap of seen sequences (relative to highest); zero means "no packet accepted yet"
    seen: u64,
}

impl ReplayWindow {
    /// Creates an empty window that has not observed any packet.
    fn new() -> Self {
        Self {
            highest_seq: 0,
            seen: 0,
        }
    }

    /// Check if sequence is valid (not a replay) and record it when accepted.
    ///
    /// * `seq` - sequence number taken from the packet header
    /// * `window_size` - how many sequences behind the highest are still
    ///   accepted; clamped to 64 (the width of the bitmap)
    ///
    /// Returns `true` if the packet is fresh, `false` if it is a duplicate or
    /// too old.
    ///
    /// The first packet from a source is accepted whatever its sequence is
    /// and anchors the window. The sender's counter can be at any value when
    /// the receiver first hears it, so it must not be compared against an
    /// implicit starting sequence of 0.
    fn check_and_update(&mut self, seq: u8, window_size: usize) -> bool {
        let window_size = window_size.min(64) as u8;

        // Every accepted packet sets at least one bit, so an all-zero bitmap
        // can only mean nothing has been accepted yet.
        if self.seen == 0 {
            self.highest_seq = seq;
            self.seen = 1;
            return true;
        }

        if seq == self.highest_seq {
            // Exact replay of highest (bit 0 is always set once initialised)
            return false;
        }

        // Distances under half the sequence space count as "newer".
        let ahead = seq.wrapping_sub(self.highest_seq);
        if ahead < 128 {
            let shift = u32::from(ahead);
            // A jump of 64 or more pushes all history out of the bitmap.
            self.seen = if shift < 64 {
                (self.seen << shift) | 1
            } else {
                1
            };
            self.highest_seq = seq;
            return true;
        }

        // Otherwise the packet is older than the highest one seen.
        let behind = self.highest_seq.wrapping_sub(seq);
        if behind >= window_size {
            // Too old
            return false;
        }

        // behind < window_size <= 64, so the shift cannot overflow.
        let bit = 1u64 << behind;
        if self.seen & bit != 0 {
            // Already seen
            return false;
        }
        self.seen |= bit;
        true
    }
}
629
630impl ReplayTracker {
631    /// Create new replay tracker
632    pub fn new(window_size: usize) -> Self {
633        Self {
634            windows: RwLock::new(HashMap::new()),
635            window_size: window_size.min(64),
636        }
637    }
638
639    /// Check if message is valid (not a replay)
640    /// Returns Ok(()) if valid, Err if replay detected
641    pub fn check(&self, source: &IpAddr, sequence: u8) -> Result<()> {
642        let mut windows = self.windows.write().unwrap();
643        let window = windows.entry(*source).or_insert_with(ReplayWindow::new);
644
645        if window.check_and_update(sequence, self.window_size) {
646            Ok(())
647        } else {
648            Err(BypassError::ReplayDetected { sequence })
649        }
650    }
651
652    /// Clear tracking for a source
653    pub fn clear_source(&self, source: &IpAddr) {
654        self.windows.write().unwrap().remove(source);
655    }
656
657    /// Clear all tracking
658    pub fn clear_all(&self) {
659        self.windows.write().unwrap().clear();
660    }
661}
662
663// =============================================================================
664// Bypass Header (12 bytes)
665// =============================================================================
666
667/// Bypass message header
668///
669/// Compact 12-byte header for bypass messages:
670/// - Magic: 4 bytes ("PEAT")
671/// - Collection hash: 4 bytes (FNV-1a hash of collection name)
672/// - TTL: 2 bytes (milliseconds, max ~65s)
673/// - Flags: 1 byte
674/// - Sequence: 1 byte (wrapping counter)
675#[derive(Debug, Clone, Copy)]
676pub struct BypassHeader {
677    /// Magic number (0x50454154 = "PEAT")
678    pub magic: [u8; 4],
679    /// Collection name hash (FNV-1a)
680    pub collection_hash: u32,
681    /// TTL in milliseconds
682    pub ttl_ms: u16,
683    /// Flags
684    pub flags: u8,
685    /// Sequence number
686    pub sequence: u8,
687}
688
689impl BypassHeader {
690    /// Magic bytes: "PEAT"
691    pub const MAGIC: [u8; 4] = [0x45, 0x43, 0x48, 0x45];
692
693    /// Header size in bytes
694    pub const SIZE: usize = 12;
695
696    /// Flag: message is compressed
697    pub const FLAG_COMPRESSED: u8 = 0x01;
698    /// Flag: message is encrypted
699    pub const FLAG_ENCRYPTED: u8 = 0x02;
700    /// Flag: message is signed
701    pub const FLAG_SIGNED: u8 = 0x04;
702
703    /// Create a new header
704    pub fn new(collection: &str, ttl: Duration, sequence: u8) -> Self {
705        Self {
706            magic: Self::MAGIC,
707            collection_hash: Self::hash_collection(collection),
708            ttl_ms: ttl.as_millis().min(u16::MAX as u128) as u16,
709            flags: 0,
710            sequence,
711        }
712    }
713
714    /// Hash a collection name using FNV-1a
715    pub fn hash_collection(name: &str) -> u32 {
716        let mut hasher = fnv::FnvHasher::default();
717        name.hash(&mut hasher);
718        hasher.finish() as u32
719    }
720
721    /// Check if header has valid magic
722    pub fn is_valid(&self) -> bool {
723        self.magic == Self::MAGIC
724    }
725
726    /// Encode header to bytes
727    pub fn encode(&self) -> [u8; Self::SIZE] {
728        let mut buf = [0u8; Self::SIZE];
729        buf[0..4].copy_from_slice(&self.magic);
730        buf[4..8].copy_from_slice(&self.collection_hash.to_be_bytes());
731        buf[8..10].copy_from_slice(&self.ttl_ms.to_be_bytes());
732        buf[10] = self.flags;
733        buf[11] = self.sequence;
734        buf
735    }
736
737    /// Decode header from bytes
738    pub fn decode(buf: &[u8]) -> Result<Self> {
739        if buf.len() < Self::SIZE {
740            return Err(BypassError::InvalidHeader);
741        }
742
743        let mut magic = [0u8; 4];
744        magic.copy_from_slice(&buf[0..4]);
745
746        if magic != Self::MAGIC {
747            return Err(BypassError::InvalidHeader);
748        }
749
750        let collection_hash = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
751        let ttl_ms = u16::from_be_bytes([buf[8], buf[9]]);
752        let flags = buf[10];
753        let sequence = buf[11];
754
755        Ok(Self {
756            magic,
757            collection_hash,
758            ttl_ms,
759            flags,
760            sequence,
761        })
762    }
763
764    /// Check if message is stale based on TTL
765    pub fn is_stale(&self, received_at: Instant, sent_at: Instant) -> bool {
766        let elapsed = received_at.duration_since(sent_at);
767        elapsed > Duration::from_millis(self.ttl_ms as u64)
768    }
769}
770
771// =============================================================================
772// FNV-1a Hasher (simple, fast hash for collection names)
773// =============================================================================
774
/// Minimal 64-bit FNV-1a hasher used for collection-name hashing.
mod fnv {
    use std::hash::Hasher;

    /// 64-bit FNV offset basis (initial hasher state)
    const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
    /// 64-bit FNV prime
    const FNV_PRIME: u64 = 1099511628211;

    /// FNV-1a hasher state.
    ///
    /// `Default` is implemented by hand so that every way of obtaining a
    /// default hasher (`FnvHasher::default()`, `Default::default()`,
    /// `BuildHasherDefault`, ...) starts from the offset basis. A derived
    /// `Default` would start from 0 and produce non-FNV hashes.
    pub struct FnvHasher(u64);

    impl Default for FnvHasher {
        /// Returns a hasher seeded with the FNV offset basis.
        fn default() -> Self {
            Self(FNV_OFFSET_BASIS)
        }
    }

    impl Hasher for FnvHasher {
        /// Folds `bytes` into the state: XOR each byte in, then multiply by the prime.
        fn write(&mut self, bytes: &[u8]) {
            for byte in bytes {
                self.0 ^= u64::from(*byte);
                self.0 = self.0.wrapping_mul(FNV_PRIME);
            }
        }

        /// Returns the current 64-bit hash value.
        fn finish(&self) -> u64 {
            self.0
        }
    }
}
803
804// =============================================================================
805// Bypass Message
806// =============================================================================
807
808/// Incoming bypass message
809#[derive(Debug, Clone)]
810pub struct BypassMessage {
811    /// Source address
812    pub source: SocketAddr,
813    /// Collection hash (from header)
814    pub collection_hash: u32,
815    /// Message payload (decoded)
816    pub data: Vec<u8>,
817    /// When message was received
818    pub received_at: Instant,
819    /// Sequence number
820    pub sequence: u8,
821    /// Message priority (inferred from collection config)
822    pub priority: MessagePriority,
823}
824
/// Destination of an outgoing bypass message.
#[derive(Debug, Clone)]
pub enum BypassTarget {
    /// One specific peer address
    Unicast(SocketAddr),
    /// A multicast group and port
    Multicast { group: IpAddr, port: u16 },
    /// Subnet broadcast on the given port
    Broadcast { port: u16 },
}
835
836// =============================================================================
837// Bypass Metrics
838// =============================================================================
839
840/// Metrics for bypass channel
841#[derive(Debug, Default)]
842pub struct BypassMetrics {
843    /// Messages sent
844    pub messages_sent: AtomicU64,
845    /// Messages received
846    pub messages_received: AtomicU64,
847    /// Bytes sent
848    pub bytes_sent: AtomicU64,
849    /// Bytes received
850    pub bytes_received: AtomicU64,
851    /// Messages dropped (stale)
852    pub stale_dropped: AtomicU64,
853    /// Messages dropped (invalid header)
854    pub invalid_dropped: AtomicU64,
855    /// Send errors
856    pub send_errors: AtomicU64,
857    /// Receive errors
858    pub receive_errors: AtomicU64,
859
860    // Security metrics (ADR-042 Phase 5)
861    /// Messages rejected due to invalid signature
862    pub signature_rejected: AtomicU64,
863    /// Messages rejected due to decryption failure
864    pub decryption_failed: AtomicU64,
865    /// Messages rejected due to unauthorized source
866    pub unauthorized_source: AtomicU64,
867    /// Messages rejected due to replay detection
868    pub replay_rejected: AtomicU64,
869}
870
871impl BypassMetrics {
872    /// Create snapshot of current metrics
873    pub fn snapshot(&self) -> BypassMetricsSnapshot {
874        BypassMetricsSnapshot {
875            messages_sent: self.messages_sent.load(Ordering::Relaxed),
876            messages_received: self.messages_received.load(Ordering::Relaxed),
877            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
878            bytes_received: self.bytes_received.load(Ordering::Relaxed),
879            stale_dropped: self.stale_dropped.load(Ordering::Relaxed),
880            invalid_dropped: self.invalid_dropped.load(Ordering::Relaxed),
881            send_errors: self.send_errors.load(Ordering::Relaxed),
882            receive_errors: self.receive_errors.load(Ordering::Relaxed),
883            signature_rejected: self.signature_rejected.load(Ordering::Relaxed),
884            decryption_failed: self.decryption_failed.load(Ordering::Relaxed),
885            unauthorized_source: self.unauthorized_source.load(Ordering::Relaxed),
886            replay_rejected: self.replay_rejected.load(Ordering::Relaxed),
887        }
888    }
889}
890
/// Point-in-time copy of the bypass channel counters.
#[derive(Debug, Clone, Default)]
pub struct BypassMetricsSnapshot {
    /// Number of messages sent
    pub messages_sent: u64,
    /// Number of messages received
    pub messages_received: u64,
    /// Total bytes sent
    pub bytes_sent: u64,
    /// Total bytes received
    pub bytes_received: u64,
    /// Messages dropped because they were stale
    pub stale_dropped: u64,
    /// Messages dropped because of an invalid header
    pub invalid_dropped: u64,
    /// Failed send attempts
    pub send_errors: u64,
    /// Failed receive attempts
    pub receive_errors: u64,

    // Security metrics (ADR-042 Phase 5)
    /// Messages rejected for an invalid signature
    pub signature_rejected: u64,
    /// Messages rejected because decryption failed
    pub decryption_failed: u64,
    /// Messages rejected because the source was not authorized
    pub unauthorized_source: u64,
    /// Messages rejected by replay detection
    pub replay_rejected: u64,
}
913
914// =============================================================================
915// UDP Bypass Channel
916// =============================================================================
917
918/// UDP Bypass Channel for ephemeral data
919///
920/// Provides direct UDP messaging that bypasses CRDT sync for
921/// low-latency, high-frequency data.
922pub struct UdpBypassChannel {
923    /// Configuration
924    config: BypassChannelConfig,
925
926    /// UDP socket for unicast/broadcast
927    socket: Option<Arc<UdpSocket>>,
928
929    /// Multicast sockets per group
930    multicast_sockets: RwLock<HashMap<IpAddr, Arc<UdpSocket>>>,
931
932    /// Collection hash to config mapping
933    collection_map: HashMap<u32, BypassCollectionConfig>,
934
935    /// Sequence counter
936    sequence: AtomicU8,
937
938    /// Metrics
939    metrics: Arc<BypassMetrics>,
940
941    /// Broadcast sender for incoming messages
942    incoming_tx: broadcast::Sender<BypassMessage>,
943
944    /// Running flag
945    running: Arc<AtomicBool>,
946}
947
948impl UdpBypassChannel {
949    /// Create a new bypass channel
950    pub async fn new(config: BypassChannelConfig) -> Result<Self> {
951        // Build collection hash map
952        let collection_map: HashMap<u32, BypassCollectionConfig> = config
953            .collections
954            .iter()
955            .map(|c| (BypassHeader::hash_collection(&c.collection), c.clone()))
956            .collect();
957
958        let (incoming_tx, _) = broadcast::channel(1024);
959
960        Ok(Self {
961            config,
962            socket: None,
963            multicast_sockets: RwLock::new(HashMap::new()),
964            collection_map,
965            sequence: AtomicU8::new(0),
966            metrics: Arc::new(BypassMetrics::default()),
967            incoming_tx,
968            running: Arc::new(AtomicBool::new(false)),
969        })
970    }
971
972    /// Start the bypass channel
973    pub async fn start(&mut self) -> Result<()> {
974        if self.running.load(Ordering::SeqCst) {
975            return Ok(());
976        }
977
978        // Bind UDP socket
979        let bind_addr = format!("0.0.0.0:{}", self.config.udp.bind_port);
980        let socket = UdpSocket::bind(&bind_addr).await?;
981        socket.set_broadcast(true)?;
982
983        let socket = Arc::new(socket);
984        self.socket = Some(socket.clone());
985
986        // Start receiver loop
987        let incoming_tx = self.incoming_tx.clone();
988        let metrics = self.metrics.clone();
989        let collection_map = self.collection_map.clone();
990        let buffer_size = self.config.udp.buffer_size;
991        let running = self.running.clone();
992
993        running.store(true, Ordering::SeqCst);
994
995        tokio::spawn(async move {
996            let mut buf = vec![0u8; buffer_size];
997
998            while running.load(Ordering::SeqCst) {
999                match tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf))
1000                    .await
1001                {
1002                    Ok(Ok((len, src))) => {
1003                        let received_at = Instant::now();
1004
1005                        // Parse header
1006                        if len < BypassHeader::SIZE {
1007                            metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1008                            continue;
1009                        }
1010
1011                        let header = match BypassHeader::decode(&buf[..BypassHeader::SIZE]) {
1012                            Ok(h) => h,
1013                            Err(_) => {
1014                                metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1015                                continue;
1016                            }
1017                        };
1018
1019                        // Extract payload
1020                        let payload = buf[BypassHeader::SIZE..len].to_vec();
1021
1022                        // Look up collection config for priority
1023                        let priority = collection_map
1024                            .get(&header.collection_hash)
1025                            .map(|c| c.priority)
1026                            .unwrap_or(MessagePriority::Normal);
1027
1028                        let message = BypassMessage {
1029                            source: src,
1030                            collection_hash: header.collection_hash,
1031                            data: payload,
1032                            received_at,
1033                            sequence: header.sequence,
1034                            priority,
1035                        };
1036
1037                        metrics.messages_received.fetch_add(1, Ordering::Relaxed);
1038                        metrics
1039                            .bytes_received
1040                            .fetch_add(len as u64, Ordering::Relaxed);
1041
1042                        // Broadcast to subscribers (ignore if no subscribers)
1043                        let _ = incoming_tx.send(message);
1044                    }
1045                    Ok(Err(_e)) => {
1046                        metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
1047                    }
1048                    Err(_) => {
1049                        // Timeout, just continue
1050                    }
1051                }
1052            }
1053        });
1054
1055        info!(
1056            "Bypass channel started on port {}",
1057            self.config.udp.bind_port
1058        );
1059        Ok(())
1060    }
1061
1062    /// Stop the bypass channel
1063    pub fn stop(&mut self) {
1064        self.running.store(false, Ordering::SeqCst);
1065        self.socket = None;
1066        self.multicast_sockets.write().unwrap().clear();
1067        info!("Bypass channel stopped");
1068    }
1069
1070    /// Check if channel is running
1071    pub fn is_running(&self) -> bool {
1072        self.running.load(Ordering::SeqCst)
1073    }
1074
1075    /// Get the next sequence number
1076    fn next_sequence(&self) -> u8 {
1077        self.sequence.fetch_add(1, Ordering::Relaxed)
1078    }
1079
1080    /// Send a message via bypass channel
1081    pub async fn send(&self, target: BypassTarget, collection: &str, data: &[u8]) -> Result<()> {
1082        let socket = self.socket.as_ref().ok_or(BypassError::NotStarted)?;
1083
1084        // Check message size (0 = unlimited)
1085        if self.config.max_message_size > 0 && data.len() > self.config.max_message_size {
1086            return Err(BypassError::MessageTooLarge {
1087                size: data.len(),
1088                max: self.config.max_message_size,
1089            });
1090        }
1091
1092        // Get TTL from collection config or use default
1093        let ttl = self
1094            .config
1095            .get_collection(collection)
1096            .map(|c| c.ttl())
1097            .unwrap_or(Duration::from_secs(5));
1098
1099        // Create header
1100        let header = BypassHeader::new(collection, ttl, self.next_sequence());
1101        let header_bytes = header.encode();
1102
1103        // Build frame
1104        let mut frame = Vec::with_capacity(BypassHeader::SIZE + data.len());
1105        frame.extend_from_slice(&header_bytes);
1106        frame.extend_from_slice(data);
1107
1108        // Send based on target
1109        let bytes_sent = match target {
1110            BypassTarget::Unicast(addr) => socket.send_to(&frame, addr).await?,
1111            BypassTarget::Multicast { group, port } => {
1112                let mcast_socket = self.get_or_create_multicast(group).await?;
1113                mcast_socket.send_to(&frame, (group, port)).await?
1114            }
1115            BypassTarget::Broadcast { port } => {
1116                let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), port);
1117                socket.send_to(&frame, broadcast_addr).await?
1118            }
1119        };
1120
1121        self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
1122        self.metrics
1123            .bytes_sent
1124            .fetch_add(bytes_sent as u64, Ordering::Relaxed);
1125
1126        Ok(())
1127    }
1128
1129    /// Send with collection config (uses config's transport settings)
1130    pub async fn send_to_collection(
1131        &self,
1132        collection: &str,
1133        target_addr: Option<SocketAddr>,
1134        data: &[u8],
1135    ) -> Result<()> {
1136        let config = self
1137            .config
1138            .get_collection(collection)
1139            .ok_or_else(|| BypassError::Config(format!("Unknown collection: {}", collection)))?;
1140
1141        let target = match &config.transport {
1142            BypassTransport::Unicast => {
1143                let addr = target_addr.ok_or_else(|| {
1144                    BypassError::Config("Unicast requires target address".to_string())
1145                })?;
1146                BypassTarget::Unicast(addr)
1147            }
1148            BypassTransport::Multicast { group, port } => BypassTarget::Multicast {
1149                group: *group,
1150                port: *port,
1151            },
1152            BypassTransport::Broadcast => BypassTarget::Broadcast {
1153                port: self.config.udp.bind_port,
1154            },
1155        };
1156
1157        self.send(target, collection, data).await
1158    }
1159
1160    /// Subscribe to incoming bypass messages
1161    pub fn subscribe(&self) -> broadcast::Receiver<BypassMessage> {
1162        self.incoming_tx.subscribe()
1163    }
1164
1165    /// Subscribe to messages for a specific collection
1166    pub fn subscribe_collection(
1167        &self,
1168        collection: &str,
1169    ) -> (u32, broadcast::Receiver<BypassMessage>) {
1170        let hash = BypassHeader::hash_collection(collection);
1171        (hash, self.incoming_tx.subscribe())
1172    }
1173
1174    /// Get or create a multicast socket for a group
1175    async fn get_or_create_multicast(&self, group: IpAddr) -> Result<Arc<UdpSocket>> {
1176        // Check if already exists
1177        {
1178            let sockets = self.multicast_sockets.read().unwrap();
1179            if let Some(socket) = sockets.get(&group) {
1180                return Ok(socket.clone());
1181            }
1182        }
1183
1184        // Create new multicast socket
1185        let socket = UdpSocket::bind("0.0.0.0:0").await?;
1186
1187        match group {
1188            IpAddr::V4(addr) => {
1189                socket.join_multicast_v4(addr, Ipv4Addr::UNSPECIFIED)?;
1190                socket.set_multicast_ttl_v4(self.config.udp.multicast_ttl)?;
1191            }
1192            IpAddr::V6(addr) => {
1193                socket.join_multicast_v6(&addr, 0)?;
1194            }
1195        }
1196
1197        let socket = Arc::new(socket);
1198        self.multicast_sockets
1199            .write()
1200            .unwrap()
1201            .insert(group, socket.clone());
1202
1203        debug!("Joined multicast group: {}", group);
1204        Ok(socket)
1205    }
1206
1207    /// Leave a multicast group
1208    pub fn leave_multicast(&self, group: IpAddr) -> Result<()> {
1209        if let Some(socket) = self.multicast_sockets.write().unwrap().remove(&group) {
1210            match group {
1211                IpAddr::V4(addr) => {
1212                    // Note: socket drop will leave the group, but explicit leave is cleaner
1213                    if let Ok(socket) = Arc::try_unwrap(socket) {
1214                        let _ = socket.leave_multicast_v4(addr, Ipv4Addr::UNSPECIFIED);
1215                    }
1216                }
1217                IpAddr::V6(addr) => {
1218                    if let Ok(socket) = Arc::try_unwrap(socket) {
1219                        let _ = socket.leave_multicast_v6(&addr, 0);
1220                    }
1221                }
1222            }
1223            debug!("Left multicast group: {}", group);
1224        }
1225        Ok(())
1226    }
1227
1228    /// Get current metrics
1229    pub fn metrics(&self) -> BypassMetricsSnapshot {
1230        self.metrics.snapshot()
1231    }
1232
1233    /// Get configuration
1234    pub fn config(&self) -> &BypassChannelConfig {
1235        &self.config
1236    }
1237
1238    /// Check if a collection is configured for bypass
1239    pub fn is_bypass_collection(&self, name: &str) -> bool {
1240        self.config.is_bypass_collection(name)
1241    }
1242
1243    /// Get collection config by hash
1244    pub fn get_collection_by_hash(&self, hash: u32) -> Option<&BypassCollectionConfig> {
1245        self.collection_map.get(&hash)
1246    }
1247}
1248
1249// =============================================================================
1250// Tests
1251// =============================================================================
1252
1253#[cfg(test)]
1254mod tests {
1255    use super::*;
1256
1257    #[test]
1258    fn test_bypass_header_encode_decode() {
1259        let header = BypassHeader::new("test_collection", Duration::from_millis(1000), 42);
1260        let encoded = header.encode();
1261        let decoded = BypassHeader::decode(&encoded).unwrap();
1262
1263        assert_eq!(decoded.magic, BypassHeader::MAGIC);
1264        assert_eq!(decoded.collection_hash, header.collection_hash);
1265        assert_eq!(decoded.ttl_ms, 1000);
1266        assert_eq!(decoded.sequence, 42);
1267        assert!(decoded.is_valid());
1268    }
1269
1270    #[test]
1271    fn test_bypass_header_invalid_magic() {
1272        let mut data = [0u8; 12];
1273        data[0..4].copy_from_slice(&[0, 0, 0, 0]);
1274        let result = BypassHeader::decode(&data);
1275        assert!(result.is_err());
1276    }
1277
1278    #[test]
1279    fn test_bypass_header_too_short() {
1280        let data = [0u8; 8];
1281        let result = BypassHeader::decode(&data);
1282        assert!(result.is_err());
1283    }
1284
1285    #[test]
1286    fn test_collection_hash_consistency() {
1287        let hash1 = BypassHeader::hash_collection("position_updates");
1288        let hash2 = BypassHeader::hash_collection("position_updates");
1289        let hash3 = BypassHeader::hash_collection("sensor_data");
1290
1291        assert_eq!(hash1, hash2);
1292        assert_ne!(hash1, hash3);
1293    }
1294
1295    #[test]
1296    fn test_bypass_config() {
1297        let config = BypassChannelConfig::new()
1298            .with_collection(BypassCollectionConfig {
1299                collection: "positions".into(),
1300                transport: BypassTransport::Multicast {
1301                    group: "239.1.1.100".parse().unwrap(),
1302                    port: 5150,
1303                },
1304                encoding: MessageEncoding::Protobuf,
1305                ttl_ms: 200,
1306                priority: MessagePriority::High,
1307            })
1308            .with_collection(BypassCollectionConfig {
1309                collection: "telemetry".into(),
1310                transport: BypassTransport::Unicast,
1311                encoding: MessageEncoding::Cbor,
1312                ttl_ms: 5000,
1313                priority: MessagePriority::Normal,
1314            });
1315
1316        assert!(config.is_bypass_collection("positions"));
1317        assert!(config.is_bypass_collection("telemetry"));
1318        assert!(!config.is_bypass_collection("unknown"));
1319
1320        let pos_config = config.get_collection("positions").unwrap();
1321        assert_eq!(pos_config.priority, MessagePriority::High);
1322    }
1323
1324    #[test]
1325    fn test_ttl_clamping() {
1326        // TTL greater than u16::MAX should be clamped
1327        let header = BypassHeader::new("test", Duration::from_secs(1000), 0);
1328        assert_eq!(header.ttl_ms, u16::MAX);
1329    }
1330
1331    #[tokio::test]
1332    async fn test_bypass_channel_creation() {
1333        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1334            collection: "test".into(),
1335            ..Default::default()
1336        });
1337
1338        let channel = UdpBypassChannel::new(config).await.unwrap();
1339        assert!(!channel.is_running());
1340        assert!(channel.is_bypass_collection("test"));
1341    }
1342
1343    #[tokio::test]
1344    async fn test_bypass_channel_start_stop() {
1345        let config = BypassChannelConfig {
1346            udp: UdpConfig {
1347                bind_port: 0, // Ephemeral port
1348                ..Default::default()
1349            },
1350            ..Default::default()
1351        };
1352
1353        let mut channel = UdpBypassChannel::new(config).await.unwrap();
1354
1355        channel.start().await.unwrap();
1356        assert!(channel.is_running());
1357
1358        channel.stop();
1359        assert!(!channel.is_running());
1360    }
1361
1362    #[tokio::test]
1363    async fn test_bypass_send_receive() {
1364        // Create two channels on different ports
1365        let config1 = BypassChannelConfig {
1366            udp: UdpConfig {
1367                bind_port: 0,
1368                ..Default::default()
1369            },
1370            collections: vec![BypassCollectionConfig {
1371                collection: "test".into(),
1372                ttl_ms: 5000,
1373                ..Default::default()
1374            }],
1375            ..Default::default()
1376        };
1377
1378        let config2 = BypassChannelConfig {
1379            udp: UdpConfig {
1380                bind_port: 0,
1381                ..Default::default()
1382            },
1383            collections: vec![BypassCollectionConfig {
1384                collection: "test".into(),
1385                ttl_ms: 5000,
1386                ..Default::default()
1387            }],
1388            ..Default::default()
1389        };
1390
1391        let mut channel1 = UdpBypassChannel::new(config1).await.unwrap();
1392        let mut channel2 = UdpBypassChannel::new(config2).await.unwrap();
1393
1394        channel1.start().await.unwrap();
1395        channel2.start().await.unwrap();
1396
1397        // Get channel2's port and construct localhost address
1398        let socket2_port = channel2
1399            .socket
1400            .as_ref()
1401            .unwrap()
1402            .local_addr()
1403            .unwrap()
1404            .port();
1405        let socket2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket2_port);
1406
1407        // Subscribe to messages on channel2
1408        let mut rx = channel2.subscribe();
1409
1410        // Send from channel1 to channel2
1411        let test_data = b"Hello, bypass!";
1412        channel1
1413            .send(BypassTarget::Unicast(socket2_addr), "test", test_data)
1414            .await
1415            .unwrap();
1416
1417        // Receive on channel2
1418        let msg = tokio::time::timeout(Duration::from_millis(500), rx.recv())
1419            .await
1420            .expect("timeout")
1421            .expect("receive error");
1422
1423        assert_eq!(msg.data, test_data);
1424        assert_eq!(msg.collection_hash, BypassHeader::hash_collection("test"));
1425
1426        // Check metrics
1427        let metrics1 = channel1.metrics();
1428        assert_eq!(metrics1.messages_sent, 1);
1429        assert!(metrics1.bytes_sent > 0);
1430
1431        let metrics2 = channel2.metrics();
1432        assert_eq!(metrics2.messages_received, 1);
1433        assert!(metrics2.bytes_received > 0);
1434
1435        channel1.stop();
1436        channel2.stop();
1437    }
1438
1439    #[test]
1440    fn test_message_too_large() {
1441        // This test doesn't need async since we're just testing the error condition
1442        let _config = BypassChannelConfig {
1443            max_message_size: 100,
1444            ..Default::default()
1445        };
1446
1447        // Create error manually since we can't easily test async in sync context
1448        let err = BypassError::MessageTooLarge {
1449            size: 200,
1450            max: 100,
1451        };
1452        assert!(err.to_string().contains("200"));
1453        assert!(err.to_string().contains("100"));
1454    }
1455
1456    // =========================================================================
1457    // Security Tests (ADR-042 Phase 5)
1458    // =========================================================================
1459
1460    #[test]
1461    fn test_security_config_none() {
1462        let config = BypassSecurityConfig::none();
1463        assert!(!config.require_signature);
1464        assert!(!config.encrypt_payload);
1465        assert!(config.source_allowlist.is_none());
1466        assert!(!config.replay_protection);
1467        assert!(!config.is_enabled());
1468    }
1469
1470    #[test]
1471    fn test_security_config_signed() {
1472        let config = BypassSecurityConfig::signed();
1473        assert!(config.require_signature);
1474        assert!(!config.encrypt_payload);
1475        assert!(config.is_enabled());
1476    }
1477
1478    #[test]
1479    fn test_security_config_full() {
1480        let config = BypassSecurityConfig::full_security();
1481        assert!(config.require_signature);
1482        assert!(config.encrypt_payload);
1483        assert!(config.replay_protection);
1484        assert_eq!(config.replay_window_size, 64);
1485        assert!(config.is_enabled());
1486    }
1487
1488    #[test]
1489    fn test_security_config_allowlist() {
1490        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
1491        let ip2: IpAddr = "192.168.1.2".parse().unwrap();
1492        let ip3: IpAddr = "10.0.0.1".parse().unwrap();
1493
1494        // No allowlist - all allowed
1495        let config = BypassSecurityConfig::none();
1496        assert!(config.is_source_allowed(&ip1));
1497        assert!(config.is_source_allowed(&ip2));
1498        assert!(config.is_source_allowed(&ip3));
1499
1500        // With allowlist - only listed IPs allowed
1501        let config = BypassSecurityConfig {
1502            source_allowlist: Some(vec![ip1, ip2]),
1503            ..Default::default()
1504        };
1505        assert!(config.is_source_allowed(&ip1));
1506        assert!(config.is_source_allowed(&ip2));
1507        assert!(!config.is_source_allowed(&ip3));
1508    }
1509
1510    #[test]
1511    fn test_credentials_signing() {
1512        use ed25519_dalek::SigningKey;
1513
1514        // Use fixed seed for deterministic testing
1515        let seed: [u8; 32] = [1u8; 32];
1516        let signing_key = SigningKey::from_bytes(&seed);
1517        let verifying_key = signing_key.verifying_key();
1518
1519        // Create credentials
1520        let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1521        let creds = BypassSecurityCredentials::new()
1522            .with_signing_key(signing_key)
1523            .with_peer_key(peer_ip.to_string(), verifying_key);
1524
1525        // Sign a message
1526        let message = b"test message for signing";
1527        let signature = creds.sign(message).expect("signing should succeed");
1528
1529        // Verify the signature
1530        creds
1531            .verify_by_ip(&peer_ip, message, &signature)
1532            .expect("verification should succeed");
1533    }
1534
1535    #[test]
1536    fn test_credentials_invalid_signature() {
1537        use ed25519_dalek::SigningKey;
1538
1539        // Use different fixed seeds for two key pairs
1540        let seed1: [u8; 32] = [1u8; 32];
1541        let seed2: [u8; 32] = [2u8; 32];
1542        let signing_key1 = SigningKey::from_bytes(&seed1);
1543        let signing_key2 = SigningKey::from_bytes(&seed2);
1544        let verifying_key2 = signing_key2.verifying_key();
1545
1546        // Create credentials with mismatched keys
1547        let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1548        let creds = BypassSecurityCredentials::new()
1549            .with_signing_key(signing_key1)
1550            .with_peer_key(peer_ip.to_string(), verifying_key2);
1551
1552        // Sign with key1
1553        let message = b"test message";
1554        let signature = creds.sign(message).expect("signing should succeed");
1555
1556        // Verification with key2 should fail
1557        let result = creds.verify_by_ip(&peer_ip, message, &signature);
1558        assert!(matches!(result, Err(BypassError::InvalidSignature)));
1559    }
1560
1561    #[test]
1562    fn test_credentials_encryption() {
1563        let encryption_key = [42u8; 32];
1564        let creds = BypassSecurityCredentials::new().with_encryption_key(encryption_key);
1565
1566        let plaintext = b"secret message for encryption";
1567        let nonce = [0u8; 12];
1568
1569        // Encrypt
1570        let ciphertext = creds
1571            .encrypt(plaintext, &nonce)
1572            .expect("encryption should succeed");
1573        assert_ne!(ciphertext.as_slice(), plaintext);
1574        // Ciphertext should be plaintext + 16 bytes (Poly1305 tag)
1575        assert_eq!(ciphertext.len(), plaintext.len() + 16);
1576
1577        // Decrypt
1578        let decrypted = creds
1579            .decrypt(&ciphertext, &nonce)
1580            .expect("decryption should succeed");
1581        assert_eq!(decrypted, plaintext);
1582    }
1583
1584    #[test]
1585    fn test_credentials_decryption_wrong_key() {
1586        let key1 = [1u8; 32];
1587        let key2 = [2u8; 32];
1588
1589        let creds1 = BypassSecurityCredentials::new().with_encryption_key(key1);
1590        let creds2 = BypassSecurityCredentials::new().with_encryption_key(key2);
1591
1592        let plaintext = b"secret message";
1593        let nonce = [0u8; 12];
1594
1595        // Encrypt with key1
1596        let ciphertext = creds1
1597            .encrypt(plaintext, &nonce)
1598            .expect("encryption should succeed");
1599
1600        // Decrypt with key2 should fail
1601        let result = creds2.decrypt(&ciphertext, &nonce);
1602        assert!(matches!(result, Err(BypassError::DecryptionFailed)));
1603    }
1604
1605    #[test]
1606    fn test_credentials_missing_signing_key() {
1607        let creds = BypassSecurityCredentials::new();
1608        let result = creds.sign(b"message");
1609        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1610    }
1611
1612    #[test]
1613    fn test_credentials_missing_encryption_key() {
1614        let creds = BypassSecurityCredentials::new();
1615        let result = creds.encrypt(b"message", &[0u8; 12]);
1616        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1617    }
1618
1619    #[test]
1620    fn test_replay_tracker_first_message() {
1621        let tracker = ReplayTracker::new(64);
1622        let source: IpAddr = "192.168.1.1".parse().unwrap();
1623
1624        // First message should always succeed
1625        tracker
1626            .check(&source, 0)
1627            .expect("first message should succeed");
1628    }
1629
1630    #[test]
1631    fn test_replay_tracker_sequential() {
1632        let tracker = ReplayTracker::new(64);
1633        let source: IpAddr = "192.168.1.1".parse().unwrap();
1634
1635        // Sequential messages should all succeed
1636        for seq in 0..10u8 {
1637            tracker
1638                .check(&source, seq)
1639                .unwrap_or_else(|_| panic!("sequence {} should succeed", seq));
1640        }
1641    }
1642
1643    #[test]
1644    fn test_replay_tracker_replay_detected() {
1645        let tracker = ReplayTracker::new(64);
1646        let source: IpAddr = "192.168.1.1".parse().unwrap();
1647
1648        // First message succeeds
1649        tracker.check(&source, 5).expect("first should succeed");
1650
1651        // Replay should be detected
1652        let result = tracker.check(&source, 5);
1653        assert!(matches!(
1654            result,
1655            Err(BypassError::ReplayDetected { sequence: 5 })
1656        ));
1657    }
1658
1659    #[test]
1660    fn test_replay_tracker_out_of_order() {
1661        let tracker = ReplayTracker::new(64);
1662        let source: IpAddr = "192.168.1.1".parse().unwrap();
1663
1664        // Messages can arrive out of order within window
1665        tracker.check(&source, 10).expect("10 should succeed");
1666        tracker
1667            .check(&source, 8)
1668            .expect("8 should succeed (within window)");
1669        tracker.check(&source, 12).expect("12 should succeed");
1670        tracker
1671            .check(&source, 9)
1672            .expect("9 should succeed (within window)");
1673
1674        // But not replayed
1675        let result = tracker.check(&source, 8);
1676        assert!(matches!(
1677            result,
1678            Err(BypassError::ReplayDetected { sequence: 8 })
1679        ));
1680    }
1681
1682    #[test]
1683    fn test_replay_tracker_multiple_sources() {
1684        let tracker = ReplayTracker::new(64);
1685        let source1: IpAddr = "192.168.1.1".parse().unwrap();
1686        let source2: IpAddr = "192.168.1.2".parse().unwrap();
1687
1688        // Same sequence from different sources should both succeed
1689        tracker
1690            .check(&source1, 5)
1691            .expect("source1 seq 5 should succeed");
1692        tracker
1693            .check(&source2, 5)
1694            .expect("source2 seq 5 should succeed");
1695
1696        // Replay from same source should fail
1697        let result = tracker.check(&source1, 5);
1698        assert!(matches!(result, Err(BypassError::ReplayDetected { .. })));
1699
1700        // But different sequence should work
1701        tracker
1702            .check(&source1, 6)
1703            .expect("source1 seq 6 should succeed");
1704    }
1705
1706    #[test]
1707    fn test_replay_tracker_window_advance() {
1708        let tracker = ReplayTracker::new(64);
1709        let source: IpAddr = "192.168.1.1".parse().unwrap();
1710
1711        // Start with a sequence in the middle of the range
1712        tracker.check(&source, 100).expect("100 should succeed");
1713        tracker.check(&source, 110).expect("110 should succeed");
1714        tracker.check(&source, 120).expect("120 should succeed");
1715
1716        // Much later sequence should succeed and advance the window
1717        tracker.check(&source, 200).expect("200 should succeed");
1718
1719        // Sequence 100 is now outside the window (more than 64 behind)
1720        // But we already marked it, so this tests window advancement
1721        tracker.check(&source, 201).expect("201 should succeed");
1722    }
1723
1724    #[test]
1725    fn test_replay_tracker_clear() {
1726        let tracker = ReplayTracker::new(64);
1727        let source: IpAddr = "192.168.1.1".parse().unwrap();
1728
1729        tracker.check(&source, 5).expect("initial should succeed");
1730
1731        // Clear and replay should now succeed
1732        tracker.clear_source(&source);
1733        tracker
1734            .check(&source, 5)
1735            .expect("after clear should succeed");
1736    }
1737
1738    #[test]
1739    fn test_metrics_snapshot_includes_security() {
1740        let metrics = BypassMetrics::default();
1741
1742        // Increment security metrics
1743        metrics.signature_rejected.fetch_add(1, Ordering::Relaxed);
1744        metrics.decryption_failed.fetch_add(2, Ordering::Relaxed);
1745        metrics.unauthorized_source.fetch_add(3, Ordering::Relaxed);
1746        metrics.replay_rejected.fetch_add(4, Ordering::Relaxed);
1747
1748        // Snapshot should include them
1749        let snapshot = metrics.snapshot();
1750        assert_eq!(snapshot.signature_rejected, 1);
1751        assert_eq!(snapshot.decryption_failed, 2);
1752        assert_eq!(snapshot.unauthorized_source, 3);
1753        assert_eq!(snapshot.replay_rejected, 4);
1754    }
1755
/// Renders `BypassError` variants through `Display` and checks each message
/// for its expected fragment.
///
/// `MessageTooLarge` is covered as well: it is the error matched by the
/// oversized-send test in this module, so omitting it made the
/// "all variants" name inaccurate.
#[test]
fn test_bypass_error_display_all_variants() {
    // Wrapped I/O errors must show both the category and the inner message.
    let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test io"));
    let io_text = io_err.to_string();
    assert!(io_text.contains("IO error"));
    assert!(io_text.contains("test io"));

    // String-carrying variants: check the category prefix.
    let encode_err = BypassError::Encode("bad data".to_string());
    assert!(encode_err.to_string().contains("Encode error"));

    let decode_err = BypassError::Decode("corrupt".to_string());
    assert!(decode_err.to_string().contains("Decode error"));

    let config_err = BypassError::Config("bad config".to_string());
    assert!(config_err.to_string().contains("Config error"));

    // Unit variants: check a fixed fragment of the message.
    let not_started = BypassError::NotStarted;
    assert!(not_started.to_string().contains("not started"));

    let invalid_header = BypassError::InvalidHeader;
    assert!(invalid_header.to_string().contains("Invalid bypass header"));

    let stale = BypassError::StaleMessage;
    assert!(stale.to_string().contains("stale"));

    let sig_err = BypassError::InvalidSignature;
    assert!(sig_err.to_string().contains("Invalid message signature"));

    let decrypt_err = BypassError::DecryptionFailed;
    assert!(decrypt_err.to_string().contains("decryption failed"));

    // Payload-carrying variants: the payload must be visible in the message.
    let unauth = BypassError::UnauthorizedSource("10.0.0.1".parse().unwrap());
    assert!(unauth.to_string().contains("10.0.0.1"));

    let replay = BypassError::ReplayDetected { sequence: 42 };
    assert!(replay.to_string().contains("42"));

    let missing = BypassError::MissingCredential("encryption key".to_string());
    assert!(missing.to_string().contains("encryption key"));

    // Size-limit error.
    // NOTE(review): the exact wording of this message is not visible from the
    // test module, so only require that something is rendered; tighten this
    // to the real text once confirmed against the `Display` impl.
    let too_large = BypassError::MessageTooLarge { size: 100, max: 10 };
    assert!(!too_large.to_string().is_empty());
}
1795
/// Only the I/O-wrapping variant reports an underlying `source()`; the
/// `Encode` and `NotStarted` variants report none.
#[test]
fn test_bypass_error_source() {
    // Wraps a real `std::io::Error`, so a cause is expected.
    let wrapped_io = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test"));
    let io_cause = std::error::Error::source(&wrapped_io);
    assert!(io_cause.is_some());

    // Carries only a message string.
    let encode_failure = BypassError::Encode(String::from("test"));
    let encode_cause = std::error::Error::source(&encode_failure);
    assert!(encode_cause.is_none());

    // Unit variant with no payload at all.
    let never_started = BypassError::NotStarted;
    let start_cause = std::error::Error::source(&never_started);
    assert!(start_cause.is_none());
}
1807
/// Converting a `std::io::Error` into `BypassError` keeps the original
/// message visible in the rendered text.
#[test]
fn test_bypass_error_from_io() {
    let original = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
    let converted = BypassError::from(original);
    let rendered = converted.to_string();
    assert!(rendered.contains("gone"));
}
1814
/// Each `MessageEncoding` variant renders as its lowercase name.
#[test]
fn test_message_encoding_display() {
    // (variant, expected `Display` text)
    let expectations = [
        (MessageEncoding::Protobuf, "protobuf"),
        (MessageEncoding::Json, "json"),
        (MessageEncoding::Raw, "raw"),
        (MessageEncoding::Cbor, "cbor"),
    ];

    for (encoding, label) in expectations.iter() {
        assert_eq!(encoding.to_string(), *label);
    }
}
1822
/// `ttl()` exposes the `ttl_ms` field as a `Duration` in milliseconds.
#[test]
fn test_bypass_collection_config_ttl() {
    let ttl = BypassCollectionConfig {
        ttl_ms: 200,
        ..BypassCollectionConfig::default()
    }
    .ttl();

    assert_eq!(ttl, Duration::from_millis(200));
}
1831
/// Pins the default collection settings: empty name, unicast transport,
/// protobuf encoding, 5000 ms TTL and normal priority.
#[test]
fn test_bypass_collection_config_default() {
    let defaults = BypassCollectionConfig::default();

    // Identity and routing.
    assert!(defaults.collection.is_empty());
    assert_eq!(defaults.transport, BypassTransport::Unicast);

    // Wire format and delivery parameters.
    assert_eq!(defaults.encoding, MessageEncoding::Protobuf);
    assert_eq!(defaults.ttl_ms, 5000);
    assert_eq!(defaults.priority, MessagePriority::Normal);
}
1841
/// Pins the default UDP settings: port 5150, 65536-byte buffer and a
/// multicast TTL of 32.
#[test]
fn test_udp_config_default() {
    let udp = UdpConfig::default();

    assert_eq!(
        (udp.bind_port, udp.buffer_size, udp.multicast_ttl),
        (5150, 65536, 32)
    );
}
1849
/// `BypassChannelConfig::default()` yields no collections, multicast
/// disabled and a zero maximum message size.
#[test]
fn test_bypass_channel_config_default() {
    let defaults = BypassChannelConfig::default();

    let has_no_collections = defaults.collections.is_empty();
    assert!(has_no_collections);

    let multicast_off = !defaults.multicast_enabled;
    assert!(multicast_off);

    assert_eq!(defaults.max_message_size, 0);
}
1857
/// `BypassChannelConfig::new()` enables multicast and sets a 65000-byte
/// maximum message size.
#[test]
fn test_bypass_channel_config_new() {
    let fresh = BypassChannelConfig::new();

    let multicast_on = fresh.multicast_enabled;
    assert!(multicast_on);
    assert_eq!(fresh.max_message_size, 65000);
}
1864
/// Configuring a source allowlist alone is enough for `is_enabled()` to
/// report `true`.
#[test]
fn test_bypass_security_config_is_enabled_allowlist() {
    // Single-entry allowlist; every other option stays at its default.
    let allowlist = vec!["10.0.0.1".parse().unwrap()];
    let security = BypassSecurityConfig {
        source_allowlist: Some(allowlist),
        ..Default::default()
    };

    let enabled = security.is_enabled();
    assert!(enabled);
}
1873
/// Turning on replay protection alone is enough for `is_enabled()` to
/// report `true`.
#[test]
fn test_bypass_security_config_is_enabled_replay() {
    let enabled = BypassSecurityConfig {
        replay_protection: true,
        ..BypassSecurityConfig::default()
    }
    .is_enabled();

    assert!(enabled);
}
1882
/// The `Debug` rendering of empty credentials contains the summary field
/// names and a `false` flag.
#[test]
fn test_bypass_security_credentials_debug() {
    let empty = BypassSecurityCredentials::new();
    let rendered = format!("{:?}", empty);

    // Fragments that must all be present in the debug text.
    let expected_fragments = ["has_signing_key", "false", "peer_keys_count"];
    for fragment in expected_fragments.iter() {
        assert!(rendered.contains(*fragment));
    }
}
1891
/// `verifying_key()` is `None` without a signing key and otherwise equals
/// the verifying key of the installed signing key.
#[test]
fn test_bypass_security_credentials_verifying_key() {
    // No signing key installed: nothing to derive.
    let bare = BypassSecurityCredentials::new();
    assert!(bare.verifying_key().is_none());

    // Build a signing key from a fixed seed and record its public half
    // before handing the key over to the credentials.
    let signing_key = ed25519_dalek::SigningKey::from_bytes(&[42u8; 32]);
    let expected = signing_key.verifying_key();
    let keyed = BypassSecurityCredentials::new().with_signing_key(signing_key);

    let derived = keyed.verifying_key();
    assert!(derived.is_some());
    assert_eq!(derived.unwrap(), expected);
}
1906
/// Verifying a signature for a peer with no registered key fails with
/// `MissingCredential`.
#[test]
fn test_bypass_security_credentials_verify_unknown_peer() {
    // All-zero signature bytes; the lookup is expected to fail regardless.
    let zero_signature = ed25519_dalek::Signature::from_bytes(&[0u8; 64]);
    let empty = BypassSecurityCredentials::new();

    let outcome = empty.verify("unknown-peer", b"message", &zero_signature);
    assert!(matches!(outcome, Err(BypassError::MissingCredential(_))));
}
1914
/// Decrypting with credentials that were created empty fails with
/// `MissingCredential`.
#[test]
fn test_bypass_security_credentials_decrypt_no_key() {
    let empty = BypassSecurityCredentials::new();
    let nonce = [0u8; 12];

    let outcome = empty.decrypt(b"ciphertext", &nonce);
    assert!(matches!(outcome, Err(BypassError::MissingCredential(_))));
}
1921
/// After `clear_all()`, sequence numbers already accepted from several
/// sources are accepted again.
#[test]
fn test_replay_tracker_clear_all() {
    let tracker = ReplayTracker::new(64);
    let sources: [IpAddr; 2] = [
        "192.168.1.1".parse().unwrap(),
        "192.168.1.2".parse().unwrap(),
    ];

    // Record sequence 1 for every source.
    for source in sources.iter() {
        tracker.check(source, 1).unwrap();
    }

    tracker.clear_all();

    // After clear_all, same sequences should succeed
    for source in sources.iter() {
        tracker.check(source, 1).unwrap();
    }
}
1937
/// A header with a 100 ms TTL is fresh when both instants coincide and
/// stale when the second instant lies one second in the past.
#[test]
fn test_bypass_header_is_stale() {
    let ttl = Duration::from_millis(100);
    let header = BypassHeader::new("test", ttl, 0);
    let now = Instant::now();

    // Same instant for both arguments: nothing has elapsed, so not stale.
    let fresh = !header.is_stale(now, now);
    assert!(fresh);

    // One second earlier is well beyond the 100 ms TTL.
    let one_second_ago = now - Duration::from_secs(1);
    let expired = header.is_stale(now, one_second_ago);
    assert!(expired);
}
1950
/// A default `BypassMetricsSnapshot` has every traffic and error counter
/// checked here at zero.
#[test]
fn test_bypass_metrics_snapshot_default() {
    let snap = BypassMetricsSnapshot::default();

    // Traffic counters.
    assert_eq!(
        (
            snap.messages_sent,
            snap.messages_received,
            snap.bytes_sent,
            snap.bytes_received,
        ),
        (0, 0, 0, 0)
    );

    // Drop and error counters.
    assert_eq!(
        (
            snap.stale_dropped,
            snap.invalid_dropped,
            snap.send_errors,
            snap.receive_errors,
        ),
        (0, 0, 0, 0)
    );
}
1963
1964    #[tokio::test]
1965    async fn test_bypass_channel_subscribe_collection() {
1966        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1967            collection: "positions".into(),
1968            ..Default::default()
1969        });
1970
1971        let channel = UdpBypassChannel::new(config).await.unwrap();
1972        let (hash, _rx) = channel.subscribe_collection("positions");
1973        assert_eq!(hash, BypassHeader::hash_collection("positions"));
1974    }
1975
1976    #[tokio::test]
1977    async fn test_bypass_channel_get_collection_by_hash() {
1978        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1979            collection: "telemetry".into(),
1980            ttl_ms: 200,
1981            ..Default::default()
1982        });
1983
1984        let channel = UdpBypassChannel::new(config).await.unwrap();
1985        let hash = BypassHeader::hash_collection("telemetry");
1986        let col_config = channel.get_collection_by_hash(hash);
1987        assert!(col_config.is_some());
1988        assert_eq!(col_config.unwrap().ttl_ms, 200);
1989
1990        // Unknown hash
1991        assert!(channel.get_collection_by_hash(12345).is_none());
1992    }
1993
1994    #[tokio::test]
1995    async fn test_bypass_channel_config_accessor() {
1996        let config = BypassChannelConfig {
1997            max_message_size: 1024,
1998            ..BypassChannelConfig::new()
1999        };
2000        let channel = UdpBypassChannel::new(config).await.unwrap();
2001        assert_eq!(channel.config().max_message_size, 1024);
2002    }
2003
2004    #[tokio::test]
2005    async fn test_bypass_send_not_started() {
2006        let config = BypassChannelConfig::new();
2007        let channel = UdpBypassChannel::new(config).await.unwrap();
2008
2009        let result = channel
2010            .send(
2011                BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2012                "test",
2013                b"data",
2014            )
2015            .await;
2016        assert!(matches!(result, Err(BypassError::NotStarted)));
2017    }
2018
2019    #[tokio::test]
2020    async fn test_bypass_send_message_too_large() {
2021        let config = BypassChannelConfig {
2022            max_message_size: 10,
2023            udp: UdpConfig {
2024                bind_port: 0,
2025                ..Default::default()
2026            },
2027            ..BypassChannelConfig::new()
2028        };
2029        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2030        channel.start().await.unwrap();
2031
2032        let big_data = vec![0u8; 100];
2033        let result = channel
2034            .send(
2035                BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2036                "test",
2037                &big_data,
2038            )
2039            .await;
2040        assert!(matches!(
2041            result,
2042            Err(BypassError::MessageTooLarge { size: 100, max: 10 })
2043        ));
2044
2045        channel.stop();
2046    }
2047
2048    #[tokio::test]
2049    async fn test_bypass_send_to_collection_unknown() {
2050        let config = BypassChannelConfig {
2051            udp: UdpConfig {
2052                bind_port: 0,
2053                ..Default::default()
2054            },
2055            ..BypassChannelConfig::new()
2056        };
2057        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2058        channel.start().await.unwrap();
2059
2060        let result = channel.send_to_collection("unknown", None, b"data").await;
2061        assert!(matches!(result, Err(BypassError::Config(_))));
2062
2063        channel.stop();
2064    }
2065
2066    #[tokio::test]
2067    async fn test_bypass_send_to_collection_unicast_no_addr() {
2068        let config = BypassChannelConfig {
2069            udp: UdpConfig {
2070                bind_port: 0,
2071                ..Default::default()
2072            },
2073            collections: vec![BypassCollectionConfig {
2074                collection: "test".into(),
2075                transport: BypassTransport::Unicast,
2076                ..Default::default()
2077            }],
2078            ..BypassChannelConfig::new()
2079        };
2080        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2081        channel.start().await.unwrap();
2082
2083        // Unicast without target address should error
2084        let result = channel.send_to_collection("test", None, b"data").await;
2085        assert!(matches!(result, Err(BypassError::Config(_))));
2086
2087        channel.stop();
2088    }
2089
2090    #[tokio::test]
2091    async fn test_bypass_send_to_collection_broadcast() {
2092        let config = BypassChannelConfig {
2093            udp: UdpConfig {
2094                bind_port: 0,
2095                ..Default::default()
2096            },
2097            collections: vec![BypassCollectionConfig {
2098                collection: "bcast".into(),
2099                transport: BypassTransport::Broadcast,
2100                ..Default::default()
2101            }],
2102            ..BypassChannelConfig::new()
2103        };
2104        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2105        channel.start().await.unwrap();
2106
2107        // Broadcast should work without target address
2108        // Note: actual broadcast send may fail on some systems, but the path is exercised
2109        let _result = channel.send_to_collection("bcast", None, b"data").await;
2110
2111        channel.stop();
2112    }
2113
2114    #[tokio::test]
2115    async fn test_bypass_leave_multicast_no_socket() {
2116        let config = BypassChannelConfig::new();
2117        let channel = UdpBypassChannel::new(config).await.unwrap();
2118
2119        // Leaving a group we never joined should be ok
2120        let result = channel.leave_multicast("239.1.1.1".parse().unwrap());
2121        assert!(result.is_ok());
2122    }
2123
/// `BypassTransport` values survive a JSON serialize/deserialize round
/// trip unchanged, for both a unit variant and a struct variant.
#[test]
fn test_bypass_transport_serde() {
    // Serialize to JSON text and parse it straight back.
    let round_trip = |value: &BypassTransport| -> BypassTransport {
        let json = serde_json::to_string(value).unwrap();
        serde_json::from_str(&json).unwrap()
    };

    let unicast = BypassTransport::Unicast;
    assert_eq!(round_trip(&unicast), BypassTransport::Unicast);

    let multicast = BypassTransport::Multicast {
        group: "239.1.1.100".parse().unwrap(),
        port: 5150,
    };
    assert_eq!(round_trip(&multicast), multicast);
}
2139
/// Every `MessageEncoding` variant survives a JSON round trip unchanged.
#[test]
fn test_message_encoding_serde() {
    let all_encodings = [
        MessageEncoding::Protobuf,
        MessageEncoding::Json,
        MessageEncoding::Raw,
        MessageEncoding::Cbor,
    ];

    all_encodings.iter().for_each(|original| {
        let json = serde_json::to_string(original).unwrap();
        let restored: MessageEncoding = serde_json::from_str(&json).unwrap();
        assert_eq!(&restored, original);
    });
}
2153
/// The signed and encrypted flag bits start cleared, can be set
/// independently, and are preserved by an encode/decode round trip.
#[test]
fn test_bypass_header_flags() {
    let signed_bit = BypassHeader::FLAG_SIGNED;
    let encrypted_bit = BypassHeader::FLAG_ENCRYPTED;
    let mut header = BypassHeader::new("test", Duration::from_millis(1000), 0);

    // No flags by default
    assert_eq!(header.flags, 0);
    assert_eq!(header.flags & signed_bit, 0);
    assert_eq!(header.flags & encrypted_bit, 0);

    // Setting the signed bit must leave the encrypted bit untouched.
    header.flags |= signed_bit;
    assert_ne!(header.flags & signed_bit, 0);
    assert_eq!(header.flags & encrypted_bit, 0);

    // Adding the encrypted bit must keep the signed bit set.
    header.flags |= encrypted_bit;
    assert_ne!(header.flags & signed_bit, 0);
    assert_ne!(header.flags & encrypted_bit, 0);

    // Encode/decode preserves flags
    let wire = header.encode();
    let round_tripped = BypassHeader::decode(&wire).unwrap();
    assert_eq!(round_tripped.flags, header.flags);
}
2178}