Skip to main content

vcl_protocol/
connection.rs

1//! # VCL Connection
2//!
3//! [`VCLConnection`] is the main entry point for VCL Protocol.
4//! It manages the full lifecycle of a secure UDP connection:
5//!
6//! - X25519 ephemeral handshake
7//! - Packet encryption, signing, and chain validation
8//! - Replay protection
9//! - Session management (close, timeout)
10//! - Connection events via async mpsc channel
11//! - Ping / heartbeat with latency measurement
12//! - Mid-session key rotation
13//!
14//! ## Logging
15//!
16//! This module uses the [`tracing`] crate. To see logs in your application,
17//! add a subscriber such as `tracing-subscriber`:
18//!
19//! ```no_run
20//! tracing_subscriber::fmt::init();
21//! ```
22
23use crate::packet::{VCLPacket, PacketType};
24use crate::crypto::{KeyPair, encrypt_payload, decrypt_payload};
25use crate::handshake::{HandshakeMessage, create_client_hello, process_client_hello, process_server_hello};
26use crate::error::VCLError;
27use crate::event::VCLEvent;
28use ed25519_dalek::SigningKey;
29use x25519_dalek::{EphemeralSecret, PublicKey};
30use rand::rngs::OsRng;
31use tokio::net::UdpSocket;
32use tokio::sync::mpsc;
33use std::net::SocketAddr;
34use std::collections::HashSet;
35use std::time::Instant;
36use tracing::{debug, info, warn, error};
37
38/// A secure VCL Protocol connection over UDP.
39///
40/// Each connection manages its own cryptographic state:
41/// independent send/receive hash chains, nonce tracking,
42/// shared secret, and Ed25519 key pair.
43///
44/// # Example — Server
45///
46/// ```no_run
47/// use vcl_protocol::connection::VCLConnection;
48///
49/// #[tokio::main]
50/// async fn main() {
51///     let mut server = VCLConnection::bind("127.0.0.1:8080").await.unwrap();
52///     server.accept_handshake().await.unwrap();
53///
54///     loop {
55///         match server.recv().await {
56///             Ok(packet) => println!("{}", String::from_utf8_lossy(&packet.payload)),
57///             Err(e)     => { eprintln!("{}", e); break; }
58///         }
59///     }
60/// }
61/// ```
62///
63/// # Example — Client
64///
65/// ```no_run
66/// use vcl_protocol::connection::VCLConnection;
67///
68/// #[tokio::main]
69/// async fn main() {
70///     let mut client = VCLConnection::bind("127.0.0.1:0").await.unwrap();
71///     client.connect("127.0.0.1:8080").await.unwrap();
72///     client.send(b"Hello!").await.unwrap();
73///     client.close().unwrap();
74/// }
75/// ```
76pub struct VCLConnection {
77    socket: UdpSocket,
78    keypair: KeyPair,
79    send_sequence: u64,
80    send_hash: Vec<u8>,
81    recv_hash: Vec<u8>,
82    last_sequence: u64,
83    seen_nonces: HashSet<[u8; 24]>,
84    peer_addr: Option<SocketAddr>,
85    peer_public_key: Option<Vec<u8>>,
86    shared_secret: Option<[u8; 32]>,
87    #[allow(dead_code)]
88    is_server: bool,
89    closed: bool,
90    last_activity: Instant,
91    timeout_secs: u64,
92    event_tx: Option<mpsc::Sender<VCLEvent>>,
93    ping_sent_at: Option<Instant>,
94}
95
96impl VCLConnection {
97    /// Bind a new VCL connection to a local UDP address.
98    ///
99    /// Use `"127.0.0.1:0"` to let the OS assign a port (typical for clients).
100    ///
101    /// # Errors
102    /// Returns [`VCLError::IoError`] if the socket cannot be bound.
103    pub async fn bind(addr: &str) -> Result<Self, VCLError> {
104        let socket = UdpSocket::bind(addr).await?;
105        let local_addr = socket.local_addr()
106            .map(|a| a.to_string())
107            .unwrap_or_else(|_| addr.to_string());
108        info!(addr = %local_addr, "VCLConnection bound");
109        Ok(VCLConnection {
110            socket,
111            keypair: KeyPair::generate(),
112            send_sequence: 0,
113            send_hash: vec![0; 32],
114            recv_hash: vec![0; 32],
115            last_sequence: 0,
116            seen_nonces: HashSet::new(),
117            peer_addr: None,
118            peer_public_key: None,
119            shared_secret: None,
120            is_server: false,
121            closed: false,
122            last_activity: Instant::now(),
123            timeout_secs: 60,
124            event_tx: None,
125            ping_sent_at: None,
126        })
127    }
128
129    // ─── Events ──────────────────────────────────────────────────────────────
130
131    /// Subscribe to connection events.
132    ///
133    /// Returns an async `mpsc::Receiver<VCLEvent>` with a channel capacity of 64.
134    /// Call this **before** `connect()` or `accept_handshake()` to receive
135    /// the [`VCLEvent::Connected`] event.
136    ///
137    /// Events are sent with `try_send` — if the channel is full, events are dropped silently.
138    pub fn subscribe(&mut self) -> mpsc::Receiver<VCLEvent> {
139        debug!("Event subscription registered");
140        let (tx, rx) = mpsc::channel(64);
141        self.event_tx = Some(tx);
142        rx
143    }
144
145    fn emit(&self, event: VCLEvent) {
146        if let Some(tx) = &self.event_tx {
147            let _ = tx.try_send(event);
148        }
149    }
150
151    // ─── Configuration ────────────────────────────────────────────────────────
152
153    /// Set the inactivity timeout in seconds (default: 60).
154    ///
155    /// If no `send()` or `recv()` occurs within this duration,
156    /// the next operation returns [`VCLError::Timeout`].
157    /// Set to `0` to disable the timeout.
158    pub fn set_timeout(&mut self, secs: u64) {
159        debug!(timeout_secs = secs, "Inactivity timeout updated");
160        self.timeout_secs = secs;
161    }
162
163    /// Get the current inactivity timeout in seconds.
164    pub fn get_timeout(&self) -> u64 {
165        self.timeout_secs
166    }
167
168    /// Get the [`Instant`] of the last `send()` or `recv()` activity.
169    pub fn last_activity(&self) -> Instant {
170        self.last_activity
171    }
172
173    /// Override the Ed25519 signing key with a pre-shared key.
174    ///
175    /// ⚠️ **For testing only.** Never use a pre-shared key in production.
176    pub fn set_shared_key(&mut self, private_key: &[u8]) {
177        debug!("Pre-shared key set (testing mode)");
178        let key_bytes: &[u8; 32] = private_key.try_into().unwrap();
179        let signing_key = SigningKey::from_bytes(key_bytes);
180        let verifying_key = signing_key.verifying_key();
181        self.keypair.private_key = private_key.to_vec();
182        self.keypair.public_key = verifying_key.to_bytes().to_vec();
183    }
184
185    // ─── Handshake ────────────────────────────────────────────────────────────
186
187    /// Connect to a remote VCL server and perform the X25519 handshake.
188    ///
189    /// After this returns `Ok(())`, the connection is ready to `send()` and `recv()`.
190    /// Emits [`VCLEvent::Connected`] if subscribed.
191    ///
192    /// # Errors
193    /// - [`VCLError::IoError`] — socket or address error
194    /// - [`VCLError::HandshakeFailed`] — key exchange failed
195    /// - [`VCLError::ExpectedServerHello`] — unexpected handshake message
196    pub async fn connect(&mut self, addr: &str) -> Result<(), VCLError> {
197        info!(peer = %addr, "Initiating handshake (client)");
198        let parsed: SocketAddr = addr.parse()?;
199        self.peer_addr = Some(parsed);
200
201        let (hello_msg, ephemeral) = create_client_hello();
202        let hello_bytes = bincode::serialize(&hello_msg)?;
203        self.socket.send_to(&hello_bytes, parsed).await?;
204        debug!(peer = %addr, "ClientHello sent");
205
206        let mut buf = vec![0u8; 65535];
207        let (len, _) = self.socket.recv_from(&mut buf).await?;
208        let server_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
209
210        match server_hello {
211            HandshakeMessage::ServerHello { public_key } => {
212                let shared = process_server_hello(ephemeral, public_key)
213                    .ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?;
214                self.shared_secret = Some(shared);
215                debug!(peer = %addr, "ServerHello received, shared secret established");
216            }
217            _ => {
218                warn!(peer = %addr, "Expected ServerHello, got unexpected message");
219                return Err(VCLError::ExpectedServerHello);
220            }
221        }
222
223        self.last_activity = Instant::now();
224        info!(peer = %addr, "Handshake complete (client)");
225        self.emit(VCLEvent::Connected);
226        Ok(())
227    }
228
229    /// Accept an incoming X25519 handshake from a client (server side).
230    ///
231    /// Blocks until a `ClientHello` is received.
232    /// After this returns `Ok(())`, the connection is ready to `send()` and `recv()`.
233    /// Emits [`VCLEvent::Connected`] if subscribed.
234    ///
235    /// # Errors
236    /// - [`VCLError::IoError`] — socket error
237    /// - [`VCLError::HandshakeFailed`] — key exchange failed
238    /// - [`VCLError::ExpectedClientHello`] — unexpected handshake message
239    pub async fn accept_handshake(&mut self) -> Result<(), VCLError> {
240        info!("Waiting for ClientHello (server)");
241        let ephemeral = EphemeralSecret::random_from_rng(OsRng);
242
243        let mut buf = vec![0u8; 65535];
244        let (len, addr) = self.socket.recv_from(&mut buf).await?;
245        self.peer_addr = Some(addr);
246        debug!(peer = %addr, "ClientHello received");
247
248        let client_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
249
250        match client_hello {
251            HandshakeMessage::ClientHello { public_key } => {
252                let (server_hello, shared) = process_client_hello(ephemeral, public_key);
253                let hello_bytes = bincode::serialize(&server_hello)?;
254                self.socket.send_to(&hello_bytes, addr).await?;
255                debug!(peer = %addr, "ServerHello sent");
256                self.shared_secret = Some(
257                    shared.ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?
258                );
259                self.is_server = true;
260            }
261            _ => {
262                warn!(peer = %addr, "Expected ClientHello, got unexpected message");
263                return Err(VCLError::ExpectedClientHello);
264            }
265        }
266
267        self.last_activity = Instant::now();
268        info!(peer = %addr, "Handshake complete (server)");
269        self.emit(VCLEvent::Connected);
270        Ok(())
271    }
272
273    // ─── Internal send ────────────────────────────────────────────────────────
274
275    async fn send_internal(&mut self, data: &[u8], packet_type: PacketType) -> Result<(), VCLError> {
276        let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
277        let (encrypted_payload, nonce) = encrypt_payload(data, &key)?;
278
279        let mut packet = VCLPacket::new_typed(
280            self.send_sequence,
281            self.send_hash.clone(),
282            encrypted_payload,
283            nonce,
284            packet_type,
285        );
286        packet.sign(&self.keypair.private_key)?;
287
288        let serialized = packet.serialize();
289        let addr = self.peer_addr.ok_or(VCLError::NoPeerAddress)?;
290        self.socket.send_to(&serialized, addr).await?;
291
292        debug!(
293            peer = %addr,
294            seq = self.send_sequence,
295            size = data.len(),
296            packet_type = ?packet.packet_type,
297            "Packet sent"
298        );
299
300        self.send_hash = packet.compute_hash();
301        self.send_sequence += 1;
302        self.last_activity = Instant::now();
303        Ok(())
304    }
305
306    // ─── Public send ──────────────────────────────────────────────────────────
307
308    /// Encrypt, sign, and send a data packet to the peer.
309    ///
310    /// # Errors
311    /// - [`VCLError::ConnectionClosed`] — connection was closed
312    /// - [`VCLError::Timeout`] — inactivity timeout exceeded
313    /// - [`VCLError::NoSharedSecret`] — handshake not completed
314    /// - [`VCLError::NoPeerAddress`] — peer address unknown
315    /// - [`VCLError::IoError`] — socket error
316    pub async fn send(&mut self, data: &[u8]) -> Result<(), VCLError> {
317        if self.closed {
318            error!("send() called on closed connection");
319            return Err(VCLError::ConnectionClosed);
320        }
321        self.check_timeout()?;
322        self.send_internal(data, PacketType::Data).await
323    }
324
325    // ─── Ping / Heartbeat ─────────────────────────────────────────────────────
326
327    /// Send a ping to the peer to check liveness and measure round-trip latency.
328    ///
329    /// The pong reply is handled **transparently inside `recv()`** — you never
330    /// see Pong packets directly. Subscribe to events to receive
331    /// [`VCLEvent::PongReceived { latency }`](VCLEvent::PongReceived).
332    ///
333    /// ⚠️ You must keep calling `recv()` for the pong to be processed.
334    ///
335    /// # Errors
336    /// Same as [`send()`](VCLConnection::send).
337    pub async fn ping(&mut self) -> Result<(), VCLError> {
338        if self.closed {
339            error!("ping() called on closed connection");
340            return Err(VCLError::ConnectionClosed);
341        }
342        self.check_timeout()?;
343        debug!("Ping sent");
344        self.ping_sent_at = Some(Instant::now());
345        self.send_internal(&[], PacketType::Ping).await
346    }
347
348    async fn handle_ping(&mut self) -> Result<(), VCLError> {
349        debug!("Ping received, sending Pong");
350        self.send_internal(&[], PacketType::Pong).await?;
351        self.emit(VCLEvent::PingReceived);
352        Ok(())
353    }
354
355    fn handle_pong(&mut self) {
356        if let Some(sent_at) = self.ping_sent_at.take() {
357            let latency = sent_at.elapsed();
358            debug!(latency_us = latency.as_micros(), "Pong received");
359            self.emit(VCLEvent::PongReceived { latency });
360        }
361    }
362
363    // ─── Key Rotation ──────────────────────────────────────────────────────────
364
365    /// Initiate a mid-session key rotation using a fresh X25519 ephemeral exchange.
366    ///
367    /// Sends our new public key to the peer (encrypted with the **current** key),
368    /// waits for the peer's new public key, and atomically switches to the new
369    /// shared secret on both sides.
370    ///
371    /// Emits [`VCLEvent::KeyRotated`] on success.
372    ///
373    /// ⚠️ The peer must be actively calling `recv()` during rotation.
374    /// ⚠️ Do not call `send()` while `rotate_keys()` is awaiting a response.
375    ///
376    /// # Errors
377    /// - [`VCLError::ConnectionClosed`] / [`VCLError::Timeout`]
378    /// - [`VCLError::ChainValidationFailed`] / [`VCLError::SignatureInvalid`]
379    /// - [`VCLError::HandshakeFailed`] — unexpected packet type in response
380    /// - [`VCLError::InvalidPacket`] — malformed public key payload
381    pub async fn rotate_keys(&mut self) -> Result<(), VCLError> {
382        if self.closed {
383            error!("rotate_keys() called on closed connection");
384            return Err(VCLError::ConnectionClosed);
385        }
386        self.check_timeout()?;
387        info!("Initiating key rotation");
388
389        let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
390        let our_public = PublicKey::from(&our_ephemeral);
391
392        self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
393        debug!("KeyRotation request sent, waiting for response");
394
395        let mut buf = vec![0u8; 65535];
396        let (len, _) = self.socket.recv_from(&mut buf).await?;
397        let packet = VCLPacket::deserialize(&buf[..len])?;
398
399        if self.seen_nonces.contains(&packet.nonce) {
400            warn!("Replay detected during key rotation: duplicate nonce");
401            return Err(VCLError::ReplayDetected("Duplicate nonce in key rotation".to_string()));
402        }
403        self.seen_nonces.insert(packet.nonce);
404
405        if !packet.validate_chain(&self.recv_hash) {
406            warn!("Chain validation failed during key rotation");
407            return Err(VCLError::ChainValidationFailed);
408        }
409
410        let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
411        if !packet.verify(verify_key)? {
412            warn!("Signature invalid during key rotation");
413            return Err(VCLError::SignatureInvalid);
414        }
415
416        self.recv_hash = packet.compute_hash();
417        self.last_sequence = packet.sequence;
418        self.last_activity = Instant::now();
419
420        let old_key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
421        let decrypted = decrypt_payload(&packet.payload, &old_key, &packet.nonce)?;
422
423        if packet.packet_type != PacketType::KeyRotation {
424            warn!("Expected KeyRotation response, got {:?}", packet.packet_type);
425            return Err(VCLError::HandshakeFailed("Expected KeyRotation response".to_string()));
426        }
427        if decrypted.len() != 32 {
428            warn!("KeyRotation payload has wrong length: {}", decrypted.len());
429            return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
430        }
431
432        let their_bytes: [u8; 32] = decrypted
433            .try_into()
434            .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
435        let their_pubkey = PublicKey::from(their_bytes);
436        let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
437        self.shared_secret = Some(new_secret.to_bytes());
438        info!("Key rotation complete");
439        self.emit(VCLEvent::KeyRotated);
440        Ok(())
441    }
442
443    async fn handle_key_rotation_request(&mut self, their_pubkey_bytes: &[u8]) -> Result<(), VCLError> {
444        debug!("KeyRotation request received, processing");
445        if their_pubkey_bytes.len() != 32 {
446            warn!("KeyRotation payload has wrong length: {}", their_pubkey_bytes.len());
447            return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
448        }
449
450        let their_bytes: [u8; 32] = their_pubkey_bytes
451            .try_into()
452            .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
453        let their_pubkey = PublicKey::from(their_bytes);
454
455        let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
456        let our_public = PublicKey::from(&our_ephemeral);
457        let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
458
459        self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
460        debug!("KeyRotation response sent");
461
462        self.shared_secret = Some(new_secret.to_bytes());
463        info!("Key rotation complete (responder)");
464        self.emit(VCLEvent::KeyRotated);
465        Ok(())
466    }
467
468    // ─── Receive ──────────────────────────────────────────────────────────────
469
470    /// Receive the next data packet from the peer.
471    ///
472    /// Control packets (`Ping`, `Pong`, `KeyRotation`) are handled
473    /// **transparently** — this method loops internally until a `Data` packet arrives.
474    ///
475    /// On success, `packet.payload` contains the **decrypted** data.
476    ///
477    /// # Errors
478    /// - [`VCLError::ConnectionClosed`] — connection was closed
479    /// - [`VCLError::Timeout`] — inactivity timeout exceeded
480    /// - [`VCLError::ReplayDetected`] — duplicate sequence or nonce
481    /// - [`VCLError::ChainValidationFailed`] — hash chain broken
482    /// - [`VCLError::SignatureInvalid`] — Ed25519 signature mismatch
483    /// - [`VCLError::CryptoError`] — decryption failed
484    /// - [`VCLError::IoError`] — socket error
485    pub async fn recv(&mut self) -> Result<VCLPacket, VCLError> {
486        if self.closed {
487            error!("recv() called on closed connection");
488            return Err(VCLError::ConnectionClosed);
489        }
490
491        loop {
492            self.check_timeout()?;
493
494            let mut buf = vec![0u8; 65535];
495            let (len, addr) = self.socket.recv_from(&mut buf).await?;
496            if self.peer_addr.is_none() {
497                self.peer_addr = Some(addr);
498            }
499
500            let packet = VCLPacket::deserialize(&buf[..len])?;
501
502            if self.last_sequence > 0 && packet.sequence <= self.last_sequence {
503                warn!(
504                    seq = packet.sequence,
505                    last_seq = self.last_sequence,
506                    "Replay detected: old sequence number"
507                );
508                return Err(VCLError::ReplayDetected("Old sequence number".to_string()));
509            }
510            if self.seen_nonces.contains(&packet.nonce) {
511                warn!(seq = packet.sequence, "Replay detected: duplicate nonce");
512                return Err(VCLError::ReplayDetected("Duplicate nonce".to_string()));
513            }
514            self.seen_nonces.insert(packet.nonce);
515            if self.seen_nonces.len() > 1000 {
516                debug!("Nonce window full, clearing");
517                self.seen_nonces.clear();
518            }
519
520            if !packet.validate_chain(&self.recv_hash) {
521                warn!(seq = packet.sequence, "Chain validation failed");
522                return Err(VCLError::ChainValidationFailed);
523            }
524
525            let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
526            if !packet.verify(verify_key)? {
527                warn!(seq = packet.sequence, "Signature invalid");
528                return Err(VCLError::SignatureInvalid);
529            }
530
531            self.recv_hash = packet.compute_hash();
532            self.last_sequence = packet.sequence;
533            self.last_activity = Instant::now();
534
535            let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
536            let decrypted = decrypt_payload(&packet.payload, &key, &packet.nonce)?;
537
538            match packet.packet_type {
539                PacketType::Data => {
540                    debug!(
541                        peer = %addr,
542                        seq = packet.sequence,
543                        size = decrypted.len(),
544                        "Data packet received"
545                    );
546                    self.emit(VCLEvent::PacketReceived {
547                        sequence: packet.sequence,
548                        size: decrypted.len(),
549                    });
550                    return Ok(VCLPacket {
551                        version: packet.version,
552                        packet_type: PacketType::Data,
553                        sequence: packet.sequence,
554                        prev_hash: packet.prev_hash,
555                        nonce: packet.nonce,
556                        payload: decrypted,
557                        signature: packet.signature,
558                    });
559                }
560                PacketType::Ping => { self.handle_ping().await?; }
561                PacketType::Pong => { self.handle_pong(); }
562                PacketType::KeyRotation => {
563                    self.handle_key_rotation_request(&decrypted).await?;
564                }
565            }
566        }
567    }
568
569    // ─── Session management ───────────────────────────────────────────────────
570
571    fn check_timeout(&self) -> Result<(), VCLError> {
572        if self.last_activity.elapsed().as_secs() > self.timeout_secs {
573            warn!(
574                elapsed_secs = self.last_activity.elapsed().as_secs(),
575                timeout_secs = self.timeout_secs,
576                "Connection timed out"
577            );
578            return Err(VCLError::Timeout);
579        }
580        Ok(())
581    }
582
583    /// Gracefully close the connection and clear all cryptographic state.
584    ///
585    /// After calling `close()`, all further operations return [`VCLError::ConnectionClosed`].
586    /// Emits [`VCLEvent::Disconnected`] if subscribed.
587    ///
588    /// # Errors
589    /// Returns [`VCLError::ConnectionClosed`] if already closed.
590    pub fn close(&mut self) -> Result<(), VCLError> {
591        if self.closed {
592            warn!("close() called on already closed connection");
593            return Err(VCLError::ConnectionClosed);
594        }
595        info!("Connection closed");
596        self.closed = true;
597        self.send_sequence = 0;
598        self.send_hash = vec![0; 32];
599        self.recv_hash = vec![0; 32];
600        self.last_sequence = 0;
601        self.seen_nonces.clear();
602        self.shared_secret = None;
603        self.ping_sent_at = None;
604        self.emit(VCLEvent::Disconnected);
605        Ok(())
606    }
607
608    /// Returns `true` if the connection has been closed.
609    pub fn is_closed(&self) -> bool {
610        self.closed
611    }
612
613    /// Get the local Ed25519 public key (32 bytes).
614    pub fn get_public_key(&self) -> Vec<u8> {
615        self.keypair.public_key.clone()
616    }
617
618    /// Get the current X25519 shared secret, or `None` if the handshake
619    /// has not completed or the connection is closed.
620    pub fn get_shared_secret(&self) -> Option<[u8; 32]> {
621        self.shared_secret
622    }
623}