Skip to main content

vcl_protocol/
connection.rs

1//! # VCL Connection
2//!
3//! [`VCLConnection`] is the main entry point for VCL Protocol.
4//! It manages the full lifecycle of a secure UDP connection:
5//!
6//! - X25519 ephemeral handshake
7//! - Packet encryption, signing, and chain validation
8//! - Replay protection
9//! - Session management (close, timeout)
10//! - Connection events via async mpsc channel
11//! - Ping / heartbeat with latency measurement
12//! - Mid-session key rotation
13//! - Automatic packet fragmentation and reassembly
14//! - Flow control with sliding window
15//!
16//! ## Logging
17//!
18//! This module uses the [`tracing`] crate. Enable with:
19//! ```no_run
20//! tracing_subscriber::fmt::init();
21//! ```
22
23use crate::packet::{VCLPacket, PacketType};
24use crate::crypto::{KeyPair, encrypt_payload, decrypt_payload};
25use crate::handshake::{HandshakeMessage, create_client_hello, process_client_hello, process_server_hello};
26use crate::error::VCLError;
27use crate::event::VCLEvent;
28use crate::config::VCLConfig;
29use crate::flow::FlowController;
30use crate::fragment::{Fragment, Fragmenter, Reassembler};
31use ed25519_dalek::SigningKey;
32use x25519_dalek::{EphemeralSecret, PublicKey};
33use rand::rngs::OsRng;
34use tokio::net::UdpSocket;
35use tokio::sync::mpsc;
36use std::net::SocketAddr;
37use std::collections::HashSet;
38use std::time::Instant;
39use tracing::{debug, info, warn, error};
40
/// A secure VCL Protocol connection over UDP.
///
/// Each connection manages its own cryptographic state:
/// independent send/receive hash chains, nonce tracking,
/// shared secret, Ed25519 key pair, flow controller, and fragment reassembler.
///
/// # Example — Server
///
/// ```no_run
/// use vcl_protocol::connection::VCLConnection;
///
/// #[tokio::main]
/// async fn main() {
///     let mut server = VCLConnection::bind("127.0.0.1:8080").await.unwrap();
///     server.accept_handshake().await.unwrap();
///
///     loop {
///         match server.recv().await {
///             Ok(packet) => println!("{}", String::from_utf8_lossy(&packet.payload)),
///             Err(e)     => { eprintln!("{}", e); break; }
///         }
///     }
/// }
/// ```
///
/// # Example — Client
///
/// ```no_run
/// use vcl_protocol::connection::VCLConnection;
///
/// #[tokio::main]
/// async fn main() {
///     let mut client = VCLConnection::bind("127.0.0.1:0").await.unwrap();
///     client.connect("127.0.0.1:8080").await.unwrap();
///     client.send(b"Hello!").await.unwrap();
///     client.close().unwrap();
/// }
/// ```
pub struct VCLConnection {
    // UDP socket bound at construction; every datagram is sent and received through it.
    socket: UdpSocket,
    // Configuration supplied at bind time (fragment size, flow window size, reliability, ...).
    config: VCLConfig,
    // Key pair generated at bind time; the private key signs outgoing packets.
    keypair: KeyPair,
    // Sequence number given to the next outgoing packet; incremented after each send.
    send_sequence: u64,
    // Hash of the most recently sent packet (32 zero bytes before the first send),
    // handed to the next outgoing packet as its previous-hash link.
    send_hash: Vec<u8>,
    // Hash of the most recently accepted incoming packet (32 zero bytes initially),
    // used to validate the chain link of the next incoming packet.
    recv_hash: Vec<u8>,
    // Sequence number of the most recently accepted incoming packet.
    last_sequence: u64,
    // Nonces of received packets, used for replay detection.
    seen_nonces: HashSet<[u8; 24]>,
    // Remote endpoint; set by `connect()`, `accept_handshake()`, or the first `recv()`.
    peer_addr: Option<SocketAddr>,
    // Key used to verify incoming signatures. NOTE(review): nothing in this file's
    // visible code assigns it, so verification falls back to our own public key —
    // confirm whether this is intentional.
    peer_public_key: Option<Vec<u8>>,
    // Symmetric key from the handshake; replaced on key rotation, cleared on close.
    shared_secret: Option<[u8; 32]>,
    // Set to `true` by `accept_handshake()`; not read anywhere in the visible code,
    // hence the lint allowance.
    #[allow(dead_code)]
    is_server: bool,
    // Once `true`, the public operations return `VCLError::ConnectionClosed`.
    closed: bool,
    // Time of the last successful handshake, send, or accepted incoming packet.
    last_activity: Instant,
    // Inactivity timeout in seconds (60 at construction).
    timeout_secs: u64,
    // Sender half of the event channel, present after `subscribe()`.
    event_tx: Option<mpsc::Sender<VCLEvent>>,
    // When the outstanding ping was sent; taken when the pong arrives.
    ping_sent_at: Option<Instant>,
    // Tracks in-flight packets for flow control.
    flow: FlowController,
    // Collects incoming fragments until a full payload can be rebuilt.
    reassembler: Reassembler,
    // Identifier passed to the fragmenter for the next fragmented payload.
    fragment_id: u64,
}
102
103impl VCLConnection {
104    /// Bind a new VCL connection to a local UDP address using the default config.
105    ///
106    /// Use `"127.0.0.1:0"` to let the OS assign a port (typical for clients).
107    ///
108    /// # Errors
109    /// Returns [`VCLError::IoError`] if the socket cannot be bound.
110    pub async fn bind(addr: &str) -> Result<Self, VCLError> {
111        Self::bind_with_config(addr, VCLConfig::default()).await
112    }
113
114    /// Bind a new VCL connection with a specific [`VCLConfig`].
115    ///
116    /// The config controls fragmentation threshold, flow window size,
117    /// reliability mode, and transport selection.
118    ///
119    /// # Errors
120    /// Returns [`VCLError::IoError`] if the socket cannot be bound.
121    pub async fn bind_with_config(addr: &str, config: VCLConfig) -> Result<Self, VCLError> {
122        let socket = UdpSocket::bind(addr).await?;
123        let local_addr = socket.local_addr()
124            .map(|a| a.to_string())
125            .unwrap_or_else(|_| addr.to_string());
126        info!(
127            addr = %local_addr,
128            transport = "udp",
129            reliability = ?config.reliability,
130            fragment_size = config.fragment_size,
131            "VCLConnection bound"
132        );
133        let flow = FlowController::new(config.flow_window_size);
134        Ok(VCLConnection {
135            socket,
136            config,
137            keypair: KeyPair::generate(),
138            send_sequence: 0,
139            send_hash: vec![0; 32],
140            recv_hash: vec![0; 32],
141            last_sequence: 0,
142            seen_nonces: HashSet::new(),
143            peer_addr: None,
144            peer_public_key: None,
145            shared_secret: None,
146            is_server: false,
147            closed: false,
148            last_activity: Instant::now(),
149            timeout_secs: 60,
150            event_tx: None,
151            ping_sent_at: None,
152            flow,
153            reassembler: Reassembler::new(),
154            fragment_id: 0,
155        })
156    }
157
158    // ─── Events ──────────────────────────────────────────────────────────────
159
160    /// Subscribe to connection events.
161    ///
162    /// Returns an async `mpsc::Receiver<VCLEvent>` with a channel capacity of 64.
163    /// Call this **before** `connect()` or `accept_handshake()` to receive
164    /// the [`VCLEvent::Connected`] event.
165    ///
166    /// Events are sent with `try_send` — if the channel is full, events are dropped silently.
167    pub fn subscribe(&mut self) -> mpsc::Receiver<VCLEvent> {
168        debug!("Event subscription registered");
169        let (tx, rx) = mpsc::channel(64);
170        self.event_tx = Some(tx);
171        rx
172    }
173
174    fn emit(&self, event: VCLEvent) {
175        if let Some(tx) = &self.event_tx {
176            let _ = tx.try_send(event);
177        }
178    }
179
180    // ─── Configuration ────────────────────────────────────────────────────────
181
182    /// Set the inactivity timeout in seconds (default: 60).
183    ///
184    /// Set to `0` to disable the timeout.
185    pub fn set_timeout(&mut self, secs: u64) {
186        debug!(timeout_secs = secs, "Inactivity timeout updated");
187        self.timeout_secs = secs;
188    }
189
190    /// Get the current inactivity timeout in seconds.
191    pub fn get_timeout(&self) -> u64 {
192        self.timeout_secs
193    }
194
195    /// Get the [`Instant`] of the last `send()` or `recv()` activity.
196    pub fn last_activity(&self) -> Instant {
197        self.last_activity
198    }
199
200    /// Get a reference to the current [`VCLConfig`].
201    pub fn get_config(&self) -> &VCLConfig {
202        &self.config
203    }
204
205    /// Get a reference to the [`FlowController`] for inspecting flow stats.
206    pub fn flow(&self) -> &FlowController {
207        &self.flow
208    }
209
210    /// Manually acknowledge a sent packet by sequence number.
211    ///
212    /// Returns `true` if the packet was found and removed from the in-flight window.
213    pub fn ack_packet(&mut self, sequence: u64) -> bool {
214        self.flow.on_ack(sequence)
215    }
216
217    /// Override the Ed25519 signing key with a pre-shared key.
218    ///
219    /// ⚠️ **For testing only.** Never use a pre-shared key in production.
220    pub fn set_shared_key(&mut self, private_key: &[u8]) {
221        debug!("Pre-shared key set (testing mode)");
222        let key_bytes: &[u8; 32] = private_key.try_into().unwrap();
223        let signing_key = SigningKey::from_bytes(key_bytes);
224        let verifying_key = signing_key.verifying_key();
225        self.keypair.private_key = private_key.to_vec();
226        self.keypair.public_key = verifying_key.to_bytes().to_vec();
227    }
228
229    // ─── Handshake ────────────────────────────────────────────────────────────
230
231    /// Connect to a remote VCL server and perform the X25519 handshake.
232    ///
233    /// After this returns `Ok(())`, the connection is ready to `send()` and `recv()`.
234    /// Emits [`VCLEvent::Connected`] if subscribed.
235    ///
236    /// # Errors
237    /// - [`VCLError::IoError`] — socket or address error
238    /// - [`VCLError::HandshakeFailed`] — key exchange failed
239    /// - [`VCLError::ExpectedServerHello`] — unexpected handshake message
240    pub async fn connect(&mut self, addr: &str) -> Result<(), VCLError> {
241        info!(peer = %addr, "Initiating handshake (client)");
242        let parsed: SocketAddr = addr.parse()?;
243        self.peer_addr = Some(parsed);
244
245        let (hello_msg, ephemeral) = create_client_hello();
246        let hello_bytes = bincode::serialize(&hello_msg)?;
247        self.socket.send_to(&hello_bytes, parsed).await?;
248        debug!(peer = %addr, "ClientHello sent");
249
250        let mut buf = vec![0u8; 65535];
251        let (len, _) = self.socket.recv_from(&mut buf).await?;
252        let server_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
253
254        match server_hello {
255            HandshakeMessage::ServerHello { public_key } => {
256                let shared = process_server_hello(ephemeral, public_key)
257                    .ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?;
258                self.shared_secret = Some(shared);
259                debug!(peer = %addr, "ServerHello received, shared secret established");
260            }
261            _ => {
262                warn!(peer = %addr, "Expected ServerHello, got unexpected message");
263                return Err(VCLError::ExpectedServerHello);
264            }
265        }
266
267        self.last_activity = Instant::now();
268        info!(peer = %addr, "Handshake complete (client)");
269        self.emit(VCLEvent::Connected);
270        Ok(())
271    }
272
273    /// Accept an incoming X25519 handshake from a client (server side).
274    ///
275    /// Blocks until a `ClientHello` is received.
276    /// After this returns `Ok(())`, the connection is ready to `send()` and `recv()`.
277    /// Emits [`VCLEvent::Connected`] if subscribed.
278    ///
279    /// # Errors
280    /// - [`VCLError::IoError`] — socket error
281    /// - [`VCLError::HandshakeFailed`] — key exchange failed
282    /// - [`VCLError::ExpectedClientHello`] — unexpected handshake message
283    pub async fn accept_handshake(&mut self) -> Result<(), VCLError> {
284        info!("Waiting for ClientHello (server)");
285        let ephemeral = EphemeralSecret::random_from_rng(OsRng);
286
287        let mut buf = vec![0u8; 65535];
288        let (len, addr) = self.socket.recv_from(&mut buf).await?;
289        self.peer_addr = Some(addr);
290        debug!(peer = %addr, "ClientHello received");
291
292        let client_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
293
294        match client_hello {
295            HandshakeMessage::ClientHello { public_key } => {
296                let (server_hello, shared) = process_client_hello(ephemeral, public_key);
297                let hello_bytes = bincode::serialize(&server_hello)?;
298                self.socket.send_to(&hello_bytes, addr).await?;
299                debug!(peer = %addr, "ServerHello sent");
300                self.shared_secret = Some(
301                    shared.ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?
302                );
303                self.is_server = true;
304            }
305            _ => {
306                warn!(peer = %addr, "Expected ClientHello, got unexpected message");
307                return Err(VCLError::ExpectedClientHello);
308            }
309        }
310
311        self.last_activity = Instant::now();
312        info!(peer = %addr, "Handshake complete (server)");
313        self.emit(VCLEvent::Connected);
314        Ok(())
315    }
316
317    // ─── Internal send ────────────────────────────────────────────────────────
318
319    async fn send_internal(&mut self, data: &[u8], packet_type: PacketType) -> Result<(), VCLError> {
320        let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
321        let (encrypted_payload, nonce) = encrypt_payload(data, &key)?;
322
323        let mut packet = VCLPacket::new_typed(
324            self.send_sequence,
325            self.send_hash.clone(),
326            encrypted_payload,
327            nonce,
328            packet_type,
329        );
330        packet.sign(&self.keypair.private_key)?;
331
332        let serialized = packet.serialize();
333        let addr = self.peer_addr.ok_or(VCLError::NoPeerAddress)?;
334        self.socket.send_to(&serialized, addr).await?;
335
336        debug!(
337            peer = %addr,
338            seq = self.send_sequence,
339            size = data.len(),
340            packet_type = ?packet.packet_type,
341            "Packet sent"
342        );
343
344        self.flow.on_send(self.send_sequence, data.to_vec());
345        self.send_hash = packet.compute_hash();
346        self.send_sequence += 1;
347        self.last_activity = Instant::now();
348        Ok(())
349    }
350
351    // ─── Public send ──────────────────────────────────────────────────────────
352
353    /// Encrypt, sign, and send a data packet to the peer.
354    ///
355    /// If the payload exceeds `config.fragment_size`, it is automatically split
356    /// into [`PacketType::Fragment`] packets and reassembled transparently on the receiver.
357    ///
358    /// # Errors
359    /// - [`VCLError::ConnectionClosed`] — connection was closed
360    /// - [`VCLError::Timeout`] — inactivity timeout exceeded
361    /// - [`VCLError::NoSharedSecret`] — handshake not completed
362    /// - [`VCLError::NoPeerAddress`] — peer address unknown
363    /// - [`VCLError::IoError`] — socket error
364    pub async fn send(&mut self, data: &[u8]) -> Result<(), VCLError> {
365        if self.closed {
366            error!("send() called on closed connection");
367            return Err(VCLError::ConnectionClosed);
368        }
369        self.check_timeout()?;
370
371        if !self.flow.can_send() {
372            warn!(
373                in_flight = self.flow.in_flight_count(),
374                window = self.flow.window_size(),
375                "Flow control window full"
376            );
377        }
378
379        if self.config.needs_fragmentation(data.len()) {
380            debug!(
381                size = data.len(),
382                fragment_size = self.config.fragment_size,
383                "Auto-fragmenting payload"
384            );
385            let frags = Fragmenter::split(data, self.config.fragment_size, self.fragment_id);
386            self.fragment_id += 1;
387            for frag in frags {
388                let frag_bytes = bincode::serialize(&frag)
389                    .map_err(|e| VCLError::SerializationError(e.to_string()))?;
390                self.send_internal(&frag_bytes, PacketType::Fragment).await?;
391            }
392        } else {
393            self.send_internal(data, PacketType::Data).await?;
394        }
395        Ok(())
396    }
397
398    // ─── Ping / Heartbeat ─────────────────────────────────────────────────────
399
400    /// Send a ping to the peer to check liveness and measure round-trip latency.
401    ///
402    /// The pong reply is handled **transparently inside `recv()`**.
403    /// Subscribe to events to receive [`VCLEvent::PongReceived { latency }`](VCLEvent::PongReceived).
404    ///
405    /// ⚠️ You must keep calling `recv()` for the pong to be processed.
406    pub async fn ping(&mut self) -> Result<(), VCLError> {
407        if self.closed {
408            error!("ping() called on closed connection");
409            return Err(VCLError::ConnectionClosed);
410        }
411        self.check_timeout()?;
412        debug!("Ping sent");
413        self.ping_sent_at = Some(Instant::now());
414        self.send_internal(&[], PacketType::Ping).await
415    }
416
417    async fn handle_ping(&mut self) -> Result<(), VCLError> {
418        debug!("Ping received, sending Pong");
419        self.send_internal(&[], PacketType::Pong).await?;
420        self.emit(VCLEvent::PingReceived);
421        Ok(())
422    }
423
424    fn handle_pong(&mut self) {
425        if let Some(sent_at) = self.ping_sent_at.take() {
426            let latency = sent_at.elapsed();
427            debug!(latency_us = latency.as_micros(), "Pong received");
428            self.emit(VCLEvent::PongReceived { latency });
429        }
430    }
431
432    // ─── Key Rotation ──────────────────────────────────────────────────────────
433
    /// Initiate a mid-session key rotation using a fresh X25519 ephemeral exchange.
    ///
    /// Sends our new ephemeral public key in a `KeyRotation` packet (encrypted
    /// with the current key), then reads the next datagram directly from the
    /// socket and requires it to be the peer's `KeyRotation` reply. The reply is
    /// validated (nonce, hash chain, signature), decrypted with the **old** key,
    /// and the new shared secret is derived from it.
    ///
    /// Emits [`VCLEvent::KeyRotated`] on success.
    ///
    /// ⚠️ The peer must be actively calling `recv()` during rotation.
    /// ⚠️ Do not call `send()` while `rotate_keys()` is awaiting a response.
    ///
    /// # Errors
    /// - [`VCLError::ConnectionClosed`] — connection was closed
    /// - [`VCLError::Timeout`] — inactivity timeout exceeded
    /// - [`VCLError::ReplayDetected`] — reply nonce was already seen
    /// - [`VCLError::ChainValidationFailed`] — reply does not extend the receive chain
    /// - [`VCLError::SignatureInvalid`] — reply signature mismatch
    /// - [`VCLError::NoSharedSecret`] — handshake not completed
    /// - [`VCLError::HandshakeFailed`] — reply was not a `KeyRotation` packet
    /// - [`VCLError::InvalidPacket`] — reply payload is not a 32-byte public key
    /// - any error from sending the request or from decryption
    pub async fn rotate_keys(&mut self) -> Result<(), VCLError> {
        if self.closed {
            error!("rotate_keys() called on closed connection");
            return Err(VCLError::ConnectionClosed);
        }
        self.check_timeout()?;
        info!("Initiating key rotation");

        // Fresh ephemeral key pair for this rotation only.
        let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
        let our_public = PublicKey::from(&our_ephemeral);

        self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
        debug!("KeyRotation request sent, waiting for response");

        // The reply is read here rather than through `recv()`; whatever datagram
        // arrives next is treated as the reply.
        // NOTE(review): the sender address is discarded — confirm that relying
        // solely on the cryptographic checks below is intended.
        let mut buf = vec![0u8; 65535];
        let (len, _) = self.socket.recv_from(&mut buf).await?;
        let packet = VCLPacket::deserialize(&buf[..len])?;

        // NOTE(review): unlike `recv()`, this path performs no sequence-number
        // check and never trims `seen_nonces` — confirm whether that is intended.
        if self.seen_nonces.contains(&packet.nonce) {
            warn!("Replay detected during key rotation: duplicate nonce");
            return Err(VCLError::ReplayDetected("Duplicate nonce in key rotation".to_string()));
        }
        self.seen_nonces.insert(packet.nonce);

        if !packet.validate_chain(&self.recv_hash) {
            warn!("Chain validation failed during key rotation");
            return Err(VCLError::ChainValidationFailed);
        }

        // Falls back to our own public key when no peer key has been recorded.
        let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
        if !packet.verify(verify_key)? {
            warn!("Signature invalid during key rotation");
            return Err(VCLError::SignatureInvalid);
        }

        // The packet is accepted into the receive chain at this point, before
        // its type and payload are inspected.
        self.recv_hash = packet.compute_hash();
        self.last_sequence = packet.sequence;
        self.last_activity = Instant::now();

        // The reply is still encrypted with the key being replaced.
        let old_key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
        let decrypted = decrypt_payload(&packet.payload, &old_key, &packet.nonce)?;

        if packet.packet_type != PacketType::KeyRotation {
            warn!("Expected KeyRotation response, got {:?}", packet.packet_type);
            return Err(VCLError::HandshakeFailed("Expected KeyRotation response".to_string()));
        }
        if decrypted.len() != 32 {
            warn!("KeyRotation payload has wrong length: {}", decrypted.len());
            return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
        }

        let their_bytes: [u8; 32] = decrypted
            .try_into()
            .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
        let their_pubkey = PublicKey::from(their_bytes);
        // Derive the replacement secret; from here on packets use the new key.
        let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
        self.shared_secret = Some(new_secret.to_bytes());
        info!("Key rotation complete");
        self.emit(VCLEvent::KeyRotated);
        Ok(())
    }
501
502    async fn handle_key_rotation_request(&mut self, their_pubkey_bytes: &[u8]) -> Result<(), VCLError> {
503        debug!("KeyRotation request received, processing");
504        if their_pubkey_bytes.len() != 32 {
505            warn!("KeyRotation payload has wrong length: {}", their_pubkey_bytes.len());
506            return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
507        }
508
509        let their_bytes: [u8; 32] = their_pubkey_bytes
510            .try_into()
511            .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
512        let their_pubkey = PublicKey::from(their_bytes);
513
514        let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
515        let our_public = PublicKey::from(&our_ephemeral);
516        let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
517
518        self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
519        debug!("KeyRotation response sent");
520
521        self.shared_secret = Some(new_secret.to_bytes());
522        info!("Key rotation complete (responder)");
523        self.emit(VCLEvent::KeyRotated);
524        Ok(())
525    }
526
527    // ─── Receive ──────────────────────────────────────────────────────────────
528
    /// Receive the next data packet from the peer.
    ///
    /// Control packets (`Ping`, `Pong`, `KeyRotation`) and fragment packets are handled
    /// **transparently** — this method loops internally until a complete `Data` packet arrives.
    /// Large payloads that were fragmented by the sender are reassembled automatically.
    ///
    /// On success, `packet.payload` contains the **decrypted** (and reassembled) data.
    /// For a reassembled payload, the remaining fields of the returned packet
    /// (sequence, previous hash, nonce, signature) are those of the fragment
    /// packet that completed the reassembly, and its type is reported as `Data`.
    ///
    /// Any packet that fails validation makes this method return the error
    /// rather than skip the packet.
    ///
    /// # Errors
    /// - [`VCLError::ConnectionClosed`] — connection was closed
    /// - [`VCLError::Timeout`] — inactivity timeout exceeded
    /// - [`VCLError::ReplayDetected`] — duplicate sequence or nonce
    /// - [`VCLError::ChainValidationFailed`] — hash chain broken
    /// - [`VCLError::SignatureInvalid`] — Ed25519 signature mismatch
    /// - [`VCLError::NoSharedSecret`] — handshake not completed
    /// - [`VCLError::CryptoError`] — decryption failed
    /// - [`VCLError::SerializationError`] — a fragment payload could not be decoded
    /// - [`VCLError::IoError`] — socket error
    pub async fn recv(&mut self) -> Result<VCLPacket, VCLError> {
        if self.closed {
            error!("recv() called on closed connection");
            return Err(VCLError::ConnectionClosed);
        }

        loop {
            // Checked before each wait; the wait on the socket itself is not bounded here.
            self.check_timeout()?;

            let mut buf = vec![0u8; 65535];
            let (len, addr) = self.socket.recv_from(&mut buf).await?;
            // Adopt the first sender as the peer if none has been recorded yet.
            if self.peer_addr.is_none() {
                self.peer_addr = Some(addr);
            }

            let packet = VCLPacket::deserialize(&buf[..len])?;

            // Replay protection, part 1: sequence numbers must strictly increase.
            // The check is skipped while `last_sequence` is still 0.
            if self.last_sequence > 0 && packet.sequence <= self.last_sequence {
                warn!(
                    seq = packet.sequence,
                    last_seq = self.last_sequence,
                    "Replay detected: old sequence number"
                );
                return Err(VCLError::ReplayDetected("Old sequence number".to_string()));
            }
            // Replay protection, part 2: a nonce may be seen only once.
            if self.seen_nonces.contains(&packet.nonce) {
                warn!(seq = packet.sequence, "Replay detected: duplicate nonce");
                return Err(VCLError::ReplayDetected("Duplicate nonce".to_string()));
            }
            // NOTE(review): the nonce is recorded before the chain and signature
            // checks below, so unauthenticated packets also enter the set —
            // confirm this is acceptable.
            self.seen_nonces.insert(packet.nonce);
            // Bound memory use: the whole set is dropped once it exceeds 1000 entries.
            if self.seen_nonces.len() > 1000 {
                debug!("Nonce window full, clearing");
                self.seen_nonces.clear();
            }

            if !packet.validate_chain(&self.recv_hash) {
                warn!(seq = packet.sequence, "Chain validation failed");
                return Err(VCLError::ChainValidationFailed);
            }

            // Falls back to our own public key when no peer key has been recorded.
            let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
            if !packet.verify(verify_key)? {
                warn!(seq = packet.sequence, "Signature invalid");
                return Err(VCLError::SignatureInvalid);
            }

            // Packet accepted: advance the receive chain and refresh activity.
            self.recv_hash = packet.compute_hash();
            self.last_sequence = packet.sequence;
            self.last_activity = Instant::now();

            let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
            let decrypted = decrypt_payload(&packet.payload, &key, &packet.nonce)?;

            match packet.packet_type {
                PacketType::Data => {
                    // Simple flow model: receiving data acknowledges the oldest
                    // of OUR in-flight packets, if any.
                    if let Some(seq) = self.flow.oldest_unacked_sequence() {
                        self.flow.on_ack(seq);
                    }
                    debug!(
                        peer = %addr,
                        seq = packet.sequence,
                        size = decrypted.len(),
                        "Data packet received"
                    );
                    self.emit(VCLEvent::PacketReceived {
                        sequence: packet.sequence,
                        size: decrypted.len(),
                    });
                    // Same packet, with the ciphertext replaced by the plaintext.
                    return Ok(VCLPacket {
                        version: packet.version,
                        packet_type: PacketType::Data,
                        sequence: packet.sequence,
                        prev_hash: packet.prev_hash,
                        nonce: packet.nonce,
                        payload: decrypted,
                        signature: packet.signature,
                    });
                }
                // Control packets are consumed here and the loop continues.
                PacketType::Ping => { self.handle_ping().await?; }
                PacketType::Pong => { self.handle_pong(); }
                PacketType::KeyRotation => {
                    self.handle_key_rotation_request(&decrypted).await?;
                }
                PacketType::Fragment => {
                    // Same auto-ack rule as for data packets.
                    if let Some(seq) = self.flow.oldest_unacked_sequence() {
                        self.flow.on_ack(seq);
                    }
                    let frag: Fragment = bincode::deserialize(&decrypted)
                        .map_err(|e| VCLError::SerializationError(e.to_string()))?;
                    debug!(
                        fragment_id = frag.fragment_id,
                        index = frag.fragment_index,
                        total = frag.total_fragments,
                        "Fragment received"
                    );
                    // The reassembler yields the payload once it considers it complete.
                    if let Some(reassembled) = self.reassembler.add(frag) {
                        info!(size = reassembled.len(), "Fragment reassembly complete");
                        self.emit(VCLEvent::PacketReceived {
                            sequence: packet.sequence,
                            size: reassembled.len(),
                        });
                        // Present the reassembled payload as an ordinary data packet.
                        return Ok(VCLPacket {
                            version: packet.version,
                            packet_type: PacketType::Data,
                            sequence: packet.sequence,
                            prev_hash: packet.prev_hash,
                            nonce: packet.nonce,
                            payload: reassembled,
                            signature: packet.signature,
                        });
                    }
                    // More fragments expected, loop continues
                }
            }
        }
    }
663
664    // ─── Session management ───────────────────────────────────────────────────
665
666    fn check_timeout(&self) -> Result<(), VCLError> {
667        if self.last_activity.elapsed().as_secs() > self.timeout_secs {
668            warn!(
669                elapsed_secs = self.last_activity.elapsed().as_secs(),
670                timeout_secs = self.timeout_secs,
671                "Connection timed out"
672            );
673            return Err(VCLError::Timeout);
674        }
675        Ok(())
676    }
677
678    /// Gracefully close the connection and clear all cryptographic state.
679    ///
680    /// After calling `close()`, all further operations return [`VCLError::ConnectionClosed`].
681    /// Emits [`VCLEvent::Disconnected`] if subscribed.
682    ///
683    /// # Errors
684    /// Returns [`VCLError::ConnectionClosed`] if already closed.
685    pub fn close(&mut self) -> Result<(), VCLError> {
686        if self.closed {
687            warn!("close() called on already closed connection");
688            return Err(VCLError::ConnectionClosed);
689        }
690        info!("Connection closed");
691        self.closed = true;
692        self.send_sequence = 0;
693        self.send_hash = vec![0; 32];
694        self.recv_hash = vec![0; 32];
695        self.last_sequence = 0;
696        self.seen_nonces.clear();
697        self.shared_secret = None;
698        self.ping_sent_at = None;
699        self.flow.reset();
700        self.reassembler.cleanup();
701        self.emit(VCLEvent::Disconnected);
702        Ok(())
703    }
704
705    /// Returns `true` if the connection has been closed.
706    pub fn is_closed(&self) -> bool {
707        self.closed
708    }
709
710    /// Get the local Ed25519 public key (32 bytes).
711    pub fn get_public_key(&self) -> Vec<u8> {
712        self.keypair.public_key.clone()
713    }
714
715    /// Get the current X25519 shared secret, or `None` if the handshake
716    /// has not completed or the connection is closed.
717    pub fn get_shared_secret(&self) -> Option<[u8; 32]> {
718        self.shared_secret
719    }
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// A default bind yields an open connection with the default flow window.
    #[tokio::test]
    async fn test_bind_default_config() {
        let conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        assert!(!conn.is_closed());
        assert_eq!(conn.get_config().flow_window_size, 64);
    }

    /// The VPN preset is carried through to the connection's config.
    #[tokio::test]
    async fn test_bind_with_vpn_config() {
        let conn = VCLConnection::bind_with_config("127.0.0.1:0", VCLConfig::vpn()).await.unwrap();
        assert!(!conn.is_closed());
        assert_eq!(conn.get_config().fragment_size, 1200);
    }

    /// A fresh connection has an empty, open flow window.
    #[tokio::test]
    async fn test_flow_initial_state() {
        let conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        assert!(conn.flow().can_send());
        assert_eq!(conn.flow().in_flight_count(), 0);
        assert_eq!(conn.flow().total_sent(), 0);
    }

    #[tokio::test]
    async fn test_ack_packet() {
        let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        // No in-flight packets, ack should return false
        assert!(!conn.ack_packet(0));
    }

    /// The timeout starts at 60 seconds and reflects updates.
    #[tokio::test]
    async fn test_timeout_default_and_update() {
        let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        assert_eq!(conn.get_timeout(), 60);
        conn.set_timeout(5);
        assert_eq!(conn.get_timeout(), 5);
    }

    /// Sending before any handshake must fail for lack of a shared secret.
    #[tokio::test]
    async fn test_send_before_handshake_fails() {
        let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        assert!(matches!(conn.send(b"x").await, Err(VCLError::NoSharedSecret)));
    }

    /// Closing marks the connection closed, leaves no in-flight packets,
    /// drops the shared secret, and rejects further use.
    #[tokio::test]
    async fn test_close_resets_flow() {
        let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
        conn.close().unwrap();
        assert!(conn.is_closed());
        assert_eq!(conn.flow().in_flight_count(), 0);
        assert!(conn.get_shared_secret().is_none());
        // A second close and any send are rejected once closed.
        assert!(matches!(conn.close(), Err(VCLError::ConnectionClosed)));
        assert!(matches!(conn.send(b"x").await, Err(VCLError::ConnectionClosed)));
    }
}