Skip to main content

peat_mesh/transport/
bypass.rs

1//! UDP Bypass Channel for ephemeral data
2//!
3//! Provides a direct UDP pathway that bypasses the CRDT sync engine for
4//! high-frequency, low-latency, or bandwidth-constrained scenarios.
5//!
6//! ## Use Cases
7//!
8//! - High-frequency telemetry (10-100 Hz position updates)
9//! - Low-latency commands (<50ms delivery)
10//! - Bandwidth-constrained links (9.6kbps tactical radio)
11//! - Multicast/broadcast to cell members
12//!
13//! ## Architecture
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────────┐
17//! │                   Data Flow with Bypass                          │
18//! │                                                                  │
19//! │   Application                                                    │
20//! │       │                                                          │
21//! │       ├──────────────────────────┐                               │
22//! │       ▼                          ▼                               │
23//! │   ┌─────────────┐          ┌─────────────┐                       │
24//! │   │ CRDT Store  │          │  UDP Bypass │ ◄── Ephemeral data    │
25//! │   │ (Automerge) │          │   Channel   │                       │
26//! │   └──────┬──────┘          └──────┬──────┘                       │
27//! │          │                        │                              │
28//! │          ▼                        ▼                              │
29//! │   ┌─────────────┐          ┌─────────────┐                       │
30//! │   │   Iroh      │          │    Raw      │                       │
31//! │   │  Transport  │          │    UDP      │                       │
32//! │   └─────────────┘          └─────────────┘                       │
33//! │                                                                  │
34//! └─────────────────────────────────────────────────────────────────┘
35//! ```
36//!
37//! ## Example
38//!
39//! ```ignore
//! use peat_mesh::transport::bypass::{BypassChannelConfig, BypassTarget, UdpBypassChannel};
41//!
42//! // Create bypass channel
//! let config = BypassChannelConfig::new();
44//! let channel = UdpBypassChannel::new(config).await?;
45//!
46//! // Send position update via bypass (no CRDT overhead)
47//! channel.send(
48//!     BypassTarget::Multicast { group: "239.1.1.100".parse()?, port: 5150 },
49//!     "position_updates",
50//!     &position_bytes,
51//! ).await?;
52//!
53//! // Subscribe to incoming bypass messages
54//! let mut rx = channel.subscribe("position_updates");
55//! while let Some(msg) = rx.recv().await {
56//!     println!("Received from {}: {:?}", msg.source, msg.data);
57//! }
58//! ```
59
60use std::collections::HashMap;
61use std::hash::{Hash, Hasher};
62use std::net::{IpAddr, Ipv4Addr, SocketAddr};
63use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
64use std::sync::{Arc, RwLock};
65use std::time::{Duration, Instant};
66
67use chacha20poly1305::{
68    aead::{Aead, KeyInit},
69    ChaCha20Poly1305, Nonce,
70};
71use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
72use serde::{Deserialize, Serialize};
73use tokio::net::UdpSocket;
74use tokio::sync::broadcast;
75use tracing::{debug, info};
76
77use super::MessagePriority;
78
79// =============================================================================
80// Error Types
81// =============================================================================
82
/// Everything that can go wrong while using the bypass channel.
///
/// Only [`BypassError::Io`] wraps an underlying error; every other variant is
/// self-describing and carries at most a short detail payload.
#[derive(Debug)]
pub enum BypassError {
    /// A socket-level operation failed
    Io(std::io::Error),
    /// A payload could not be encoded
    Encode(String),
    /// A payload could not be decoded
    Decode(String),
    /// The supplied configuration is not usable
    Config(String),
    /// The channel has not been started yet
    NotStarted,
    /// The message exceeds the configured maximum (`size` > `max`)
    MessageTooLarge { size: usize, max: usize },
    /// The bypass header is truncated or carries the wrong magic
    InvalidHeader,
    /// The message outlived its TTL
    StaleMessage,
    /// The signature did not verify
    InvalidSignature,
    /// The payload could not be decrypted
    DecryptionFailed,
    /// The sender's IP is not on the allowlist
    UnauthorizedSource(IpAddr),
    /// A sequence number was seen twice (or is too old to be trusted)
    ReplayDetected { sequence: u8 },
    /// A required key has not been configured
    MissingCredential(String),
}

impl std::fmt::Display for BypassError {
    /// Renders a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without a payload are plain static text.
            Self::NotStarted => f.write_str("Bypass channel not started"),
            Self::InvalidHeader => f.write_str("Invalid bypass header"),
            Self::StaleMessage => f.write_str("Message is stale (past TTL)"),
            Self::InvalidSignature => f.write_str("Invalid message signature"),
            Self::DecryptionFailed => f.write_str("Message decryption failed"),

            // Variants with a payload interpolate it into the message.
            Self::Io(cause) => write!(f, "IO error: {}", cause),
            Self::Encode(detail) => write!(f, "Encode error: {}", detail),
            Self::Decode(detail) => write!(f, "Decode error: {}", detail),
            Self::Config(detail) => write!(f, "Config error: {}", detail),
            Self::MessageTooLarge { size, max } => {
                write!(f, "Message too large: {} bytes (max {})", size, max)
            }
            Self::UnauthorizedSource(addr) => {
                write!(f, "Unauthorized source IP: {}", addr)
            }
            Self::ReplayDetected { sequence } => {
                write!(f, "Replay attack detected (sequence {})", sequence)
            }
            Self::MissingCredential(which) => {
                write!(f, "Missing security credential: {}", which)
            }
        }
    }
}

impl std::error::Error for BypassError {
    /// Exposes the wrapped I/O error, if this is an [`BypassError::Io`].
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(cause) = self {
            Some(cause)
        } else {
            None
        }
    }
}

impl From<std::io::Error> for BypassError {
    /// Lets `?` lift socket errors into [`BypassError::Io`].
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}

/// Result alias used throughout the bypass module.
pub type Result<T> = std::result::Result<T, BypassError>;
158
159// =============================================================================
160// Configuration Types
161// =============================================================================
162
/// Transport mode for bypass messages
///
/// Serialized as an internally tagged value: the variant name is stored in a
/// `type` field using snake_case (`unicast`, `multicast`, `broadcast`).
/// `Unicast` is the default variant.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BypassTransport {
    /// UDP unicast to specific peer
    #[default]
    Unicast,
    /// UDP multicast to group
    Multicast {
        /// Multicast group address
        group: IpAddr,
        /// Port number
        port: u16,
    },
    /// UDP broadcast on subnet
    Broadcast,
}
180
/// Message encoding format
///
/// Names the payload format configured for a collection. Variants serialize
/// under their snake_case names; `Protobuf` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageEncoding {
    /// Protobuf (recommended - compact)
    #[default]
    Protobuf,
    /// JSON (debugging)
    Json,
    /// Raw bytes (minimal overhead)
    Raw,
    /// CBOR (compact binary)
    Cbor,
}
195
196impl std::fmt::Display for MessageEncoding {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        match self {
199            MessageEncoding::Protobuf => write!(f, "protobuf"),
200            MessageEncoding::Json => write!(f, "json"),
201            MessageEncoding::Raw => write!(f, "raw"),
202            MessageEncoding::Cbor => write!(f, "cbor"),
203        }
204    }
205}
206
/// Configuration for a bypass collection
///
/// Describes how one named collection is carried over the bypass channel.
/// When deserializing, `ttl_ms` falls back to `default_ttl_ms()` and
/// `priority` falls back to `MessagePriority::default()` if omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BypassCollectionConfig {
    /// Collection name
    pub collection: String,
    /// Transport mode for this collection
    pub transport: BypassTransport,
    /// Message encoding format
    pub encoding: MessageEncoding,
    /// Time-to-live for messages in milliseconds
    // NOTE(review): `BypassHeader::new` clamps TTLs to `u16::MAX` ms; presumably
    // values above ~65 s configured here are truncated on the wire — confirm.
    #[serde(default = "default_ttl_ms")]
    pub ttl_ms: u64,
    /// QoS priority for bandwidth allocation
    #[serde(default)]
    pub priority: MessagePriority,
}
223
/// Serde fallback for `BypassCollectionConfig::ttl_ms`: five seconds, in milliseconds.
fn default_ttl_ms() -> u64 {
    const FALLBACK_TTL_MS: u64 = 5_000;
    FALLBACK_TTL_MS
}
227
228impl BypassCollectionConfig {
229    /// Get TTL as Duration
230    pub fn ttl(&self) -> Duration {
231        Duration::from_millis(self.ttl_ms)
232    }
233}
234
235impl Default for BypassCollectionConfig {
236    fn default() -> Self {
237        Self {
238            collection: String::new(),
239            transport: BypassTransport::Unicast,
240            encoding: MessageEncoding::Protobuf,
241            ttl_ms: 5000,
242            priority: MessagePriority::Normal,
243        }
244    }
245}
246
/// UDP configuration
///
/// Socket-level settings for the bypass channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdpConfig {
    /// Bind port (0 = ephemeral)
    pub bind_port: u16,
    /// Buffer size for receiving
    // NOTE(review): presumably in bytes — confirm against the receive loop.
    pub buffer_size: usize,
    /// Multicast TTL (hop count)
    pub multicast_ttl: u32,
}
257
258impl Default for UdpConfig {
259    fn default() -> Self {
260        Self {
261            bind_port: 5150,
262            buffer_size: 65536,
263            multicast_ttl: 32,
264        }
265    }
266}
267
268/// Configuration for bypass channel
269#[derive(Debug, Clone, Default, Serialize, Deserialize)]
270pub struct BypassChannelConfig {
271    /// UDP configuration
272    pub udp: UdpConfig,
273    /// Collections that use bypass
274    pub collections: Vec<BypassCollectionConfig>,
275    /// Enable multicast support
276    pub multicast_enabled: bool,
277    /// Maximum message size
278    pub max_message_size: usize,
279}
280
281impl BypassChannelConfig {
282    /// Create new configuration with defaults
283    pub fn new() -> Self {
284        Self {
285            udp: UdpConfig::default(),
286            collections: Vec::new(),
287            multicast_enabled: true,
288            max_message_size: 65000, // Leave room for header
289        }
290    }
291
292    /// Add a collection to bypass
293    pub fn with_collection(mut self, config: BypassCollectionConfig) -> Self {
294        self.collections.push(config);
295        self
296    }
297
298    /// Get configuration for a collection
299    pub fn get_collection(&self, name: &str) -> Option<&BypassCollectionConfig> {
300        self.collections.iter().find(|c| c.collection == name)
301    }
302
303    /// Check if a collection uses bypass
304    pub fn is_bypass_collection(&self, name: &str) -> bool {
305        self.collections.iter().any(|c| c.collection == name)
306    }
307}
308
309// =============================================================================
310// Security Configuration (ADR-042 Phase 5)
311// =============================================================================
312
/// Security configuration for bypass channel
///
/// Since bypass messages don't go through Iroh's authenticated QUIC transport,
/// optional security features can be enabled to protect against:
/// - Message forgery (signing)
/// - Eavesdropping (encryption)
/// - Unauthorized sources (allowlisting)
/// - Replay attacks (sequence window)
///
/// ## Security Tradeoffs
///
/// | Feature | Overhead | Latency Impact | Protection |
/// |---------|----------|----------------|------------|
/// | Signing | +64 bytes | +0.1-0.5ms | Authenticity |
/// | Encryption | +16 bytes | +0.1-0.3ms | Confidentiality |
/// | Allowlist | 0 bytes | ~0ms | Source filtering |
/// | Replay | 0 bytes | ~0ms | Freshness |
///
/// ## When to Use
///
/// - **High-frequency telemetry**: Signing only (authenticity without encryption overhead)
/// - **Sensitive commands**: Full encryption + signing
/// - **Trusted network**: Allowlist only (minimal overhead)
/// - **Untrusted network**: All features enabled
///
/// ## Defaults
///
/// `Default::default()` disables every feature and sets
/// `replay_window_size` to 64, so enabling `replay_protection` on top of
/// `..Default::default()` yields a usable window.
///
/// ## Example
///
/// ```rust
/// use peat_mesh::transport::bypass::BypassSecurityConfig;
///
/// // Minimal security for high-frequency telemetry
/// let config = BypassSecurityConfig {
///     require_signature: true,
///     encrypt_payload: false,
///     ..Default::default()
/// };
///
/// // Full security for sensitive commands
/// let config = BypassSecurityConfig::full_security();
/// ```
#[derive(Debug, Clone)]
pub struct BypassSecurityConfig {
    /// Require Ed25519 signature on all messages
    ///
    /// When enabled:
    /// - Outgoing messages are signed with the node's signing key
    /// - Incoming messages without valid signatures are rejected
    /// - Adds ~64 bytes overhead per message
    pub require_signature: bool,

    /// Encrypt payload with formation key (ChaCha20-Poly1305)
    ///
    /// When enabled:
    /// - Payload is encrypted before sending
    /// - Header remains unencrypted for routing
    /// - Adds ~16 bytes overhead (Poly1305 auth tag)
    pub encrypt_payload: bool,

    /// Filter sources by known peer addresses
    ///
    /// When Some:
    /// - Only accept messages from listed IP addresses
    /// - Messages from unknown IPs are rejected
    /// - Useful for trusted network segments
    pub source_allowlist: Option<Vec<IpAddr>>,

    /// Enable replay protection
    ///
    /// When enabled:
    /// - Track seen sequence numbers per source
    /// - Reject duplicate sequence numbers within window
    /// - Window size: 64 sequences (covers ~1 minute at 1 Hz)
    pub replay_protection: bool,

    /// Replay window size (default: 64)
    ///
    /// Number of sequence numbers to track per source.
    /// Larger windows use more memory but handle more reordering.
    pub replay_window_size: usize,
}

impl Default for BypassSecurityConfig {
    /// All features off, with the documented replay window of 64.
    ///
    /// Implemented by hand: a derived `Default` would set
    /// `replay_window_size` to 0, contradicting the field documentation.
    fn default() -> Self {
        Self {
            require_signature: false,
            encrypt_payload: false,
            source_allowlist: None,
            replay_protection: false,
            replay_window_size: 64,
        }
    }
}
393
394impl BypassSecurityConfig {
395    /// Create configuration with no security (fastest)
396    pub fn none() -> Self {
397        Self::default()
398    }
399
400    /// Create configuration with signature only (authenticity)
401    pub fn signed() -> Self {
402        Self {
403            require_signature: true,
404            ..Default::default()
405        }
406    }
407
408    /// Create configuration with full security (all features)
409    pub fn full_security() -> Self {
410        Self {
411            require_signature: true,
412            encrypt_payload: true,
413            source_allowlist: None, // Set separately based on known peers
414            replay_protection: true,
415            replay_window_size: 64,
416        }
417    }
418
419    /// Check if any security features are enabled
420    pub fn is_enabled(&self) -> bool {
421        self.require_signature
422            || self.encrypt_payload
423            || self.source_allowlist.is_some()
424            || self.replay_protection
425    }
426
427    /// Check if source IP is allowed
428    pub fn is_source_allowed(&self, ip: &IpAddr) -> bool {
429        match &self.source_allowlist {
430            Some(list) => list.contains(ip),
431            None => true, // No allowlist = all sources allowed
432        }
433    }
434}
435
/// Security credentials for bypass channel
///
/// Contains the cryptographic keys needed for signing and encryption.
/// All fields are private and optional/empty by default; they are populated
/// through the `with_*` builder methods.
#[derive(Clone, Default)]
pub struct BypassSecurityCredentials {
    /// Ed25519 signing key (private)
    signing_key: Option<SigningKey>,

    /// Ed25519 verifying keys for known peers (public)
    /// Maps peer ID or IP to their public key
    // `verify_by_ip` looks peers up by the textual form of their IP address.
    peer_keys: HashMap<String, VerifyingKey>,

    /// ChaCha20-Poly1305 encryption key (shared formation key)
    /// 256-bit key shared among formation members
    encryption_key: Option<[u8; 32]>,
}
452
453impl std::fmt::Debug for BypassSecurityCredentials {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        f.debug_struct("BypassSecurityCredentials")
456            .field("has_signing_key", &self.signing_key.is_some())
457            .field("peer_keys_count", &self.peer_keys.len())
458            .field("has_encryption_key", &self.encryption_key.is_some())
459            .finish()
460    }
461}
462
463impl BypassSecurityCredentials {
464    /// Create new empty credentials
465    pub fn new() -> Self {
466        Self::default()
467    }
468
469    /// Set the signing key
470    pub fn with_signing_key(mut self, key: SigningKey) -> Self {
471        self.signing_key = Some(key);
472        self
473    }
474
475    /// Add a peer's verifying key
476    pub fn with_peer_key(mut self, peer_id: impl Into<String>, key: VerifyingKey) -> Self {
477        self.peer_keys.insert(peer_id.into(), key);
478        self
479    }
480
481    /// Set the encryption key
482    pub fn with_encryption_key(mut self, key: [u8; 32]) -> Self {
483        self.encryption_key = Some(key);
484        self
485    }
486
487    /// Get the verifying key for our signing key
488    pub fn verifying_key(&self) -> Option<VerifyingKey> {
489        self.signing_key.as_ref().map(|k| k.verifying_key())
490    }
491
492    /// Sign a message
493    pub fn sign(&self, message: &[u8]) -> Result<Signature> {
494        let key = self
495            .signing_key
496            .as_ref()
497            .ok_or_else(|| BypassError::MissingCredential("signing key".into()))?;
498        Ok(key.sign(message))
499    }
500
501    /// Verify a signature from a known peer
502    pub fn verify(&self, peer_id: &str, message: &[u8], signature: &Signature) -> Result<()> {
503        let key = self
504            .peer_keys
505            .get(peer_id)
506            .ok_or_else(|| BypassError::MissingCredential(format!("peer key for {}", peer_id)))?;
507        key.verify(message, signature)
508            .map_err(|_| BypassError::InvalidSignature)
509    }
510
511    /// Verify a signature using IP address as peer identifier
512    pub fn verify_by_ip(&self, ip: &IpAddr, message: &[u8], signature: &Signature) -> Result<()> {
513        self.verify(&ip.to_string(), message, signature)
514    }
515
516    /// Encrypt payload
517    pub fn encrypt(&self, plaintext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
518        let key = self
519            .encryption_key
520            .as_ref()
521            .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
522
523        let cipher = ChaCha20Poly1305::new_from_slice(key)
524            .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
525
526        let nonce = Nonce::from_slice(nonce);
527        cipher
528            .encrypt(nonce, plaintext)
529            .map_err(|_| BypassError::Encode("Encryption failed".into()))
530    }
531
532    /// Decrypt payload
533    pub fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12]) -> Result<Vec<u8>> {
534        let key = self
535            .encryption_key
536            .as_ref()
537            .ok_or_else(|| BypassError::MissingCredential("encryption key".into()))?;
538
539        let cipher = ChaCha20Poly1305::new_from_slice(key)
540            .map_err(|_| BypassError::Config("Invalid encryption key".into()))?;
541
542        let nonce = Nonce::from_slice(nonce);
543        cipher
544            .decrypt(nonce, ciphertext)
545            .map_err(|_| BypassError::DecryptionFailed)
546    }
547}
548
/// Replay protection tracker
///
/// Tracks seen sequence numbers per source to detect replay attacks.
/// Uses a sliding window to handle out-of-order delivery.
///
/// Note: the derived `Default` produces a tracker with `window_size == 0`;
/// use `ReplayTracker::new` to pick a real window size.
#[derive(Debug, Default)]
pub struct ReplayTracker {
    /// Seen sequences per source IP
    /// Each entry is a bitset representing seen sequences
    // Guarded by a lock so the tracker can be used through `&self`.
    windows: RwLock<HashMap<IpAddr, ReplayWindow>>,

    /// Window size
    // `ReplayTracker::new` caps this at 64, the width of the per-source bitmap.
    window_size: usize,
}
562
/// Sliding window for replay detection
///
/// Sequence numbers are 8-bit wrapping counters. The window remembers the
/// highest sequence accepted so far plus a 64-bit bitmap in which bit `k`
/// means "sequence `highest_seq - k` has been accepted".
#[derive(Debug)]
struct ReplayWindow {
    /// Highest sequence seen
    highest_seq: u8,
    /// Bitmap of seen sequences (relative to highest)
    ///
    /// Zero means no packet has been accepted yet: every accepted packet sets
    /// at least one bit, so the bitmap never returns to zero afterwards.
    seen: u64,
}

impl ReplayWindow {
    /// Create an empty window (no packet accepted yet).
    fn new() -> Self {
        Self {
            highest_seq: 0,
            seen: 0,
        }
    }

    /// Check if sequence is valid (not a replay) and record it if so.
    ///
    /// `window_size` is the number of sequence numbers behind the highest one
    /// that may still be accepted; it is capped at 64 (the bitmap width).
    ///
    /// Returns `true` if `seq` is accepted, `false` if it is a duplicate or
    /// too far behind the highest accepted sequence.
    fn check_and_update(&mut self, seq: u8, window_size: usize) -> bool {
        let window_size = window_size.min(64) as u8;

        // First packet from this source: anchor the window on whatever
        // sequence it carries. Comparing against the initial `highest_seq`
        // of 0 would reject or mis-track sources first seen mid-stream.
        if self.seen == 0 {
            self.highest_seq = seq;
            self.seen = 1;
            return true;
        }

        if seq == self.highest_seq {
            // Exact replay of highest
            return false;
        }

        // Forward distance in wrapping arithmetic; less than half the
        // sequence space counts as "newer than highest".
        let ahead = seq.wrapping_sub(self.highest_seq);
        if ahead < 128 {
            // New highest sequence: slide the bitmap and mark bit 0.
            // A jump of 64 or more pushes every old bit out of the bitmap.
            let shift = u32::from(ahead);
            self.seen = if shift < 64 {
                (self.seen << shift) | 1
            } else {
                1
            };
            self.highest_seq = seq;
            return true;
        }

        // Otherwise the packet is behind the highest sequence.
        let behind = self.highest_seq.wrapping_sub(seq);
        if behind < window_size {
            let bit = 1u64 << behind;
            if self.seen & bit != 0 {
                // Already seen
                return false;
            }
            self.seen |= bit;
            return true;
        }

        // Too old
        false
    }
}
629
630impl ReplayTracker {
631    /// Create new replay tracker
632    pub fn new(window_size: usize) -> Self {
633        Self {
634            windows: RwLock::new(HashMap::new()),
635            window_size: window_size.min(64),
636        }
637    }
638
639    /// Check if message is valid (not a replay)
640    /// Returns Ok(()) if valid, Err if replay detected
641    pub fn check(&self, source: &IpAddr, sequence: u8) -> Result<()> {
642        let mut windows = self.windows.write().unwrap_or_else(|e| e.into_inner());
643        let window = windows.entry(*source).or_insert_with(ReplayWindow::new);
644
645        if window.check_and_update(sequence, self.window_size) {
646            Ok(())
647        } else {
648            Err(BypassError::ReplayDetected { sequence })
649        }
650    }
651
652    /// Clear tracking for a source
653    pub fn clear_source(&self, source: &IpAddr) {
654        self.windows
655            .write()
656            .unwrap_or_else(|e| e.into_inner())
657            .remove(source);
658    }
659
660    /// Clear all tracking
661    pub fn clear_all(&self) {
662        self.windows
663            .write()
664            .unwrap_or_else(|e| e.into_inner())
665            .clear();
666    }
667}
668
669// =============================================================================
670// Bypass Header (12 bytes)
671// =============================================================================
672
/// Bypass message header
///
/// Compact 12-byte header for bypass messages:
/// - Magic: 4 bytes ("PEAT")
/// - Collection hash: 4 bytes (FNV-1a hash of collection name)
/// - TTL: 2 bytes (milliseconds, max ~65s)
/// - Flags: 1 byte
/// - Sequence: 1 byte (wrapping counter)
///
/// The fields appear on the wire in the order listed above; the multi-byte
/// integers are written big-endian by `encode`.
#[derive(Debug, Clone, Copy)]
pub struct BypassHeader {
    /// Magic number (0x50454154 = "PEAT")
    pub magic: [u8; 4],
    /// Collection name hash (FNV-1a)
    pub collection_hash: u32,
    /// TTL in milliseconds
    pub ttl_ms: u16,
    /// Flags
    // Bitwise OR of the `FLAG_*` constants defined on `BypassHeader`.
    pub flags: u8,
    /// Sequence number
    pub sequence: u8,
}
694
695impl BypassHeader {
696    /// Magic bytes: "PEAT"
697    pub const MAGIC: [u8; 4] = [0x45, 0x43, 0x48, 0x45];
698
699    /// Header size in bytes
700    pub const SIZE: usize = 12;
701
702    /// Flag: message is compressed
703    pub const FLAG_COMPRESSED: u8 = 0x01;
704    /// Flag: message is encrypted
705    pub const FLAG_ENCRYPTED: u8 = 0x02;
706    /// Flag: message is signed
707    pub const FLAG_SIGNED: u8 = 0x04;
708
709    /// Create a new header
710    pub fn new(collection: &str, ttl: Duration, sequence: u8) -> Self {
711        Self {
712            magic: Self::MAGIC,
713            collection_hash: Self::hash_collection(collection),
714            ttl_ms: ttl.as_millis().min(u16::MAX as u128) as u16,
715            flags: 0,
716            sequence,
717        }
718    }
719
720    /// Hash a collection name using FNV-1a
721    pub fn hash_collection(name: &str) -> u32 {
722        let mut hasher = fnv::FnvHasher::default();
723        name.hash(&mut hasher);
724        hasher.finish() as u32
725    }
726
727    /// Check if header has valid magic
728    pub fn is_valid(&self) -> bool {
729        self.magic == Self::MAGIC
730    }
731
732    /// Encode header to bytes
733    pub fn encode(&self) -> [u8; Self::SIZE] {
734        let mut buf = [0u8; Self::SIZE];
735        buf[0..4].copy_from_slice(&self.magic);
736        buf[4..8].copy_from_slice(&self.collection_hash.to_be_bytes());
737        buf[8..10].copy_from_slice(&self.ttl_ms.to_be_bytes());
738        buf[10] = self.flags;
739        buf[11] = self.sequence;
740        buf
741    }
742
743    /// Decode header from bytes
744    pub fn decode(buf: &[u8]) -> Result<Self> {
745        if buf.len() < Self::SIZE {
746            return Err(BypassError::InvalidHeader);
747        }
748
749        let mut magic = [0u8; 4];
750        magic.copy_from_slice(&buf[0..4]);
751
752        if magic != Self::MAGIC {
753            return Err(BypassError::InvalidHeader);
754        }
755
756        let collection_hash = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
757        let ttl_ms = u16::from_be_bytes([buf[8], buf[9]]);
758        let flags = buf[10];
759        let sequence = buf[11];
760
761        Ok(Self {
762            magic,
763            collection_hash,
764            ttl_ms,
765            flags,
766            sequence,
767        })
768    }
769
770    /// Check if message is stale based on TTL
771    pub fn is_stale(&self, received_at: Instant, sent_at: Instant) -> bool {
772        let elapsed = received_at.duration_since(sent_at);
773        elapsed > Duration::from_millis(self.ttl_ms as u64)
774    }
775}
776
777// =============================================================================
778// FNV-1a Hasher (simple, fast hash for collection names)
779// =============================================================================
780
/// Minimal 64-bit FNV-1a hasher.
mod fnv {
    use std::hash::Hasher;

    /// 64-bit FNV offset basis (initial hasher state).
    const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
    /// 64-bit FNV prime.
    const FNV_PRIME: u64 = 1099511628211;

    /// Hasher state: the running 64-bit FNV-1a value.
    pub struct FnvHasher(u64);

    impl Default for FnvHasher {
        /// Starts from the FNV offset basis.
        ///
        /// This is a real `Default` impl rather than a derive plus an inherent
        /// `default()`: the derived impl would start from 0, so generic code
        /// going through the trait would silently compute a different hash.
        fn default() -> Self {
            Self(FNV_OFFSET_BASIS)
        }
    }

    impl Hasher for FnvHasher {
        /// FNV-1a step per byte: XOR the byte in, then multiply by the prime.
        fn write(&mut self, bytes: &[u8]) {
            for byte in bytes {
                self.0 ^= u64::from(*byte);
                self.0 = self.0.wrapping_mul(FNV_PRIME);
            }
        }

        /// Returns the current hash value without resetting the state.
        fn finish(&self) -> u64 {
            self.0
        }
    }
}
809
810// =============================================================================
811// Bypass Message
812// =============================================================================
813
/// Incoming bypass message
///
/// One received datagram after its header has been parsed.
#[derive(Debug, Clone)]
pub struct BypassMessage {
    /// Source address
    pub source: SocketAddr,
    /// Collection hash (from header)
    pub collection_hash: u32,
    /// Message payload (decoded)
    pub data: Vec<u8>,
    /// When message was received
    pub received_at: Instant,
    /// Sequence number
    // 8-bit wrapping counter copied from the header.
    pub sequence: u8,
    /// Message priority (inferred from collection config)
    pub priority: MessagePriority,
}
830
/// Target for bypass send
///
/// Selects where an outgoing bypass message is delivered.
#[derive(Debug, Clone)]
pub enum BypassTarget {
    /// Unicast to specific address
    Unicast(SocketAddr),
    /// Multicast to group
    // `group` is the multicast group address, `port` the destination port.
    Multicast { group: IpAddr, port: u16 },
    /// Broadcast on subnet
    // `port` is the destination port of the broadcast.
    Broadcast { port: u16 },
}
841
842// =============================================================================
843// Bypass Metrics
844// =============================================================================
845
/// Metrics for bypass channel
///
/// Every counter is an atomic, so values can be updated through a shared
/// reference. Use `snapshot()` to read all counters into plain integers.
#[derive(Debug, Default)]
pub struct BypassMetrics {
    /// Messages sent
    pub messages_sent: AtomicU64,
    /// Messages received
    pub messages_received: AtomicU64,
    /// Bytes sent
    pub bytes_sent: AtomicU64,
    /// Bytes received
    pub bytes_received: AtomicU64,
    /// Messages dropped (stale)
    pub stale_dropped: AtomicU64,
    /// Messages dropped (invalid header)
    pub invalid_dropped: AtomicU64,
    /// Send errors
    pub send_errors: AtomicU64,
    /// Receive errors
    pub receive_errors: AtomicU64,

    // Security metrics (ADR-042 Phase 5)
    /// Messages rejected due to invalid signature
    pub signature_rejected: AtomicU64,
    /// Messages rejected due to decryption failure
    pub decryption_failed: AtomicU64,
    /// Messages rejected due to unauthorized source
    pub unauthorized_source: AtomicU64,
    /// Messages rejected due to replay detection
    pub replay_rejected: AtomicU64,
}
876
877impl BypassMetrics {
878    /// Create snapshot of current metrics
879    pub fn snapshot(&self) -> BypassMetricsSnapshot {
880        BypassMetricsSnapshot {
881            messages_sent: self.messages_sent.load(Ordering::Relaxed),
882            messages_received: self.messages_received.load(Ordering::Relaxed),
883            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
884            bytes_received: self.bytes_received.load(Ordering::Relaxed),
885            stale_dropped: self.stale_dropped.load(Ordering::Relaxed),
886            invalid_dropped: self.invalid_dropped.load(Ordering::Relaxed),
887            send_errors: self.send_errors.load(Ordering::Relaxed),
888            receive_errors: self.receive_errors.load(Ordering::Relaxed),
889            signature_rejected: self.signature_rejected.load(Ordering::Relaxed),
890            decryption_failed: self.decryption_failed.load(Ordering::Relaxed),
891            unauthorized_source: self.unauthorized_source.load(Ordering::Relaxed),
892            replay_rejected: self.replay_rejected.load(Ordering::Relaxed),
893        }
894    }
895}
896
/// Snapshot of bypass metrics
///
/// Plain-integer copy of the counters in `BypassMetrics`, with one field per
/// counter of the same name.
#[derive(Debug, Clone, Default)]
pub struct BypassMetricsSnapshot {
    /// Messages sent
    pub messages_sent: u64,
    /// Messages received
    pub messages_received: u64,
    /// Bytes sent
    pub bytes_sent: u64,
    /// Bytes received
    pub bytes_received: u64,
    /// Messages dropped (stale)
    pub stale_dropped: u64,
    /// Messages dropped (invalid header)
    pub invalid_dropped: u64,
    /// Send errors
    pub send_errors: u64,
    /// Receive errors
    pub receive_errors: u64,

    // Security metrics (ADR-042 Phase 5)
    /// Messages rejected due to invalid signature
    pub signature_rejected: u64,
    /// Messages rejected due to decryption failure
    pub decryption_failed: u64,
    /// Messages rejected due to unauthorized source
    pub unauthorized_source: u64,
    /// Messages rejected due to replay detection
    pub replay_rejected: u64,
}
919
920// =============================================================================
921// UDP Bypass Channel
922// =============================================================================
923
924/// UDP Bypass Channel for ephemeral data
925///
926/// Provides direct UDP messaging that bypasses CRDT sync for
927/// low-latency, high-frequency data.
928pub struct UdpBypassChannel {
929    /// Configuration
930    config: BypassChannelConfig,
931
932    /// UDP socket for unicast/broadcast
933    socket: Option<Arc<UdpSocket>>,
934
935    /// Multicast sockets per group
936    multicast_sockets: RwLock<HashMap<IpAddr, Arc<UdpSocket>>>,
937
938    /// Collection hash to config mapping
939    collection_map: HashMap<u32, BypassCollectionConfig>,
940
941    /// Sequence counter
942    sequence: AtomicU8,
943
944    /// Metrics
945    metrics: Arc<BypassMetrics>,
946
947    /// Broadcast sender for incoming messages
948    incoming_tx: broadcast::Sender<BypassMessage>,
949
950    /// Running flag
951    running: Arc<AtomicBool>,
952}
953
954impl UdpBypassChannel {
955    /// Create a new bypass channel
956    pub async fn new(config: BypassChannelConfig) -> Result<Self> {
957        // Build collection hash map
958        let collection_map: HashMap<u32, BypassCollectionConfig> = config
959            .collections
960            .iter()
961            .map(|c| (BypassHeader::hash_collection(&c.collection), c.clone()))
962            .collect();
963
964        let (incoming_tx, _) = broadcast::channel(1024);
965
966        Ok(Self {
967            config,
968            socket: None,
969            multicast_sockets: RwLock::new(HashMap::new()),
970            collection_map,
971            sequence: AtomicU8::new(0),
972            metrics: Arc::new(BypassMetrics::default()),
973            incoming_tx,
974            running: Arc::new(AtomicBool::new(false)),
975        })
976    }
977
978    /// Start the bypass channel
979    pub async fn start(&mut self) -> Result<()> {
980        if self.running.load(Ordering::SeqCst) {
981            return Ok(());
982        }
983
984        // Bind UDP socket
985        let bind_addr = format!("0.0.0.0:{}", self.config.udp.bind_port);
986        let socket = UdpSocket::bind(&bind_addr).await?;
987        socket.set_broadcast(true)?;
988
989        let socket = Arc::new(socket);
990        self.socket = Some(socket.clone());
991
992        // Start receiver loop
993        let incoming_tx = self.incoming_tx.clone();
994        let metrics = self.metrics.clone();
995        let collection_map = self.collection_map.clone();
996        let buffer_size = self.config.udp.buffer_size;
997        let running = self.running.clone();
998
999        running.store(true, Ordering::SeqCst);
1000
1001        tokio::spawn(async move {
1002            let mut buf = vec![0u8; buffer_size];
1003
1004            while running.load(Ordering::SeqCst) {
1005                match tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buf))
1006                    .await
1007                {
1008                    Ok(Ok((len, src))) => {
1009                        let received_at = Instant::now();
1010
1011                        // Parse header
1012                        if len < BypassHeader::SIZE {
1013                            metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1014                            continue;
1015                        }
1016
1017                        let header = match BypassHeader::decode(&buf[..BypassHeader::SIZE]) {
1018                            Ok(h) => h,
1019                            Err(_) => {
1020                                metrics.invalid_dropped.fetch_add(1, Ordering::Relaxed);
1021                                continue;
1022                            }
1023                        };
1024
1025                        // Extract payload
1026                        let payload = buf[BypassHeader::SIZE..len].to_vec();
1027
1028                        // Look up collection config for priority
1029                        let priority = collection_map
1030                            .get(&header.collection_hash)
1031                            .map(|c| c.priority)
1032                            .unwrap_or(MessagePriority::Normal);
1033
1034                        let message = BypassMessage {
1035                            source: src,
1036                            collection_hash: header.collection_hash,
1037                            data: payload,
1038                            received_at,
1039                            sequence: header.sequence,
1040                            priority,
1041                        };
1042
1043                        metrics.messages_received.fetch_add(1, Ordering::Relaxed);
1044                        metrics
1045                            .bytes_received
1046                            .fetch_add(len as u64, Ordering::Relaxed);
1047
1048                        // Broadcast to subscribers (ignore if no subscribers)
1049                        let _ = incoming_tx.send(message);
1050                    }
1051                    Ok(Err(_e)) => {
1052                        metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
1053                    }
1054                    Err(_) => {
1055                        // Timeout, just continue
1056                    }
1057                }
1058            }
1059        });
1060
1061        info!(
1062            "Bypass channel started on port {}",
1063            self.config.udp.bind_port
1064        );
1065        Ok(())
1066    }
1067
1068    /// Stop the bypass channel
1069    pub fn stop(&mut self) {
1070        self.running.store(false, Ordering::SeqCst);
1071        self.socket = None;
1072        self.multicast_sockets
1073            .write()
1074            .unwrap_or_else(|e| e.into_inner())
1075            .clear();
1076        info!("Bypass channel stopped");
1077    }
1078
1079    /// Check if channel is running
1080    pub fn is_running(&self) -> bool {
1081        self.running.load(Ordering::SeqCst)
1082    }
1083
1084    /// Get the next sequence number
1085    fn next_sequence(&self) -> u8 {
1086        self.sequence.fetch_add(1, Ordering::Relaxed)
1087    }
1088
1089    /// Send a message via bypass channel
1090    pub async fn send(&self, target: BypassTarget, collection: &str, data: &[u8]) -> Result<()> {
1091        let socket = self.socket.as_ref().ok_or(BypassError::NotStarted)?;
1092
1093        // Check message size (0 = unlimited)
1094        if self.config.max_message_size > 0 && data.len() > self.config.max_message_size {
1095            return Err(BypassError::MessageTooLarge {
1096                size: data.len(),
1097                max: self.config.max_message_size,
1098            });
1099        }
1100
1101        // Get TTL from collection config or use default
1102        let ttl = self
1103            .config
1104            .get_collection(collection)
1105            .map(|c| c.ttl())
1106            .unwrap_or(Duration::from_secs(5));
1107
1108        // Create header
1109        let header = BypassHeader::new(collection, ttl, self.next_sequence());
1110        let header_bytes = header.encode();
1111
1112        // Build frame
1113        let mut frame = Vec::with_capacity(BypassHeader::SIZE + data.len());
1114        frame.extend_from_slice(&header_bytes);
1115        frame.extend_from_slice(data);
1116
1117        // Send based on target
1118        let bytes_sent = match target {
1119            BypassTarget::Unicast(addr) => socket.send_to(&frame, addr).await?,
1120            BypassTarget::Multicast { group, port } => {
1121                let mcast_socket = self.get_or_create_multicast(group).await?;
1122                mcast_socket.send_to(&frame, (group, port)).await?
1123            }
1124            BypassTarget::Broadcast { port } => {
1125                let broadcast_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), port);
1126                socket.send_to(&frame, broadcast_addr).await?
1127            }
1128        };
1129
1130        self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
1131        self.metrics
1132            .bytes_sent
1133            .fetch_add(bytes_sent as u64, Ordering::Relaxed);
1134
1135        Ok(())
1136    }
1137
1138    /// Send with collection config (uses config's transport settings)
1139    pub async fn send_to_collection(
1140        &self,
1141        collection: &str,
1142        target_addr: Option<SocketAddr>,
1143        data: &[u8],
1144    ) -> Result<()> {
1145        let config = self
1146            .config
1147            .get_collection(collection)
1148            .ok_or_else(|| BypassError::Config(format!("Unknown collection: {}", collection)))?;
1149
1150        let target = match &config.transport {
1151            BypassTransport::Unicast => {
1152                let addr = target_addr.ok_or_else(|| {
1153                    BypassError::Config("Unicast requires target address".to_string())
1154                })?;
1155                BypassTarget::Unicast(addr)
1156            }
1157            BypassTransport::Multicast { group, port } => BypassTarget::Multicast {
1158                group: *group,
1159                port: *port,
1160            },
1161            BypassTransport::Broadcast => BypassTarget::Broadcast {
1162                port: self.config.udp.bind_port,
1163            },
1164        };
1165
1166        self.send(target, collection, data).await
1167    }
1168
1169    /// Subscribe to incoming bypass messages
1170    pub fn subscribe(&self) -> broadcast::Receiver<BypassMessage> {
1171        self.incoming_tx.subscribe()
1172    }
1173
1174    /// Subscribe to messages for a specific collection
1175    pub fn subscribe_collection(
1176        &self,
1177        collection: &str,
1178    ) -> (u32, broadcast::Receiver<BypassMessage>) {
1179        let hash = BypassHeader::hash_collection(collection);
1180        (hash, self.incoming_tx.subscribe())
1181    }
1182
1183    /// Get or create a multicast socket for a group
1184    async fn get_or_create_multicast(&self, group: IpAddr) -> Result<Arc<UdpSocket>> {
1185        // Check if already exists
1186        {
1187            let sockets = self
1188                .multicast_sockets
1189                .read()
1190                .unwrap_or_else(|e| e.into_inner());
1191            if let Some(socket) = sockets.get(&group) {
1192                return Ok(socket.clone());
1193            }
1194        }
1195
1196        // Create new multicast socket
1197        let socket = UdpSocket::bind("0.0.0.0:0").await?;
1198
1199        match group {
1200            IpAddr::V4(addr) => {
1201                socket.join_multicast_v4(addr, Ipv4Addr::UNSPECIFIED)?;
1202                socket.set_multicast_ttl_v4(self.config.udp.multicast_ttl)?;
1203            }
1204            IpAddr::V6(addr) => {
1205                socket.join_multicast_v6(&addr, 0)?;
1206            }
1207        }
1208
1209        let socket = Arc::new(socket);
1210        self.multicast_sockets
1211            .write()
1212            .unwrap()
1213            .insert(group, socket.clone());
1214
1215        debug!("Joined multicast group: {}", group);
1216        Ok(socket)
1217    }
1218
1219    /// Leave a multicast group
1220    pub fn leave_multicast(&self, group: IpAddr) -> Result<()> {
1221        if let Some(socket) = self
1222            .multicast_sockets
1223            .write()
1224            .unwrap_or_else(|e| e.into_inner())
1225            .remove(&group)
1226        {
1227            match group {
1228                IpAddr::V4(addr) => {
1229                    // Note: socket drop will leave the group, but explicit leave is cleaner
1230                    if let Ok(socket) = Arc::try_unwrap(socket) {
1231                        let _ = socket.leave_multicast_v4(addr, Ipv4Addr::UNSPECIFIED);
1232                    }
1233                }
1234                IpAddr::V6(addr) => {
1235                    if let Ok(socket) = Arc::try_unwrap(socket) {
1236                        let _ = socket.leave_multicast_v6(&addr, 0);
1237                    }
1238                }
1239            }
1240            debug!("Left multicast group: {}", group);
1241        }
1242        Ok(())
1243    }
1244
1245    /// Get current metrics
1246    pub fn metrics(&self) -> BypassMetricsSnapshot {
1247        self.metrics.snapshot()
1248    }
1249
1250    /// Get configuration
1251    pub fn config(&self) -> &BypassChannelConfig {
1252        &self.config
1253    }
1254
1255    /// Check if a collection is configured for bypass
1256    pub fn is_bypass_collection(&self, name: &str) -> bool {
1257        self.config.is_bypass_collection(name)
1258    }
1259
1260    /// Get collection config by hash
1261    pub fn get_collection_by_hash(&self, hash: u32) -> Option<&BypassCollectionConfig> {
1262        self.collection_map.get(&hash)
1263    }
1264}
1265
1266// =============================================================================
1267// Tests
1268// =============================================================================
1269
1270#[cfg(test)]
1271mod tests {
1272    use super::*;
1273
1274    #[test]
1275    fn test_bypass_header_encode_decode() {
1276        let header = BypassHeader::new("test_collection", Duration::from_millis(1000), 42);
1277        let encoded = header.encode();
1278        let decoded = BypassHeader::decode(&encoded).unwrap();
1279
1280        assert_eq!(decoded.magic, BypassHeader::MAGIC);
1281        assert_eq!(decoded.collection_hash, header.collection_hash);
1282        assert_eq!(decoded.ttl_ms, 1000);
1283        assert_eq!(decoded.sequence, 42);
1284        assert!(decoded.is_valid());
1285    }
1286
1287    #[test]
1288    fn test_bypass_header_invalid_magic() {
1289        let mut data = [0u8; 12];
1290        data[0..4].copy_from_slice(&[0, 0, 0, 0]);
1291        let result = BypassHeader::decode(&data);
1292        assert!(result.is_err());
1293    }
1294
1295    #[test]
1296    fn test_bypass_header_too_short() {
1297        let data = [0u8; 8];
1298        let result = BypassHeader::decode(&data);
1299        assert!(result.is_err());
1300    }
1301
1302    #[test]
1303    fn test_collection_hash_consistency() {
1304        let hash1 = BypassHeader::hash_collection("position_updates");
1305        let hash2 = BypassHeader::hash_collection("position_updates");
1306        let hash3 = BypassHeader::hash_collection("sensor_data");
1307
1308        assert_eq!(hash1, hash2);
1309        assert_ne!(hash1, hash3);
1310    }
1311
1312    #[test]
1313    fn test_bypass_config() {
1314        let config = BypassChannelConfig::new()
1315            .with_collection(BypassCollectionConfig {
1316                collection: "positions".into(),
1317                transport: BypassTransport::Multicast {
1318                    group: "239.1.1.100".parse().unwrap(),
1319                    port: 5150,
1320                },
1321                encoding: MessageEncoding::Protobuf,
1322                ttl_ms: 200,
1323                priority: MessagePriority::High,
1324            })
1325            .with_collection(BypassCollectionConfig {
1326                collection: "telemetry".into(),
1327                transport: BypassTransport::Unicast,
1328                encoding: MessageEncoding::Cbor,
1329                ttl_ms: 5000,
1330                priority: MessagePriority::Normal,
1331            });
1332
1333        assert!(config.is_bypass_collection("positions"));
1334        assert!(config.is_bypass_collection("telemetry"));
1335        assert!(!config.is_bypass_collection("unknown"));
1336
1337        let pos_config = config.get_collection("positions").unwrap();
1338        assert_eq!(pos_config.priority, MessagePriority::High);
1339    }
1340
1341    #[test]
1342    fn test_ttl_clamping() {
1343        // TTL greater than u16::MAX should be clamped
1344        let header = BypassHeader::new("test", Duration::from_secs(1000), 0);
1345        assert_eq!(header.ttl_ms, u16::MAX);
1346    }
1347
1348    #[tokio::test]
1349    async fn test_bypass_channel_creation() {
1350        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1351            collection: "test".into(),
1352            ..Default::default()
1353        });
1354
1355        let channel = UdpBypassChannel::new(config).await.unwrap();
1356        assert!(!channel.is_running());
1357        assert!(channel.is_bypass_collection("test"));
1358    }
1359
1360    #[tokio::test]
1361    async fn test_bypass_channel_start_stop() {
1362        let config = BypassChannelConfig {
1363            udp: UdpConfig {
1364                bind_port: 0, // Ephemeral port
1365                ..Default::default()
1366            },
1367            ..Default::default()
1368        };
1369
1370        let mut channel = UdpBypassChannel::new(config).await.unwrap();
1371
1372        channel.start().await.unwrap();
1373        assert!(channel.is_running());
1374
1375        channel.stop();
1376        assert!(!channel.is_running());
1377    }
1378
1379    #[tokio::test]
1380    async fn test_bypass_send_receive() {
1381        // Create two channels on different ports
1382        let config1 = BypassChannelConfig {
1383            udp: UdpConfig {
1384                bind_port: 0,
1385                ..Default::default()
1386            },
1387            collections: vec![BypassCollectionConfig {
1388                collection: "test".into(),
1389                ttl_ms: 5000,
1390                ..Default::default()
1391            }],
1392            ..Default::default()
1393        };
1394
1395        let config2 = BypassChannelConfig {
1396            udp: UdpConfig {
1397                bind_port: 0,
1398                ..Default::default()
1399            },
1400            collections: vec![BypassCollectionConfig {
1401                collection: "test".into(),
1402                ttl_ms: 5000,
1403                ..Default::default()
1404            }],
1405            ..Default::default()
1406        };
1407
1408        let mut channel1 = UdpBypassChannel::new(config1).await.unwrap();
1409        let mut channel2 = UdpBypassChannel::new(config2).await.unwrap();
1410
1411        channel1.start().await.unwrap();
1412        channel2.start().await.unwrap();
1413
1414        // Get channel2's port and construct localhost address
1415        let socket2_port = channel2
1416            .socket
1417            .as_ref()
1418            .unwrap()
1419            .local_addr()
1420            .unwrap()
1421            .port();
1422        let socket2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket2_port);
1423
1424        // Subscribe to messages on channel2
1425        let mut rx = channel2.subscribe();
1426
1427        // Send from channel1 to channel2
1428        let test_data = b"Hello, bypass!";
1429        channel1
1430            .send(BypassTarget::Unicast(socket2_addr), "test", test_data)
1431            .await
1432            .unwrap();
1433
1434        // Receive on channel2
1435        let msg = tokio::time::timeout(Duration::from_millis(500), rx.recv())
1436            .await
1437            .expect("timeout")
1438            .expect("receive error");
1439
1440        assert_eq!(msg.data, test_data);
1441        assert_eq!(msg.collection_hash, BypassHeader::hash_collection("test"));
1442
1443        // Check metrics
1444        let metrics1 = channel1.metrics();
1445        assert_eq!(metrics1.messages_sent, 1);
1446        assert!(metrics1.bytes_sent > 0);
1447
1448        let metrics2 = channel2.metrics();
1449        assert_eq!(metrics2.messages_received, 1);
1450        assert!(metrics2.bytes_received > 0);
1451
1452        channel1.stop();
1453        channel2.stop();
1454    }
1455
1456    #[test]
1457    fn test_message_too_large() {
1458        // This test doesn't need async since we're just testing the error condition
1459        let _config = BypassChannelConfig {
1460            max_message_size: 100,
1461            ..Default::default()
1462        };
1463
1464        // Create error manually since we can't easily test async in sync context
1465        let err = BypassError::MessageTooLarge {
1466            size: 200,
1467            max: 100,
1468        };
1469        assert!(err.to_string().contains("200"));
1470        assert!(err.to_string().contains("100"));
1471    }
1472
1473    // =========================================================================
1474    // Security Tests (ADR-042 Phase 5)
1475    // =========================================================================
1476
1477    #[test]
1478    fn test_security_config_none() {
1479        let config = BypassSecurityConfig::none();
1480        assert!(!config.require_signature);
1481        assert!(!config.encrypt_payload);
1482        assert!(config.source_allowlist.is_none());
1483        assert!(!config.replay_protection);
1484        assert!(!config.is_enabled());
1485    }
1486
1487    #[test]
1488    fn test_security_config_signed() {
1489        let config = BypassSecurityConfig::signed();
1490        assert!(config.require_signature);
1491        assert!(!config.encrypt_payload);
1492        assert!(config.is_enabled());
1493    }
1494
1495    #[test]
1496    fn test_security_config_full() {
1497        let config = BypassSecurityConfig::full_security();
1498        assert!(config.require_signature);
1499        assert!(config.encrypt_payload);
1500        assert!(config.replay_protection);
1501        assert_eq!(config.replay_window_size, 64);
1502        assert!(config.is_enabled());
1503    }
1504
1505    #[test]
1506    fn test_security_config_allowlist() {
1507        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
1508        let ip2: IpAddr = "192.168.1.2".parse().unwrap();
1509        let ip3: IpAddr = "10.0.0.1".parse().unwrap();
1510
1511        // No allowlist - all allowed
1512        let config = BypassSecurityConfig::none();
1513        assert!(config.is_source_allowed(&ip1));
1514        assert!(config.is_source_allowed(&ip2));
1515        assert!(config.is_source_allowed(&ip3));
1516
1517        // With allowlist - only listed IPs allowed
1518        let config = BypassSecurityConfig {
1519            source_allowlist: Some(vec![ip1, ip2]),
1520            ..Default::default()
1521        };
1522        assert!(config.is_source_allowed(&ip1));
1523        assert!(config.is_source_allowed(&ip2));
1524        assert!(!config.is_source_allowed(&ip3));
1525    }
1526
1527    #[test]
1528    fn test_credentials_signing() {
1529        use ed25519_dalek::SigningKey;
1530
1531        // Use fixed seed for deterministic testing
1532        let seed: [u8; 32] = [1u8; 32];
1533        let signing_key = SigningKey::from_bytes(&seed);
1534        let verifying_key = signing_key.verifying_key();
1535
1536        // Create credentials
1537        let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1538        let creds = BypassSecurityCredentials::new()
1539            .with_signing_key(signing_key)
1540            .with_peer_key(peer_ip.to_string(), verifying_key);
1541
1542        // Sign a message
1543        let message = b"test message for signing";
1544        let signature = creds.sign(message).expect("signing should succeed");
1545
1546        // Verify the signature
1547        creds
1548            .verify_by_ip(&peer_ip, message, &signature)
1549            .expect("verification should succeed");
1550    }
1551
1552    #[test]
1553    fn test_credentials_invalid_signature() {
1554        use ed25519_dalek::SigningKey;
1555
1556        // Use different fixed seeds for two key pairs
1557        let seed1: [u8; 32] = [1u8; 32];
1558        let seed2: [u8; 32] = [2u8; 32];
1559        let signing_key1 = SigningKey::from_bytes(&seed1);
1560        let signing_key2 = SigningKey::from_bytes(&seed2);
1561        let verifying_key2 = signing_key2.verifying_key();
1562
1563        // Create credentials with mismatched keys
1564        let peer_ip: IpAddr = "192.168.1.1".parse().unwrap();
1565        let creds = BypassSecurityCredentials::new()
1566            .with_signing_key(signing_key1)
1567            .with_peer_key(peer_ip.to_string(), verifying_key2);
1568
1569        // Sign with key1
1570        let message = b"test message";
1571        let signature = creds.sign(message).expect("signing should succeed");
1572
1573        // Verification with key2 should fail
1574        let result = creds.verify_by_ip(&peer_ip, message, &signature);
1575        assert!(matches!(result, Err(BypassError::InvalidSignature)));
1576    }
1577
1578    #[test]
1579    fn test_credentials_encryption() {
1580        let encryption_key = [42u8; 32];
1581        let creds = BypassSecurityCredentials::new().with_encryption_key(encryption_key);
1582
1583        let plaintext = b"secret message for encryption";
1584        let nonce = [0u8; 12];
1585
1586        // Encrypt
1587        let ciphertext = creds
1588            .encrypt(plaintext, &nonce)
1589            .expect("encryption should succeed");
1590        assert_ne!(ciphertext.as_slice(), plaintext);
1591        // Ciphertext should be plaintext + 16 bytes (Poly1305 tag)
1592        assert_eq!(ciphertext.len(), plaintext.len() + 16);
1593
1594        // Decrypt
1595        let decrypted = creds
1596            .decrypt(&ciphertext, &nonce)
1597            .expect("decryption should succeed");
1598        assert_eq!(decrypted, plaintext);
1599    }
1600
1601    #[test]
1602    fn test_credentials_decryption_wrong_key() {
1603        let key1 = [1u8; 32];
1604        let key2 = [2u8; 32];
1605
1606        let creds1 = BypassSecurityCredentials::new().with_encryption_key(key1);
1607        let creds2 = BypassSecurityCredentials::new().with_encryption_key(key2);
1608
1609        let plaintext = b"secret message";
1610        let nonce = [0u8; 12];
1611
1612        // Encrypt with key1
1613        let ciphertext = creds1
1614            .encrypt(plaintext, &nonce)
1615            .expect("encryption should succeed");
1616
1617        // Decrypt with key2 should fail
1618        let result = creds2.decrypt(&ciphertext, &nonce);
1619        assert!(matches!(result, Err(BypassError::DecryptionFailed)));
1620    }
1621
1622    #[test]
1623    fn test_credentials_missing_signing_key() {
1624        let creds = BypassSecurityCredentials::new();
1625        let result = creds.sign(b"message");
1626        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1627    }
1628
1629    #[test]
1630    fn test_credentials_missing_encryption_key() {
1631        let creds = BypassSecurityCredentials::new();
1632        let result = creds.encrypt(b"message", &[0u8; 12]);
1633        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1634    }
1635
1636    #[test]
1637    fn test_replay_tracker_first_message() {
1638        let tracker = ReplayTracker::new(64);
1639        let source: IpAddr = "192.168.1.1".parse().unwrap();
1640
1641        // First message should always succeed
1642        tracker
1643            .check(&source, 0)
1644            .expect("first message should succeed");
1645    }
1646
1647    #[test]
1648    fn test_replay_tracker_sequential() {
1649        let tracker = ReplayTracker::new(64);
1650        let source: IpAddr = "192.168.1.1".parse().unwrap();
1651
1652        // Sequential messages should all succeed
1653        for seq in 0..10u8 {
1654            tracker
1655                .check(&source, seq)
1656                .unwrap_or_else(|_| panic!("sequence {} should succeed", seq));
1657        }
1658    }
1659
1660    #[test]
1661    fn test_replay_tracker_replay_detected() {
1662        let tracker = ReplayTracker::new(64);
1663        let source: IpAddr = "192.168.1.1".parse().unwrap();
1664
1665        // First message succeeds
1666        tracker.check(&source, 5).expect("first should succeed");
1667
1668        // Replay should be detected
1669        let result = tracker.check(&source, 5);
1670        assert!(matches!(
1671            result,
1672            Err(BypassError::ReplayDetected { sequence: 5 })
1673        ));
1674    }
1675
1676    #[test]
1677    fn test_replay_tracker_out_of_order() {
1678        let tracker = ReplayTracker::new(64);
1679        let source: IpAddr = "192.168.1.1".parse().unwrap();
1680
1681        // Messages can arrive out of order within window
1682        tracker.check(&source, 10).expect("10 should succeed");
1683        tracker
1684            .check(&source, 8)
1685            .expect("8 should succeed (within window)");
1686        tracker.check(&source, 12).expect("12 should succeed");
1687        tracker
1688            .check(&source, 9)
1689            .expect("9 should succeed (within window)");
1690
1691        // But not replayed
1692        let result = tracker.check(&source, 8);
1693        assert!(matches!(
1694            result,
1695            Err(BypassError::ReplayDetected { sequence: 8 })
1696        ));
1697    }
1698
1699    #[test]
1700    fn test_replay_tracker_multiple_sources() {
1701        let tracker = ReplayTracker::new(64);
1702        let source1: IpAddr = "192.168.1.1".parse().unwrap();
1703        let source2: IpAddr = "192.168.1.2".parse().unwrap();
1704
1705        // Same sequence from different sources should both succeed
1706        tracker
1707            .check(&source1, 5)
1708            .expect("source1 seq 5 should succeed");
1709        tracker
1710            .check(&source2, 5)
1711            .expect("source2 seq 5 should succeed");
1712
1713        // Replay from same source should fail
1714        let result = tracker.check(&source1, 5);
1715        assert!(matches!(result, Err(BypassError::ReplayDetected { .. })));
1716
1717        // But different sequence should work
1718        tracker
1719            .check(&source1, 6)
1720            .expect("source1 seq 6 should succeed");
1721    }
1722
1723    #[test]
1724    fn test_replay_tracker_window_advance() {
1725        let tracker = ReplayTracker::new(64);
1726        let source: IpAddr = "192.168.1.1".parse().unwrap();
1727
1728        // Start with a sequence in the middle of the range
1729        tracker.check(&source, 100).expect("100 should succeed");
1730        tracker.check(&source, 110).expect("110 should succeed");
1731        tracker.check(&source, 120).expect("120 should succeed");
1732
1733        // Much later sequence should succeed and advance the window
1734        tracker.check(&source, 200).expect("200 should succeed");
1735
1736        // Sequence 100 is now outside the window (more than 64 behind)
1737        // But we already marked it, so this tests window advancement
1738        tracker.check(&source, 201).expect("201 should succeed");
1739    }
1740
1741    #[test]
1742    fn test_replay_tracker_clear() {
1743        let tracker = ReplayTracker::new(64);
1744        let source: IpAddr = "192.168.1.1".parse().unwrap();
1745
1746        tracker.check(&source, 5).expect("initial should succeed");
1747
1748        // Clear and replay should now succeed
1749        tracker.clear_source(&source);
1750        tracker
1751            .check(&source, 5)
1752            .expect("after clear should succeed");
1753    }
1754
1755    #[test]
1756    fn test_metrics_snapshot_includes_security() {
1757        let metrics = BypassMetrics::default();
1758
1759        // Increment security metrics
1760        metrics.signature_rejected.fetch_add(1, Ordering::Relaxed);
1761        metrics.decryption_failed.fetch_add(2, Ordering::Relaxed);
1762        metrics.unauthorized_source.fetch_add(3, Ordering::Relaxed);
1763        metrics.replay_rejected.fetch_add(4, Ordering::Relaxed);
1764
1765        // Snapshot should include them
1766        let snapshot = metrics.snapshot();
1767        assert_eq!(snapshot.signature_rejected, 1);
1768        assert_eq!(snapshot.decryption_failed, 2);
1769        assert_eq!(snapshot.unauthorized_source, 3);
1770        assert_eq!(snapshot.replay_rejected, 4);
1771    }
1772
1773    #[test]
1774    fn test_bypass_error_display_all_variants() {
1775        let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test io"));
1776        assert!(io_err.to_string().contains("IO error"));
1777        assert!(io_err.to_string().contains("test io"));
1778
1779        let encode_err = BypassError::Encode("bad data".to_string());
1780        assert!(encode_err.to_string().contains("Encode error"));
1781
1782        let decode_err = BypassError::Decode("corrupt".to_string());
1783        assert!(decode_err.to_string().contains("Decode error"));
1784
1785        let config_err = BypassError::Config("bad config".to_string());
1786        assert!(config_err.to_string().contains("Config error"));
1787
1788        let not_started = BypassError::NotStarted;
1789        assert!(not_started.to_string().contains("not started"));
1790
1791        let invalid_header = BypassError::InvalidHeader;
1792        assert!(invalid_header.to_string().contains("Invalid bypass header"));
1793
1794        let stale = BypassError::StaleMessage;
1795        assert!(stale.to_string().contains("stale"));
1796
1797        let sig_err = BypassError::InvalidSignature;
1798        assert!(sig_err.to_string().contains("Invalid message signature"));
1799
1800        let decrypt_err = BypassError::DecryptionFailed;
1801        assert!(decrypt_err.to_string().contains("decryption failed"));
1802
1803        let unauth = BypassError::UnauthorizedSource("10.0.0.1".parse().unwrap());
1804        assert!(unauth.to_string().contains("10.0.0.1"));
1805
1806        let replay = BypassError::ReplayDetected { sequence: 42 };
1807        assert!(replay.to_string().contains("42"));
1808
1809        let missing = BypassError::MissingCredential("encryption key".to_string());
1810        assert!(missing.to_string().contains("encryption key"));
1811    }
1812
1813    #[test]
1814    fn test_bypass_error_source() {
1815        let io_err = BypassError::Io(std::io::Error::new(std::io::ErrorKind::Other, "test"));
1816        assert!(std::error::Error::source(&io_err).is_some());
1817
1818        let encode_err = BypassError::Encode("test".into());
1819        assert!(std::error::Error::source(&encode_err).is_none());
1820
1821        let not_started = BypassError::NotStarted;
1822        assert!(std::error::Error::source(&not_started).is_none());
1823    }
1824
1825    #[test]
1826    fn test_bypass_error_from_io() {
1827        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
1828        let bypass_err: BypassError = io_err.into();
1829        assert!(bypass_err.to_string().contains("gone"));
1830    }
1831
1832    #[test]
1833    fn test_message_encoding_display() {
1834        assert_eq!(MessageEncoding::Protobuf.to_string(), "protobuf");
1835        assert_eq!(MessageEncoding::Json.to_string(), "json");
1836        assert_eq!(MessageEncoding::Raw.to_string(), "raw");
1837        assert_eq!(MessageEncoding::Cbor.to_string(), "cbor");
1838    }
1839
1840    #[test]
1841    fn test_bypass_collection_config_ttl() {
1842        let config = BypassCollectionConfig {
1843            ttl_ms: 200,
1844            ..Default::default()
1845        };
1846        assert_eq!(config.ttl(), Duration::from_millis(200));
1847    }
1848
1849    #[test]
1850    fn test_bypass_collection_config_default() {
1851        let config = BypassCollectionConfig::default();
1852        assert!(config.collection.is_empty());
1853        assert_eq!(config.transport, BypassTransport::Unicast);
1854        assert_eq!(config.encoding, MessageEncoding::Protobuf);
1855        assert_eq!(config.ttl_ms, 5000);
1856        assert_eq!(config.priority, MessagePriority::Normal);
1857    }
1858
1859    #[test]
1860    fn test_udp_config_default() {
1861        let config = UdpConfig::default();
1862        assert_eq!(config.bind_port, 5150);
1863        assert_eq!(config.buffer_size, 65536);
1864        assert_eq!(config.multicast_ttl, 32);
1865    }
1866
1867    #[test]
1868    fn test_bypass_channel_config_default() {
1869        let config = BypassChannelConfig::default();
1870        assert!(config.collections.is_empty());
1871        assert!(!config.multicast_enabled);
1872        assert_eq!(config.max_message_size, 0);
1873    }
1874
1875    #[test]
1876    fn test_bypass_channel_config_new() {
1877        let config = BypassChannelConfig::new();
1878        assert!(config.multicast_enabled);
1879        assert_eq!(config.max_message_size, 65000);
1880    }
1881
1882    #[test]
1883    fn test_bypass_security_config_is_enabled_allowlist() {
1884        let config = BypassSecurityConfig {
1885            source_allowlist: Some(vec!["10.0.0.1".parse().unwrap()]),
1886            ..Default::default()
1887        };
1888        assert!(config.is_enabled());
1889    }
1890
1891    #[test]
1892    fn test_bypass_security_config_is_enabled_replay() {
1893        let config = BypassSecurityConfig {
1894            replay_protection: true,
1895            ..Default::default()
1896        };
1897        assert!(config.is_enabled());
1898    }
1899
1900    #[test]
1901    fn test_bypass_security_credentials_debug() {
1902        let creds = BypassSecurityCredentials::new();
1903        let debug = format!("{:?}", creds);
1904        assert!(debug.contains("has_signing_key"));
1905        assert!(debug.contains("false"));
1906        assert!(debug.contains("peer_keys_count"));
1907    }
1908
1909    #[test]
1910    fn test_bypass_security_credentials_verifying_key() {
1911        // Without signing key
1912        let creds = BypassSecurityCredentials::new();
1913        assert!(creds.verifying_key().is_none());
1914
1915        // With signing key
1916        let seed: [u8; 32] = [42u8; 32];
1917        let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed);
1918        let creds = BypassSecurityCredentials::new().with_signing_key(signing_key.clone());
1919        let vk = creds.verifying_key();
1920        assert!(vk.is_some());
1921        assert_eq!(vk.unwrap(), signing_key.verifying_key());
1922    }
1923
1924    #[test]
1925    fn test_bypass_security_credentials_verify_unknown_peer() {
1926        let creds = BypassSecurityCredentials::new();
1927        let sig = ed25519_dalek::Signature::from_bytes(&[0u8; 64]);
1928        let result = creds.verify("unknown-peer", b"message", &sig);
1929        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1930    }
1931
1932    #[test]
1933    fn test_bypass_security_credentials_decrypt_no_key() {
1934        let creds = BypassSecurityCredentials::new();
1935        let result = creds.decrypt(b"ciphertext", &[0u8; 12]);
1936        assert!(matches!(result, Err(BypassError::MissingCredential(_))));
1937    }
1938
1939    #[test]
1940    fn test_replay_tracker_clear_all() {
1941        let tracker = ReplayTracker::new(64);
1942        let source1: IpAddr = "192.168.1.1".parse().unwrap();
1943        let source2: IpAddr = "192.168.1.2".parse().unwrap();
1944
1945        tracker.check(&source1, 1).unwrap();
1946        tracker.check(&source2, 1).unwrap();
1947
1948        tracker.clear_all();
1949
1950        // After clear_all, same sequences should succeed
1951        tracker.check(&source1, 1).unwrap();
1952        tracker.check(&source2, 1).unwrap();
1953    }
1954
1955    #[test]
1956    fn test_bypass_header_is_stale() {
1957        let header = BypassHeader::new("test", Duration::from_millis(100), 0);
1958        let now = Instant::now();
1959
1960        // Not stale: sent_at = now, received_at = now
1961        assert!(!header.is_stale(now, now));
1962
1963        // Stale: sent 1 second ago, TTL is 100ms
1964        let sent_at = now - Duration::from_secs(1);
1965        assert!(header.is_stale(now, sent_at));
1966    }
1967
1968    #[test]
1969    fn test_bypass_metrics_snapshot_default() {
1970        let snapshot = BypassMetricsSnapshot::default();
1971        assert_eq!(snapshot.messages_sent, 0);
1972        assert_eq!(snapshot.messages_received, 0);
1973        assert_eq!(snapshot.bytes_sent, 0);
1974        assert_eq!(snapshot.bytes_received, 0);
1975        assert_eq!(snapshot.stale_dropped, 0);
1976        assert_eq!(snapshot.invalid_dropped, 0);
1977        assert_eq!(snapshot.send_errors, 0);
1978        assert_eq!(snapshot.receive_errors, 0);
1979    }
1980
1981    #[tokio::test]
1982    async fn test_bypass_channel_subscribe_collection() {
1983        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1984            collection: "positions".into(),
1985            ..Default::default()
1986        });
1987
1988        let channel = UdpBypassChannel::new(config).await.unwrap();
1989        let (hash, _rx) = channel.subscribe_collection("positions");
1990        assert_eq!(hash, BypassHeader::hash_collection("positions"));
1991    }
1992
1993    #[tokio::test]
1994    async fn test_bypass_channel_get_collection_by_hash() {
1995        let config = BypassChannelConfig::new().with_collection(BypassCollectionConfig {
1996            collection: "telemetry".into(),
1997            ttl_ms: 200,
1998            ..Default::default()
1999        });
2000
2001        let channel = UdpBypassChannel::new(config).await.unwrap();
2002        let hash = BypassHeader::hash_collection("telemetry");
2003        let col_config = channel.get_collection_by_hash(hash);
2004        assert!(col_config.is_some());
2005        assert_eq!(col_config.unwrap().ttl_ms, 200);
2006
2007        // Unknown hash
2008        assert!(channel.get_collection_by_hash(12345).is_none());
2009    }
2010
2011    #[tokio::test]
2012    async fn test_bypass_channel_config_accessor() {
2013        let config = BypassChannelConfig {
2014            max_message_size: 1024,
2015            ..BypassChannelConfig::new()
2016        };
2017        let channel = UdpBypassChannel::new(config).await.unwrap();
2018        assert_eq!(channel.config().max_message_size, 1024);
2019    }
2020
2021    #[tokio::test]
2022    async fn test_bypass_send_not_started() {
2023        let config = BypassChannelConfig::new();
2024        let channel = UdpBypassChannel::new(config).await.unwrap();
2025
2026        let result = channel
2027            .send(
2028                BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2029                "test",
2030                b"data",
2031            )
2032            .await;
2033        assert!(matches!(result, Err(BypassError::NotStarted)));
2034    }
2035
2036    #[tokio::test]
2037    async fn test_bypass_send_message_too_large() {
2038        let config = BypassChannelConfig {
2039            max_message_size: 10,
2040            udp: UdpConfig {
2041                bind_port: 0,
2042                ..Default::default()
2043            },
2044            ..BypassChannelConfig::new()
2045        };
2046        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2047        channel.start().await.unwrap();
2048
2049        let big_data = vec![0u8; 100];
2050        let result = channel
2051            .send(
2052                BypassTarget::Unicast("127.0.0.1:5000".parse().unwrap()),
2053                "test",
2054                &big_data,
2055            )
2056            .await;
2057        assert!(matches!(
2058            result,
2059            Err(BypassError::MessageTooLarge { size: 100, max: 10 })
2060        ));
2061
2062        channel.stop();
2063    }
2064
2065    #[tokio::test]
2066    async fn test_bypass_send_to_collection_unknown() {
2067        let config = BypassChannelConfig {
2068            udp: UdpConfig {
2069                bind_port: 0,
2070                ..Default::default()
2071            },
2072            ..BypassChannelConfig::new()
2073        };
2074        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2075        channel.start().await.unwrap();
2076
2077        let result = channel.send_to_collection("unknown", None, b"data").await;
2078        assert!(matches!(result, Err(BypassError::Config(_))));
2079
2080        channel.stop();
2081    }
2082
2083    #[tokio::test]
2084    async fn test_bypass_send_to_collection_unicast_no_addr() {
2085        let config = BypassChannelConfig {
2086            udp: UdpConfig {
2087                bind_port: 0,
2088                ..Default::default()
2089            },
2090            collections: vec![BypassCollectionConfig {
2091                collection: "test".into(),
2092                transport: BypassTransport::Unicast,
2093                ..Default::default()
2094            }],
2095            ..BypassChannelConfig::new()
2096        };
2097        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2098        channel.start().await.unwrap();
2099
2100        // Unicast without target address should error
2101        let result = channel.send_to_collection("test", None, b"data").await;
2102        assert!(matches!(result, Err(BypassError::Config(_))));
2103
2104        channel.stop();
2105    }
2106
2107    #[tokio::test]
2108    async fn test_bypass_send_to_collection_broadcast() {
2109        let config = BypassChannelConfig {
2110            udp: UdpConfig {
2111                bind_port: 0,
2112                ..Default::default()
2113            },
2114            collections: vec![BypassCollectionConfig {
2115                collection: "bcast".into(),
2116                transport: BypassTransport::Broadcast,
2117                ..Default::default()
2118            }],
2119            ..BypassChannelConfig::new()
2120        };
2121        let mut channel = UdpBypassChannel::new(config).await.unwrap();
2122        channel.start().await.unwrap();
2123
2124        // Broadcast should work without target address
2125        // Note: actual broadcast send may fail on some systems, but the path is exercised
2126        let _result = channel.send_to_collection("bcast", None, b"data").await;
2127
2128        channel.stop();
2129    }
2130
2131    #[tokio::test]
2132    async fn test_bypass_leave_multicast_no_socket() {
2133        let config = BypassChannelConfig::new();
2134        let channel = UdpBypassChannel::new(config).await.unwrap();
2135
2136        // Leaving a group we never joined should be ok
2137        let result = channel.leave_multicast("239.1.1.1".parse().unwrap());
2138        assert!(result.is_ok());
2139    }
2140
2141    #[test]
2142    fn test_bypass_transport_serde() {
2143        let unicast = BypassTransport::Unicast;
2144        let json = serde_json::to_string(&unicast).unwrap();
2145        let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2146        assert_eq!(parsed, BypassTransport::Unicast);
2147
2148        let multicast = BypassTransport::Multicast {
2149            group: "239.1.1.100".parse().unwrap(),
2150            port: 5150,
2151        };
2152        let json = serde_json::to_string(&multicast).unwrap();
2153        let parsed: BypassTransport = serde_json::from_str(&json).unwrap();
2154        assert_eq!(parsed, multicast);
2155    }
2156
2157    #[test]
2158    fn test_message_encoding_serde() {
2159        for encoding in &[
2160            MessageEncoding::Protobuf,
2161            MessageEncoding::Json,
2162            MessageEncoding::Raw,
2163            MessageEncoding::Cbor,
2164        ] {
2165            let json = serde_json::to_string(encoding).unwrap();
2166            let parsed: MessageEncoding = serde_json::from_str(&json).unwrap();
2167            assert_eq!(parsed, *encoding);
2168        }
2169    }
2170
2171    #[test]
2172    fn test_bypass_header_flags() {
2173        let mut header = BypassHeader::new("test", Duration::from_millis(1000), 0);
2174
2175        // No flags by default
2176        assert_eq!(header.flags, 0);
2177        assert_eq!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2178        assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2179
2180        // Set signed flag
2181        header.flags |= BypassHeader::FLAG_SIGNED;
2182        assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2183        assert_eq!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2184
2185        // Set encrypted flag
2186        header.flags |= BypassHeader::FLAG_ENCRYPTED;
2187        assert_ne!(header.flags & BypassHeader::FLAG_SIGNED, 0);
2188        assert_ne!(header.flags & BypassHeader::FLAG_ENCRYPTED, 0);
2189
2190        // Encode/decode preserves flags
2191        let encoded = header.encode();
2192        let decoded = BypassHeader::decode(&encoded).unwrap();
2193        assert_eq!(decoded.flags, header.flags);
2194    }
2195}