1use crate::packet::{VCLPacket, PacketType};
24use crate::crypto::{KeyPair, encrypt_payload, decrypt_payload};
25use crate::handshake::{HandshakeMessage, create_client_hello, process_client_hello, process_server_hello};
26use crate::error::VCLError;
27use crate::event::VCLEvent;
28use ed25519_dalek::SigningKey;
29use x25519_dalek::{EphemeralSecret, PublicKey};
30use rand::rngs::OsRng;
31use tokio::net::UdpSocket;
32use tokio::sync::mpsc;
33use std::net::SocketAddr;
34use std::collections::HashSet;
35use std::time::Instant;
36use tracing::{debug, info, warn, error};
37
38pub struct VCLConnection {
77 socket: UdpSocket,
78 keypair: KeyPair,
79 send_sequence: u64,
80 send_hash: Vec<u8>,
81 recv_hash: Vec<u8>,
82 last_sequence: u64,
83 seen_nonces: HashSet<[u8; 24]>,
84 peer_addr: Option<SocketAddr>,
85 peer_public_key: Option<Vec<u8>>,
86 shared_secret: Option<[u8; 32]>,
87 #[allow(dead_code)]
88 is_server: bool,
89 closed: bool,
90 last_activity: Instant,
91 timeout_secs: u64,
92 event_tx: Option<mpsc::Sender<VCLEvent>>,
93 ping_sent_at: Option<Instant>,
94}
95
96impl VCLConnection {
97 pub async fn bind(addr: &str) -> Result<Self, VCLError> {
104 let socket = UdpSocket::bind(addr).await?;
105 let local_addr = socket.local_addr()
106 .map(|a| a.to_string())
107 .unwrap_or_else(|_| addr.to_string());
108 info!(addr = %local_addr, "VCLConnection bound");
109 Ok(VCLConnection {
110 socket,
111 keypair: KeyPair::generate(),
112 send_sequence: 0,
113 send_hash: vec![0; 32],
114 recv_hash: vec![0; 32],
115 last_sequence: 0,
116 seen_nonces: HashSet::new(),
117 peer_addr: None,
118 peer_public_key: None,
119 shared_secret: None,
120 is_server: false,
121 closed: false,
122 last_activity: Instant::now(),
123 timeout_secs: 60,
124 event_tx: None,
125 ping_sent_at: None,
126 })
127 }
128
129 pub fn subscribe(&mut self) -> mpsc::Receiver<VCLEvent> {
139 debug!("Event subscription registered");
140 let (tx, rx) = mpsc::channel(64);
141 self.event_tx = Some(tx);
142 rx
143 }
144
145 fn emit(&self, event: VCLEvent) {
146 if let Some(tx) = &self.event_tx {
147 let _ = tx.try_send(event);
148 }
149 }
150
151 pub fn set_timeout(&mut self, secs: u64) {
159 debug!(timeout_secs = secs, "Inactivity timeout updated");
160 self.timeout_secs = secs;
161 }
162
163 pub fn get_timeout(&self) -> u64 {
165 self.timeout_secs
166 }
167
168 pub fn last_activity(&self) -> Instant {
170 self.last_activity
171 }
172
173 pub fn set_shared_key(&mut self, private_key: &[u8]) {
177 debug!("Pre-shared key set (testing mode)");
178 let key_bytes: &[u8; 32] = private_key.try_into().unwrap();
179 let signing_key = SigningKey::from_bytes(key_bytes);
180 let verifying_key = signing_key.verifying_key();
181 self.keypair.private_key = private_key.to_vec();
182 self.keypair.public_key = verifying_key.to_bytes().to_vec();
183 }
184
185 pub async fn connect(&mut self, addr: &str) -> Result<(), VCLError> {
197 info!(peer = %addr, "Initiating handshake (client)");
198 let parsed: SocketAddr = addr.parse()?;
199 self.peer_addr = Some(parsed);
200
201 let (hello_msg, ephemeral) = create_client_hello();
202 let hello_bytes = bincode::serialize(&hello_msg)?;
203 self.socket.send_to(&hello_bytes, parsed).await?;
204 debug!(peer = %addr, "ClientHello sent");
205
206 let mut buf = vec![0u8; 65535];
207 let (len, _) = self.socket.recv_from(&mut buf).await?;
208 let server_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
209
210 match server_hello {
211 HandshakeMessage::ServerHello { public_key } => {
212 let shared = process_server_hello(ephemeral, public_key)
213 .ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?;
214 self.shared_secret = Some(shared);
215 debug!(peer = %addr, "ServerHello received, shared secret established");
216 }
217 _ => {
218 warn!(peer = %addr, "Expected ServerHello, got unexpected message");
219 return Err(VCLError::ExpectedServerHello);
220 }
221 }
222
223 self.last_activity = Instant::now();
224 info!(peer = %addr, "Handshake complete (client)");
225 self.emit(VCLEvent::Connected);
226 Ok(())
227 }
228
229 pub async fn accept_handshake(&mut self) -> Result<(), VCLError> {
240 info!("Waiting for ClientHello (server)");
241 let ephemeral = EphemeralSecret::random_from_rng(OsRng);
242
243 let mut buf = vec![0u8; 65535];
244 let (len, addr) = self.socket.recv_from(&mut buf).await?;
245 self.peer_addr = Some(addr);
246 debug!(peer = %addr, "ClientHello received");
247
248 let client_hello: HandshakeMessage = bincode::deserialize(&buf[..len])?;
249
250 match client_hello {
251 HandshakeMessage::ClientHello { public_key } => {
252 let (server_hello, shared) = process_client_hello(ephemeral, public_key);
253 let hello_bytes = bincode::serialize(&server_hello)?;
254 self.socket.send_to(&hello_bytes, addr).await?;
255 debug!(peer = %addr, "ServerHello sent");
256 self.shared_secret = Some(
257 shared.ok_or_else(|| VCLError::HandshakeFailed("Key exchange failed".to_string()))?
258 );
259 self.is_server = true;
260 }
261 _ => {
262 warn!(peer = %addr, "Expected ClientHello, got unexpected message");
263 return Err(VCLError::ExpectedClientHello);
264 }
265 }
266
267 self.last_activity = Instant::now();
268 info!(peer = %addr, "Handshake complete (server)");
269 self.emit(VCLEvent::Connected);
270 Ok(())
271 }
272
273 async fn send_internal(&mut self, data: &[u8], packet_type: PacketType) -> Result<(), VCLError> {
276 let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
277 let (encrypted_payload, nonce) = encrypt_payload(data, &key)?;
278
279 let mut packet = VCLPacket::new_typed(
280 self.send_sequence,
281 self.send_hash.clone(),
282 encrypted_payload,
283 nonce,
284 packet_type,
285 );
286 packet.sign(&self.keypair.private_key)?;
287
288 let serialized = packet.serialize();
289 let addr = self.peer_addr.ok_or(VCLError::NoPeerAddress)?;
290 self.socket.send_to(&serialized, addr).await?;
291
292 debug!(
293 peer = %addr,
294 seq = self.send_sequence,
295 size = data.len(),
296 packet_type = ?packet.packet_type,
297 "Packet sent"
298 );
299
300 self.send_hash = packet.compute_hash();
301 self.send_sequence += 1;
302 self.last_activity = Instant::now();
303 Ok(())
304 }
305
306 pub async fn send(&mut self, data: &[u8]) -> Result<(), VCLError> {
317 if self.closed {
318 error!("send() called on closed connection");
319 return Err(VCLError::ConnectionClosed);
320 }
321 self.check_timeout()?;
322 self.send_internal(data, PacketType::Data).await
323 }
324
325 pub async fn ping(&mut self) -> Result<(), VCLError> {
338 if self.closed {
339 error!("ping() called on closed connection");
340 return Err(VCLError::ConnectionClosed);
341 }
342 self.check_timeout()?;
343 debug!("Ping sent");
344 self.ping_sent_at = Some(Instant::now());
345 self.send_internal(&[], PacketType::Ping).await
346 }
347
348 async fn handle_ping(&mut self) -> Result<(), VCLError> {
349 debug!("Ping received, sending Pong");
350 self.send_internal(&[], PacketType::Pong).await?;
351 self.emit(VCLEvent::PingReceived);
352 Ok(())
353 }
354
355 fn handle_pong(&mut self) {
356 if let Some(sent_at) = self.ping_sent_at.take() {
357 let latency = sent_at.elapsed();
358 debug!(latency_us = latency.as_micros(), "Pong received");
359 self.emit(VCLEvent::PongReceived { latency });
360 }
361 }
362
363 pub async fn rotate_keys(&mut self) -> Result<(), VCLError> {
382 if self.closed {
383 error!("rotate_keys() called on closed connection");
384 return Err(VCLError::ConnectionClosed);
385 }
386 self.check_timeout()?;
387 info!("Initiating key rotation");
388
389 let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
390 let our_public = PublicKey::from(&our_ephemeral);
391
392 self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
393 debug!("KeyRotation request sent, waiting for response");
394
395 let mut buf = vec![0u8; 65535];
396 let (len, _) = self.socket.recv_from(&mut buf).await?;
397 let packet = VCLPacket::deserialize(&buf[..len])?;
398
399 if self.seen_nonces.contains(&packet.nonce) {
400 warn!("Replay detected during key rotation: duplicate nonce");
401 return Err(VCLError::ReplayDetected("Duplicate nonce in key rotation".to_string()));
402 }
403 self.seen_nonces.insert(packet.nonce);
404
405 if !packet.validate_chain(&self.recv_hash) {
406 warn!("Chain validation failed during key rotation");
407 return Err(VCLError::ChainValidationFailed);
408 }
409
410 let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
411 if !packet.verify(verify_key)? {
412 warn!("Signature invalid during key rotation");
413 return Err(VCLError::SignatureInvalid);
414 }
415
416 self.recv_hash = packet.compute_hash();
417 self.last_sequence = packet.sequence;
418 self.last_activity = Instant::now();
419
420 let old_key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
421 let decrypted = decrypt_payload(&packet.payload, &old_key, &packet.nonce)?;
422
423 if packet.packet_type != PacketType::KeyRotation {
424 warn!("Expected KeyRotation response, got {:?}", packet.packet_type);
425 return Err(VCLError::HandshakeFailed("Expected KeyRotation response".to_string()));
426 }
427 if decrypted.len() != 32 {
428 warn!("KeyRotation payload has wrong length: {}", decrypted.len());
429 return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
430 }
431
432 let their_bytes: [u8; 32] = decrypted
433 .try_into()
434 .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
435 let their_pubkey = PublicKey::from(their_bytes);
436 let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
437 self.shared_secret = Some(new_secret.to_bytes());
438 info!("Key rotation complete");
439 self.emit(VCLEvent::KeyRotated);
440 Ok(())
441 }
442
443 async fn handle_key_rotation_request(&mut self, their_pubkey_bytes: &[u8]) -> Result<(), VCLError> {
444 debug!("KeyRotation request received, processing");
445 if their_pubkey_bytes.len() != 32 {
446 warn!("KeyRotation payload has wrong length: {}", their_pubkey_bytes.len());
447 return Err(VCLError::InvalidPacket("KeyRotation payload must be 32 bytes".to_string()));
448 }
449
450 let their_bytes: [u8; 32] = their_pubkey_bytes
451 .try_into()
452 .map_err(|_| VCLError::InvalidPacket("Invalid peer pubkey".to_string()))?;
453 let their_pubkey = PublicKey::from(their_bytes);
454
455 let our_ephemeral = EphemeralSecret::random_from_rng(OsRng);
456 let our_public = PublicKey::from(&our_ephemeral);
457 let new_secret = our_ephemeral.diffie_hellman(&their_pubkey);
458
459 self.send_internal(&our_public.to_bytes(), PacketType::KeyRotation).await?;
460 debug!("KeyRotation response sent");
461
462 self.shared_secret = Some(new_secret.to_bytes());
463 info!("Key rotation complete (responder)");
464 self.emit(VCLEvent::KeyRotated);
465 Ok(())
466 }
467
468 pub async fn recv(&mut self) -> Result<VCLPacket, VCLError> {
486 if self.closed {
487 error!("recv() called on closed connection");
488 return Err(VCLError::ConnectionClosed);
489 }
490
491 loop {
492 self.check_timeout()?;
493
494 let mut buf = vec![0u8; 65535];
495 let (len, addr) = self.socket.recv_from(&mut buf).await?;
496 if self.peer_addr.is_none() {
497 self.peer_addr = Some(addr);
498 }
499
500 let packet = VCLPacket::deserialize(&buf[..len])?;
501
502 if self.last_sequence > 0 && packet.sequence <= self.last_sequence {
503 warn!(
504 seq = packet.sequence,
505 last_seq = self.last_sequence,
506 "Replay detected: old sequence number"
507 );
508 return Err(VCLError::ReplayDetected("Old sequence number".to_string()));
509 }
510 if self.seen_nonces.contains(&packet.nonce) {
511 warn!(seq = packet.sequence, "Replay detected: duplicate nonce");
512 return Err(VCLError::ReplayDetected("Duplicate nonce".to_string()));
513 }
514 self.seen_nonces.insert(packet.nonce);
515 if self.seen_nonces.len() > 1000 {
516 debug!("Nonce window full, clearing");
517 self.seen_nonces.clear();
518 }
519
520 if !packet.validate_chain(&self.recv_hash) {
521 warn!(seq = packet.sequence, "Chain validation failed");
522 return Err(VCLError::ChainValidationFailed);
523 }
524
525 let verify_key = self.peer_public_key.as_ref().unwrap_or(&self.keypair.public_key);
526 if !packet.verify(verify_key)? {
527 warn!(seq = packet.sequence, "Signature invalid");
528 return Err(VCLError::SignatureInvalid);
529 }
530
531 self.recv_hash = packet.compute_hash();
532 self.last_sequence = packet.sequence;
533 self.last_activity = Instant::now();
534
535 let key = self.shared_secret.ok_or(VCLError::NoSharedSecret)?;
536 let decrypted = decrypt_payload(&packet.payload, &key, &packet.nonce)?;
537
538 match packet.packet_type {
539 PacketType::Data => {
540 debug!(
541 peer = %addr,
542 seq = packet.sequence,
543 size = decrypted.len(),
544 "Data packet received"
545 );
546 self.emit(VCLEvent::PacketReceived {
547 sequence: packet.sequence,
548 size: decrypted.len(),
549 });
550 return Ok(VCLPacket {
551 version: packet.version,
552 packet_type: PacketType::Data,
553 sequence: packet.sequence,
554 prev_hash: packet.prev_hash,
555 nonce: packet.nonce,
556 payload: decrypted,
557 signature: packet.signature,
558 });
559 }
560 PacketType::Ping => { self.handle_ping().await?; }
561 PacketType::Pong => { self.handle_pong(); }
562 PacketType::KeyRotation => {
563 self.handle_key_rotation_request(&decrypted).await?;
564 }
565 }
566 }
567 }
568
569 fn check_timeout(&self) -> Result<(), VCLError> {
572 if self.last_activity.elapsed().as_secs() > self.timeout_secs {
573 warn!(
574 elapsed_secs = self.last_activity.elapsed().as_secs(),
575 timeout_secs = self.timeout_secs,
576 "Connection timed out"
577 );
578 return Err(VCLError::Timeout);
579 }
580 Ok(())
581 }
582
583 pub fn close(&mut self) -> Result<(), VCLError> {
591 if self.closed {
592 warn!("close() called on already closed connection");
593 return Err(VCLError::ConnectionClosed);
594 }
595 info!("Connection closed");
596 self.closed = true;
597 self.send_sequence = 0;
598 self.send_hash = vec![0; 32];
599 self.recv_hash = vec![0; 32];
600 self.last_sequence = 0;
601 self.seen_nonces.clear();
602 self.shared_secret = None;
603 self.ping_sent_at = None;
604 self.emit(VCLEvent::Disconnected);
605 Ok(())
606 }
607
608 pub fn is_closed(&self) -> bool {
610 self.closed
611 }
612
613 pub fn get_public_key(&self) -> Vec<u8> {
615 self.keypair.public_key.clone()
616 }
617
618 pub fn get_shared_secret(&self) -> Option<[u8; 32]> {
621 self.shared_secret
622 }
623}