1use crate::packet::{VCLPacket, PacketType};
24use crate::crypto::{KeyPair, encrypt_payload, decrypt_payload};
25use crate::handshake::{HandshakeMessage, create_client_hello, process_client_hello, process_server_hello};
26use crate::error::VCLError;
27use crate::event::VCLEvent;
28use crate::config::VCLConfig;
29use crate::flow::FlowController;
30use crate::fragment::{Fragment, Fragmenter, Reassembler};
31use ed25519_dalek::SigningKey;
32use x25519_dalek::{EphemeralSecret, PublicKey};
33use rand::rngs::OsRng;
34use tokio::net::UdpSocket;
35use tokio::sync::mpsc;
36use std::net::SocketAddr;
37use std::collections::HashSet;
38use std::time::Instant;
39use tracing::{debug, info, warn, error};
40
41pub struct VCLConnection {
80 socket: UdpSocket,
81 config: VCLConfig,
82 keypair: KeyPair,
83 send_sequence: u64,
84 send_hash: Vec<u8>,
85 recv_hash: Vec<u8>,
86 last_sequence: u64,
87 seen_nonces: HashSet<[u8; 24]>,
88 peer_addr: Option<SocketAddr>,
89 peer_public_key: Option<Vec<u8>>,
90 shared_secret: Option<[u8; 32]>,
91 #[allow(dead_code)]
92 is_server: bool,
93 closed: bool,
94 last_activity: Instant,
95 timeout_secs: u64,
96 event_tx: Option<mpsc::Sender<VCLEvent>>,
97 ping_sent_at: Option<Instant>,
98 flow: FlowController,
99 reassembler: Reassembler,
100 fragment_id: u64,
101}
102
103impl VCLConnection {
104 pub async fn bind(addr: &str) -> Result<Self, VCLError> {
111 Self::bind_with_config(addr, VCLConfig::default()).await
112 }
113
114 pub async fn bind_with_config(addr: &str, config: VCLConfig) -> Result<Self, VCLError> {
122 let socket = UdpSocket::bind(addr).await?;
123 let local_addr = socket.local_addr()
124 .map(|a| a.to_string())
125 .unwrap_or_else(|_| addr.to_string());
126 info!(
127 addr = %local_addr,
128 transport = "udp",
129 reliability = ?config.reliability,
130 fragment_size = config.fragment_size,
131 "VCLConnection bound"
132 );
133 let flow = FlowController::new(config.flow_window_size);
134 Ok(VCLConnection {
135 socket,
136 config,
137 keypair: KeyPair::generate(),
138 send_sequence: 0,
139 send_hash: vec![0; 32],
140 recv_hash: vec![0; 32],
141 last_sequence: 0,
142 seen_nonces: HashSet::new(),
143 peer_addr: None,
144 peer_public_key: None,
145 shared_secret: None,
146 is_server: false,
147 closed: false,
148 last_activity: Instant::now(),
149 timeout_secs: 60,
150 event_tx: None,
151 ping_sent_at: None,
152 flow,
153 reassembler: Reassembler::new(),
154 fragment_id: 0,
155 })
156 }
157
158 pub fn subscribe(&mut self) -> mpsc::Receiver<VCLEvent> {
168 debug!("Event subscription registered");
169 let (tx, rx) = mpsc::channel(64);
170 self.event_tx = Some(tx);
171 rx
172 }
173
174 fn emit(&self, event: VCLEvent) {
175 if let Some(tx) = &self.event_tx {
176 let _ = tx.try_send(event);
177 }
178 }
179
180 pub fn set_timeout(&mut self, secs: u64) {
186 debug!(timeout_secs = secs, "Inactivity timeout updated");
187 self.timeout_secs = secs;
188 }
189
190 pub fn get_timeout(&self) -> u64 {
192 self.timeout_secs
193 }
194
195 pub fn last_activity(&self) -> Instant {
197 self.last_activity
198 }
199
200 pub fn get_config(&self) -> &VCLConfig {
202 &self.config
203 }
204
205 pub fn flow(&self) -> &FlowController {
207 &self.flow
208 }
209
210 pub fn ack_packet(&mut self, sequence: u64) -> bool {
214 self.flow.on_ack(sequence)
215 }
216
217 pub fn set_shared_key(&mut self, private_key: &[u8]) {
221 debug!("Pre-shared key set (testing mode)");
222 let key_bytes: &[u8; 32] = private_key.try_into().unwrap();
223 let signing_key = SigningKey::from_bytes(key_bytes);
224 let verifying_key = signing_key.verifying_key();
225 self.keypair.private_key = private_key.to_vec();
226 self.keypair.public_key = verifying_key.to_bytes().to_vec();
227 }
228
229 pub async fn connect(&mut self, addr: &str) -> Result<(), VCLError> {
241 info!(peer = %addr, "Initiating handshake (client)");
242 let parsed: SocketAddr = addr.parse()?;
243 self.peer_addr = Some(parsed);
244
245 let (hello_msg, ephemeral) = create_client_hello();
246 let hello_bytes = bincode::serialize(&hello_msg)?;
247 self.socket.send_to(&hello_bytes, parsed).await?;
248 debug!(peer = %addr, "ClientHello sent");
249
250 let mut buf = vec![0u8; 65535];
251 let (len, _) = self.socket.recv_from(&mut buf).await?;
252 let server_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
253
254 match server_hello {
255 HandshakeMessage::ServerHello { public_key } => {
256 let shared = process_server_hello(ephemeral, public_key)
257 .ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?;
258 self.shared_secret = Some(shared);
259 debug!(peer = %addr, "ServerHello received, shared secret established");
260 }
261 _ => {
262 warn!(peer = %addr, "Expected ServerHello, got unexpected message");
263 return Err(VCLError::ExpectedServerHello);
264 }
265 }
266
267 self.last_activity = Instant::now();
268 info!(peer = %addr, "Handshake complete (client)");
269 self.emit(VCLEvent::Connected);
270 Ok(())
271 }
272
273 pub async fn accept_handshake(&mut self) -> Result<(), VCLError> {
284 info!("Waiting for ClientHello (server)");
285 let ephemeral = EphemeralSecret::random_from_rng(OsRng);
286
287 let mut buf = vec![0u8; 65535];
288 let (len, addr) = self.socket.recv_from(&mut buf).await?;
289 self.peer_addr = Some(addr);
290 debug!(peer = %addr, "ClientHello received");
291
292 let client_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
293
294 match client_hello {
295 HandshakeMessage::ClientHello { public_key } => {
296 let (server_hello, shared) = process_client_hello(ephemeral, public_key);
297 let hello_bytes = bincode::serialize(&server_hello)?;
298 self.socket.send_to(&hello_bytes, addr).await?;
299 debug!(peer = %addr, "ServerHello sent");
300 self.shared_secret = Some(
301 shared.ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?
302 );
303 self.is_server = true;
304 }
305 _ => {
306 warn!(peer = %addr, "Expected ClientHello, got unexpected message");
307 return Err(VCLError::ExpectedClientHello);
308 }
309 }
310
311 self.last_activity = Instant::now();
312 info!(peer = %addr, "Handshake complete (server)");
313 self.emit(VCLEvent::Connected);
314 Ok(())
315 }
316
317 async fn send_internal(&mut self, data: &[u8], packet_type: PacketType) -> Result<(), VCLError> {
320 let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
321 let (encrypted_payload, nonce) = encrypt_payload(data, &key)?;
322
323 let mut packet = VCLPacket::new_typed(
324 self.send_sequence,
325 self.send_hash.clone(),
326 encrypted_payload,
327 nonce,
328 packet_type,
329 );
330 packet.sign(&self.keypair.private_key)?;
331
332 let serialized = packet.serialize();
333 let addr = self.peer_addr.ok_or(VCLError::NoPeerAddress)?;
334 self.socket.send_to(&serialized, addr).await?;
335
336 debug!(
337 peer = %addr,
338 seq = self.send_sequence,
339 size = data.len(),
340 packet_type = ?packet.packet_type,
341 "Packet sent"
342 );
343
344 self.flow.on_send(self.send_sequence, data.to_vec());
345 self.send_hash = packet.compute_hash();
346 self.send_sequence += 1;
347 self.last_activity = Instant::now();
348 Ok(())
349 }
350
351 pub async fn send(&mut self, data: &[u8]) -> Result<(), VCLError> {
365 if self.closed {
366 error!("send() called on closed connection");
367 return Err(VCLError::ConnectionClosed);
368 }
369 self.check_timeout()?;
370
371 if !self.flow.can_send() {
372 warn!(
373 in_flight = self.flow.in_flight_count(),
374 window = self.flow.window_size(),
375 "Flow control window full"
376 );
377 }
378
379 if self.config.needs_fragmentation(data.len()) {
380 debug!(
381 size = data.len(),
382 fragment_size = self.config.fragment_size,
383 "Auto-fragmenting payload"
384 );
385 let frags = Fragmenter::split(data, self.config.fragment_size, self.fragment_id);
386 self.fragment_id += 1;
387 for frag in frags {
388 let frag_bytes = bincode::serialize(&frag)
389 .map_err(|e| VCLError::SerializationError(e.to_string()))?;
390 self.send_internal(&frag_bytes, PacketType::Fragment).await?;
391 }
392 } else {
393 self.send_internal(data, PacketType::Data).await?;
394 }
395 Ok(())
396 }
397
398 pub async fn ping(&mut self) -> Result<(), VCLError> {
407 if self.closed {
408 error!("ping() called on closed connection");
409 return Err(VCLError::ConnectionClosed);
410 }
411 self.check_timeout()?;
412 debug!("Ping sent");
413 self.ping_sent_at = Some(Instant::now());
414 self.send_internal(&[], PacketType::Ping).await
415 }
416
417 async fn handle_ping(&mut self) -> Result<(), VCLError> {
418 debug!("Ping received, sending Pong");
419 self.send_internal(&[], PacketType::Pong).await?;
420 self.emit(VCLEvent::PingReceived);
421 Ok(())
422 }
423
424 fn handle_pong(&mut self) {
425 if let Some(sent_at) = self.ping_sent_at.take() {
426 let latency = sent_at.elapsed();
427 debug!(latency_us = latency.as_micros(), "Pong received");
428 self.emit(VCLEvent::PongReceived { latency });
429 }
430 }
431
432 pub async fn rotate_keys(&mut self) -> Result<(), VCLError> {
441 if self.closed {
442 error!("rotate_keys() called on closed connection");
443 return Err(VCLError::ConnectionClosed);
444 }
445 self.check_timeout()?;
446 info!("Initiating key rotation");
447
448 let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
449 let our_public = PublicKey::from(&our_ephemeral);
450
451 self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
452 debug!("KeyRotation request sent, waiting for response");
453
454 let mut buf = vec![0u8; 65535];
455 let (len, _) = self.socket.recv_from(&mut buf).await?;
456 let packet = VCLPacket::deserialize(&buf[..len])?;
457
458 if self.seen_nonces.contains(&packet.nonce) {
459 warn!("Replay detected during key rotation: duplicate nonce");
460 return Err(VCLError::ReplayDetected("Duplicate nonce in key rotation".to_string()));
461 }
462 self.seen_nonces.insert(packet.nonce);
463
464 if !packet.validate_chain(&self.recv_hash) {
465 warn!("Chain validation failed during key rotation");
466 return Err(VCLError::ChainValidationFailed);
467 }
468
469 let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
470 if !packet.verify(verify_key)? {
471 warn!("Signature invalid during key rotation");
472 return Err(VCLError::SignatureInvalid);
473 }
474
475 self.recv_hash = packet.compute_hash();
476 self.last_sequence = packet.sequence;
477 self.last_activity = Instant::now();
478
479 let old_key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
480 let decrypted = decrypt_payload(&packet.payload, &old_key, &packet.nonce)?;
481
482 if packet.packet_type != PacketType::KeyRotation {
483 warn!("Expected KeyRotation response, got {:?}", packet.packet_type);
484 return Err(VCLError::HandshakeFailed("Expected KeyRotation response".to_string()));
485 }
486 if decrypted.len() != 32 {
487 warn!("KeyRotation payload has wrong length: {}", decrypted.len());
488 return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
489 }
490
491 let their_bytes: [u8; 32] = decrypted
492 .try_into()
493 .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
494 let their_pubkey = PublicKey::from(their_bytes);
495 let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
496 self.shared_secret = Some(new_secret.to_bytes());
497 info!("Key rotation complete");
498 self.emit(VCLEvent::KeyRotated);
499 Ok(())
500 }
501
502 async fn handle_key_rotation_request(&mut self, their_pubkey_bytes: &[u8]) -> Result<(), VCLError> {
503 debug!("KeyRotation request received, processing");
504 if their_pubkey_bytes.len() != 32 {
505 warn!("KeyRotation payload has wrong length: {}", their_pubkey_bytes.len());
506 return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
507 }
508
509 let their_bytes: [u8; 32] = their_pubkey_bytes
510 .try_into()
511 .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
512 let their_pubkey = PublicKey::from(their_bytes);
513
514 let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
515 let our_public = PublicKey::from(&our_ephemeral);
516 let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
517
518 self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
519 debug!("KeyRotation response sent");
520
521 self.shared_secret = Some(new_secret.to_bytes());
522 info!("Key rotation complete (responder)");
523 self.emit(VCLEvent::KeyRotated);
524 Ok(())
525 }
526
527 pub async fn recv(&mut self) -> Result<VCLPacket, VCLError> {
546 if self.closed {
547 error!("recv() called on closed connection");
548 return Err(VCLError::ConnectionClosed);
549 }
550
551 loop {
552 self.check_timeout()?;
553
554 let mut buf = vec![0u8; 65535];
555 let (len, addr) = self.socket.recv_from(&mut buf).await?;
556 if self.peer_addr.is_none() {
557 self.peer_addr = Some(addr);
558 }
559
560 let packet = VCLPacket::deserialize(&buf[..len])?;
561
562 if self.last_sequence > 0 && packet.sequence <= self.last_sequence {
563 warn!(
564 seq = packet.sequence,
565 last_seq = self.last_sequence,
566 "Replay detected: old sequence number"
567 );
568 return Err(VCLError::ReplayDetected("Old sequence number".to_string()));
569 }
570 if self.seen_nonces.contains(&packet.nonce) {
571 warn!(seq = packet.sequence, "Replay detected: duplicate nonce");
572 return Err(VCLError::ReplayDetected("Duplicate nonce".to_string()));
573 }
574 self.seen_nonces.insert(packet.nonce);
575 if self.seen_nonces.len() > 1000 {
576 debug!("Nonce window full, clearing");
577 self.seen_nonces.clear();
578 }
579
580 if !packet.validate_chain(&self.recv_hash) {
581 warn!(seq = packet.sequence, "Chain validation failed");
582 return Err(VCLError::ChainValidationFailed);
583 }
584
585 let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
586 if !packet.verify(verify_key)? {
587 warn!(seq = packet.sequence, "Signature invalid");
588 return Err(VCLError::SignatureInvalid);
589 }
590
591 self.recv_hash = packet.compute_hash();
592 self.last_sequence = packet.sequence;
593 self.last_activity = Instant::now();
594
595 let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
596 let decrypted = decrypt_payload(&packet.payload, &key, &packet.nonce)?;
597
598 match packet.packet_type {
599 PacketType::Data => {
600 if let Some(seq) = self.flow.oldest_unacked_sequence() {
602 self.flow.on_ack(seq);
603 }
604 debug!(
605 peer = %addr,
606 seq = packet.sequence,
607 size = decrypted.len(),
608 "Data packet received"
609 );
610 self.emit(VCLEvent::PacketReceived {
611 sequence: packet.sequence,
612 size: decrypted.len(),
613 });
614 return Ok(VCLPacket {
615 version: packet.version,
616 packet_type: PacketType::Data,
617 sequence: packet.sequence,
618 prev_hash: packet.prev_hash,
619 nonce: packet.nonce,
620 payload: decrypted,
621 signature: packet.signature,
622 });
623 }
624 PacketType::Ping => { self.handle_ping().await?; }
625 PacketType::Pong => { self.handle_pong(); }
626 PacketType::KeyRotation => {
627 self.handle_key_rotation_request(&decrypted).await?;
628 }
629 PacketType::Fragment => {
630 if let Some(seq) = self.flow.oldest_unacked_sequence() {
632 self.flow.on_ack(seq);
633 }
634 let frag: Fragment = bincode::deserialize(&decrypted)
635 .map_err(|e| VCLError::SerializationError(e.to_string()))?;
636 debug!(
637 fragment_id = frag.fragment_id,
638 index = frag.fragment_index,
639 total = frag.total_fragments,
640 "Fragment received"
641 );
642 if let Some(reassembled) = self.reassembler.add(frag) {
643 info!(size = reassembled.len(), "Fragment reassembly complete");
644 self.emit(VCLEvent::PacketReceived {
645 sequence: packet.sequence,
646 size: reassembled.len(),
647 });
648 return Ok(VCLPacket {
649 version: packet.version,
650 packet_type: PacketType::Data,
651 sequence: packet.sequence,
652 prev_hash: packet.prev_hash,
653 nonce: packet.nonce,
654 payload: reassembled,
655 signature: packet.signature,
656 });
657 }
658 }
660 }
661 }
662 }
663
664 fn check_timeout(&self) -> Result<(), VCLError> {
667 if self.last_activity.elapsed().as_secs() > self.timeout_secs {
668 warn!(
669 elapsed_secs = self.last_activity.elapsed().as_secs(),
670 timeout_secs = self.timeout_secs,
671 "Connection timed out"
672 );
673 return Err(VCLError::Timeout);
674 }
675 Ok(())
676 }
677
678 pub fn close(&mut self) -> Result<(), VCLError> {
686 if self.closed {
687 warn!("close() called on already closed connection");
688 return Err(VCLError::ConnectionClosed);
689 }
690 info!("Connection closed");
691 self.closed = true;
692 self.send_sequence = 0;
693 self.send_hash = vec![0; 32];
694 self.recv_hash = vec![0; 32];
695 self.last_sequence = 0;
696 self.seen_nonces.clear();
697 self.shared_secret = None;
698 self.ping_sent_at = None;
699 self.flow.reset();
700 self.reassembler.cleanup();
701 self.emit(VCLEvent::Disconnected);
702 Ok(())
703 }
704
705 pub fn is_closed(&self) -> bool {
707 self.closed
708 }
709
710 pub fn get_public_key(&self) -> Vec<u8> {
712 self.keypair.public_key.clone()
713 }
714
715 pub fn get_shared_secret(&self) -> Option<[u8; 32]> {
718 self.shared_secret
719 }
720}
721
722#[cfg(test)]
723mod tests {
724 use super::*;
725
726 #[tokio::test]
727 async fn test_bind_default_config() {
728 let conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
729 assert!(!conn.is_closed());
730 assert_eq!(conn.get_config().flow_window_size, 64);
731 }
732
733 #[tokio::test]
734 async fn test_bind_with_vpn_config() {
735 let conn = VCLConnection::bind_with_config("127.0.0.1:0", VCLConfig::vpn()).await.unwrap();
736 assert!(!conn.is_closed());
737 assert_eq!(conn.get_config().fragment_size, 1200);
738 }
739
740 #[tokio::test]
741 async fn test_flow_initial_state() {
742 let conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
743 assert!(conn.flow().can_send());
744 assert_eq!(conn.flow().in_flight_count(), 0);
745 assert_eq!(conn.flow().total_sent(), 0);
746 }
747
748 #[tokio::test]
749 async fn test_ack_packet() {
750 let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
751 assert!(!conn.ack_packet(0));
753 }
754
755 #[tokio::test]
756 async fn test_close_resets_flow() {
757 let mut conn = VCLConnection::bind("127.0.0.1:0").await.unwrap();
758 conn.close().unwrap();
759 assert!(conn.is_closed());
760 }
761}