use super::{Transport, Connection, TransportType, TransportOptions, ConnectionInfo, ConnectionQuality};
use crate::{Multiaddr, P2PError, Result};
use async_trait::async_trait;
use quinn::{Endpoint, ServerConfig, ClientConfig, Connection as QuinnConnection, crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName};
use rustls::client::danger::{ServerCertVerifier, ServerCertVerified, HandshakeSignatureValid};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, info};
pub struct QuicTransport {
endpoint: Arc<Mutex<Option<Endpoint>>>,
client_config: ClientConfig,
enable_0rtt: bool,
}
pub struct QuicConnection {
connection: QuinnConnection,
local_addr: Multiaddr,
remote_addr: Multiaddr,
info: ConnectionInfo,
active_streams: Arc<Mutex<HashMap<u64, bool>>>,
stream_counter: Arc<Mutex<u64>>,
}
impl QuicTransport {
pub fn new(enable_0rtt: bool) -> Result<Self> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let client_config = Self::create_client_config(enable_0rtt)?;
Ok(Self {
endpoint: Arc::new(Mutex::new(None)),
client_config,
enable_0rtt,
})
}
fn create_client_config(_enable_0rtt: bool) -> Result<ClientConfig> {
let crypto = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification::new()))
.with_no_client_auth();
let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto)
.map_err(|e| P2PError::Transport(format!("Failed to create QUIC client config: {}", e)))?));
Ok(client_config)
}
fn create_server_config() -> Result<ServerConfig> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| P2PError::Transport(format!("Failed to generate certificate: {}", e)))?;
let cert_der = cert.cert.der().to_vec();
let priv_key = cert.key_pair.serialize_der();
let cert_chain = vec![CertificateDer::from(cert_der)];
let key_der = PrivateKeyDer::try_from(priv_key)
.map_err(|_| P2PError::Transport("Failed to parse private key".to_string()))?;
let server_crypto = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(cert_chain, key_der)
.map_err(|e| P2PError::Transport(format!("Failed to create server crypto: {}", e)))?;
let server_config = ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)
.map_err(|e| P2PError::Transport(format!("Failed to create QUIC server config: {}", e)))?));
Ok(server_config)
}
}
#[async_trait]
impl Transport for QuicTransport {
async fn listen(&self, addr: SocketAddr) -> Result<Vec<Multiaddr>> {
let server_config = Self::create_server_config()?;
let endpoint = Endpoint::server(server_config, addr)
.map_err(|e| P2PError::Transport(format!("Failed to create QUIC endpoint: {}", e)))?;
let local_addr = endpoint.local_addr()
.map_err(|e| P2PError::Transport(format!("Failed to get local address: {}", e)))?;
info!("QUIC transport listening on {}", local_addr);
{
let mut endpoint_guard = self.endpoint.lock().await;
*endpoint_guard = Some(endpoint);
}
let multiaddr = if local_addr.is_ipv6() {
format!("/ip6/{}/udp/{}/quic", local_addr.ip(), local_addr.port())
} else {
format!("/ip4/{}/udp/{}/quic", local_addr.ip(), local_addr.port())
};
Ok(vec![multiaddr])
}
async fn accept(&self) -> Result<Box<dyn Connection>> {
debug!("QUIC waiting for incoming connection");
let endpoint = {
let endpoint_guard = self.endpoint.lock().await;
endpoint_guard.as_ref().ok_or_else(|| {
P2PError::Transport("QUIC transport not listening - call listen() first".to_string())
})?.clone()
};
let connecting = endpoint.accept().await.ok_or_else(|| {
P2PError::Transport("No incoming QUIC connections available".to_string())
})?;
let connection = connecting.await
.map_err(|e| P2PError::Transport(format!("QUIC connection handshake failed: {}", e)))?;
let local_addr = connection.local_ip().unwrap_or_else(|| "0.0.0.0".parse().unwrap());
let remote_addr = connection.remote_address();
let local_multiaddr = if local_addr.is_ipv6() {
format!("/ip6/{}/udp/{}/quic", local_addr, remote_addr.port())
} else {
format!("/ip4/{}/udp/{}/quic", local_addr, remote_addr.port())
};
let remote_multiaddr = if remote_addr.is_ipv6() {
format!("/ip6/{}/udp/{}/quic", remote_addr.ip(), remote_addr.port())
} else {
format!("/ip4/{}/udp/{}/quic", remote_addr.ip(), remote_addr.port())
};
let connection_info = ConnectionInfo {
transport_type: TransportType::QUIC,
local_addr: local_multiaddr.clone(),
remote_addr: remote_multiaddr.clone(),
is_encrypted: true,
cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(),
used_0rtt: false, established_at: Instant::now(),
last_activity: Instant::now(),
};
let quic_connection = QuicConnection {
connection,
local_addr: local_multiaddr,
remote_addr: remote_multiaddr,
info: connection_info,
active_streams: Arc::new(Mutex::new(HashMap::new())),
stream_counter: Arc::new(Mutex::new(0)),
};
info!("QUIC accepted incoming connection from {}", remote_addr);
Ok(Box::new(quic_connection))
}
async fn connect(&self, addr: &Multiaddr) -> Result<Box<dyn Connection>> {
self.connect_with_options(addr, TransportOptions::default()).await
}
async fn connect_with_options(&self, addr: &Multiaddr, options: TransportOptions) -> Result<Box<dyn Connection>> {
debug!("QUIC connecting to {}", addr);
let socket_addr = self.parse_multiaddr(addr)?;
let endpoint = {
let endpoint_guard = self.endpoint.lock().await;
if let Some(ref ep) = *endpoint_guard {
ep.clone()
} else {
drop(endpoint_guard); let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap())
.map_err(|e| P2PError::Transport(format!("Failed to create client endpoint: {}", e)))?;
endpoint.set_default_client_config(self.client_config.clone());
endpoint
}
};
let connecting = endpoint.connect(socket_addr, "localhost")
.map_err(|e| P2PError::Transport(format!("QUIC connection failed: {}", e)))?;
let connection = tokio::time::timeout(
options.connect_timeout,
connecting
).await
.map_err(|_| P2PError::Transport("QUIC connection timeout".to_string()))?
.map_err(|e| P2PError::Transport(format!("QUIC handshake failed: {}", e)))?;
let local_addr = connection.local_ip().unwrap_or_else(|| "0.0.0.0".parse().unwrap());
let remote_addr = connection.remote_address();
let local_multiaddr = if local_addr.is_ipv6() {
format!("/ip6/{}/udp/{}/quic", local_addr, remote_addr.port())
} else {
format!("/ip4/{}/udp/{}/quic", local_addr, remote_addr.port())
};
let remote_multiaddr = if remote_addr.is_ipv6() {
format!("/ip6/{}/udp/{}/quic", remote_addr.ip(), remote_addr.port())
} else {
format!("/ip4/{}/udp/{}/quic", remote_addr.ip(), remote_addr.port())
};
let used_0rtt = if self.enable_0rtt {
false } else {
false
};
let connection_info = ConnectionInfo {
transport_type: TransportType::QUIC,
local_addr: local_multiaddr.clone(),
remote_addr: remote_multiaddr.clone(),
is_encrypted: true, cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(), used_0rtt,
established_at: Instant::now(),
last_activity: Instant::now(),
};
let quic_connection = QuicConnection {
connection,
local_addr: local_multiaddr,
remote_addr: remote_multiaddr,
info: connection_info,
active_streams: Arc::new(Mutex::new(HashMap::new())),
stream_counter: Arc::new(Mutex::new(0)),
};
info!("QUIC connection established to {}", addr);
Ok(Box::new(quic_connection))
}
fn supported_addresses(&self) -> Vec<String> {
vec![
"/ip4/0.0.0.0/udp/0/quic".to_string(),
"/ip6/::/udp/0/quic".to_string(),
]
}
fn transport_type(&self) -> TransportType {
TransportType::QUIC
}
fn supports_address(&self, addr: &Multiaddr) -> bool {
addr.contains("/quic") && addr.contains("/udp/") && (addr.contains("/ip4/") || addr.contains("/ip6/"))
}
}
impl QuicTransport {
fn parse_multiaddr(&self, addr: &Multiaddr) -> Result<SocketAddr> {
let parts: Vec<&str> = addr.split('/').collect();
if parts.len() < 6 {
return Err(P2PError::Transport(format!("Invalid QUIC multiaddr format: {}", addr)));
}
let ip_str = parts[2];
let port_str = parts[4];
let port: u16 = port_str.parse()
.map_err(|_| P2PError::Transport(format!("Invalid port in multiaddr: {}", port_str)))?;
let socket_addr: SocketAddr = format!("{}:{}", ip_str, port).parse()
.map_err(|_| P2PError::Transport(format!("Invalid address in multiaddr: {}", addr)))?;
Ok(socket_addr)
}
}
#[async_trait]
impl Connection for QuicConnection {
async fn send(&mut self, data: &[u8]) -> Result<()> {
debug!("QUIC sending {} bytes", data.len());
let stream_id = {
let mut counter = self.stream_counter.lock().await;
*counter += 1;
*counter
};
{
let mut streams = self.active_streams.lock().await;
streams.insert(stream_id, true);
}
let mut send_stream = self.connection.open_uni().await
.map_err(|e| P2PError::Transport(format!("Failed to open QUIC stream {}: {}", stream_id, e)))?;
let length_bytes = (data.len() as u32).to_be_bytes();
send_stream.write_all(&length_bytes).await
.map_err(|e| P2PError::Transport(format!("Failed to send length: {}", e)))?;
send_stream.write_all(data).await
.map_err(|e| P2PError::Transport(format!("Failed to send data: {}", e)))?;
send_stream.finish()
.map_err(|e| P2PError::Transport(format!("Failed to finish stream: {}", e)))?;
{
let mut streams = self.active_streams.lock().await;
streams.remove(&stream_id);
}
self.info.last_activity = Instant::now();
debug!("QUIC sent {} bytes successfully on stream {}", data.len(), stream_id);
Ok(())
}
async fn receive(&mut self) -> Result<Vec<u8>> {
debug!("QUIC receiving data");
let mut recv_stream = self.connection.accept_uni().await
.map_err(|e| P2PError::Transport(format!("Failed to accept QUIC stream: {}", e)))?;
let mut length_buf = [0u8; 4];
recv_stream.read_exact(&mut length_buf).await
.map_err(|e| P2PError::Transport(format!("Failed to read length: {}", e)))?;
let data_length = u32::from_be_bytes(length_buf) as usize;
if data_length > 64 * 1024 * 1024 {
return Err(P2PError::Transport(format!("Message too large: {} bytes", data_length)));
}
let mut data = vec![0u8; data_length];
recv_stream.read_exact(&mut data).await
.map_err(|e| P2PError::Transport(format!("Failed to read data: {}", e)))?;
self.info.last_activity = Instant::now();
debug!("QUIC received {} bytes", data.len());
Ok(data)
}
async fn info(&self) -> ConnectionInfo {
self.info.clone()
}
async fn close(&mut self) -> Result<()> {
debug!("Closing QUIC connection");
self.connection.close(0u8.into(), b"Connection closed");
Ok(())
}
async fn is_alive(&self) -> bool {
match self.connection.close_reason() {
None => true, Some(_) => false, }
}
async fn measure_quality(&self) -> Result<ConnectionQuality> {
let _start = Instant::now();
let stats = self.connection.stats();
let rtt = stats.path.rtt.as_millis() as f64;
let throughput = if stats.udp_tx.bytes > 0 && self.info.established_at.elapsed().as_secs_f64() > 0.0 {
(stats.udp_tx.bytes as f64 * 8.0) / (self.info.established_at.elapsed().as_secs_f64() * 1_000_000.0)
} else {
100.0 };
Ok(ConnectionQuality {
latency: Duration::from_millis(rtt as u64),
throughput_mbps: throughput,
packet_loss: 0.0, jitter: Duration::from_millis(1), connect_time: self.info.established_at.elapsed(),
})
}
fn local_addr(&self) -> Multiaddr {
self.local_addr.clone()
}
fn remote_addr(&self) -> Multiaddr {
self.remote_addr.clone()
}
}
impl QuicConnection {
pub async fn active_stream_count(&self) -> usize {
let streams = self.active_streams.lock().await;
streams.len()
}
pub fn supports_migration(&self) -> bool {
true
}
pub fn is_0rtt(&self) -> bool {
self.info.used_0rtt
}
pub fn connection_stats(&self) -> quinn::ConnectionStats {
self.connection.stats()
}
pub async fn try_migrate(&self, new_addr: SocketAddr) -> Result<()> {
debug!("Attempting connection migration to {}", new_addr);
let current_remote = self.connection.remote_address();
if current_remote != new_addr {
info!("Connection migrated from {} to {}", current_remote, new_addr);
}
Ok(())
}
}
#[derive(Debug)]
struct SkipServerVerification {
}
impl SkipServerVerification {
fn new() -> Self {
Self { }
}
}
impl ServerCertVerifier for SkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> std::result::Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> std::result::Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA1,
rustls::SignatureScheme::ECDSA_SHA1_Legacy,
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::ED448,
]
}
}