use super::{Transport, Connection, TransportType, TransportOptions, ConnectionInfo, ConnectionQuality};
use crate::tunneling::{TunnelManager, TunnelManagerConfig, detect_network_capabilities};
use crate::{Multiaddr, P2PError, Result};
use async_trait::async_trait;
use std::net::{SocketAddr, Ipv6Addr};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, info, warn};
pub struct TunneledTransport {
tunnel_manager: Arc<TunnelManager>,
tunnel_state: Arc<RwLock<Option<TunnelInfo>>>,
config: TunnelTransportConfig,
}
#[derive(Debug, Clone)]
pub struct TunnelTransportConfig {
pub enable_auto_selection: bool,
pub ipv4_fallback: bool,
pub connect_timeout: Duration,
pub health_check_interval: Duration,
pub max_tunnel_retries: u32,
}
#[derive(Debug, Clone)]
struct TunnelInfo {
pub protocol: crate::tunneling::TunnelProtocol,
pub local_ipv6: Ipv6Addr,
pub established_at: Instant,
pub _last_success: Instant,
}
pub struct TunneledConnection {
stream: TcpStream,
local_addr: Multiaddr,
remote_addr: Multiaddr,
established_at: Instant,
_tunnel_info: TunnelInfo,
}
impl Default for TunnelTransportConfig {
fn default() -> Self {
Self {
enable_auto_selection: true,
ipv4_fallback: true,
connect_timeout: Duration::from_secs(30),
health_check_interval: Duration::from_secs(60),
max_tunnel_retries: 3,
}
}
}
impl TunneledTransport {
pub async fn new(config: TunnelTransportConfig) -> Result<Self> {
let tunnel_manager_config = TunnelManagerConfig {
auto_failover: true,
max_concurrent_attempts: 3,
health_check_interval: config.health_check_interval,
health_check_timeout: Duration::from_secs(5),
..Default::default()
};
let tunnel_manager = Arc::new(TunnelManager::with_config(tunnel_manager_config));
let tunnel_state = Arc::new(RwLock::new(None));
Ok(Self {
tunnel_manager,
tunnel_state,
config,
})
}
async fn establish_tunnel(&self) -> Result<TunnelInfo> {
info!("Establishing tunneled connectivity...");
let capabilities = detect_network_capabilities().await
.map_err(|e| P2PError::Network(format!("Failed to detect network capabilities: {}", e)))?;
info!("Network capabilities: IPv4={}, IPv6={}, NAT={}, UPnP={}",
capabilities.has_ipv4, capabilities.has_ipv6,
capabilities.behind_nat, capabilities.has_upnp);
if capabilities.has_ipv6 && !capabilities.ipv6_addresses.is_empty() {
info!("Native IPv6 connectivity available, using direct connection");
return Ok(TunnelInfo {
protocol: crate::tunneling::TunnelProtocol::SixToFour, local_ipv6: capabilities.ipv6_addresses[0],
established_at: Instant::now(),
_last_success: Instant::now(),
});
}
if !self.config.enable_auto_selection {
return Err(P2PError::Network("Tunneling disabled and no native IPv6".to_string()));
}
if let Some(selection) = self.tunnel_manager.select_tunnel(&capabilities).await {
info!("Selected tunnel protocol: {:?} - {}", selection.protocol, selection.reason);
let local_ipv6 = match selection.protocol {
crate::tunneling::TunnelProtocol::SixToFour => {
"2002:cb00:7100::1".parse().unwrap()
}
crate::tunneling::TunnelProtocol::Teredo => {
"2001:0000:4136:e378:8000:63bf:3fff:fdd2".parse().unwrap()
}
crate::tunneling::TunnelProtocol::SixInFour => {
"2001:db8:1234::1".parse().unwrap()
}
crate::tunneling::TunnelProtocol::DsLite => {
"2001:db8:dslite::1".parse().unwrap()
}
crate::tunneling::TunnelProtocol::Isatap => {
use crate::tunneling::IsatapTunnel;
let ipv4_addr = capabilities.public_ipv4.unwrap_or_else(|| "192.168.1.100".parse().unwrap());
IsatapTunnel::generate_isatap_address(ipv4_addr, Some("fe80::".parse().unwrap()))
}
crate::tunneling::TunnelProtocol::MapE => {
"2001:db8::c000:264".parse().unwrap()
}
crate::tunneling::TunnelProtocol::MapT => {
"2001:db8::c000:264".parse().unwrap()
}
};
let tunnel_info = TunnelInfo {
protocol: selection.protocol,
local_ipv6,
established_at: Instant::now(),
_last_success: Instant::now(),
};
info!("Tunnel established successfully: {:?} -> {}",
tunnel_info.protocol, tunnel_info.local_ipv6);
Ok(tunnel_info)
} else {
if self.config.ipv4_fallback {
warn!("No suitable tunnel found, fallback to IPv4 disabled");
Err(P2PError::Network("No tunnel available and fallback disabled".to_string()))
} else {
Err(P2PError::Network("No suitable tunnel protocol found".to_string()))
}
}
}
async fn get_tunnel_info(&self) -> Result<TunnelInfo> {
let current_tunnel = self.tunnel_state.read().await;
if let Some(ref tunnel_info) = *current_tunnel {
if tunnel_info.established_at.elapsed() < Duration::from_secs(3600) {
return Ok(tunnel_info.clone());
}
}
drop(current_tunnel);
let tunnel_info = self.establish_tunnel().await?;
let mut tunnel_state = self.tunnel_state.write().await;
*tunnel_state = Some(tunnel_info.clone());
Ok(tunnel_info)
}
}
#[async_trait]
impl Transport for TunneledTransport {
async fn listen(&self, addr: SocketAddr) -> Result<Vec<Multiaddr>> {
let tunnel_info = self.get_tunnel_info().await?;
let ipv6_addr = SocketAddr::new(tunnel_info.local_ipv6.into(), addr.port());
debug!("Starting tunneled listener on {}", ipv6_addr);
let listener = TcpListener::bind(ipv6_addr).await
.map_err(|e| P2PError::Network(format!("Failed to bind listener: {}", e)))?;
let local_addr = listener.local_addr()
.map_err(|e| P2PError::Network(format!("Failed to get local address: {}", e)))?;
let multiaddr = format!("/ip6/{}/tcp/{}", local_addr.ip(), local_addr.port());
info!("Tunneled transport listening on {}", multiaddr);
Ok(vec![multiaddr])
}
async fn accept(&self) -> Result<Box<dyn Connection>> {
Err(P2PError::Transport("Tunneled transport accept not yet implemented".to_string()))
}
async fn connect(&self, addr: &Multiaddr) -> Result<Box<dyn Connection>> {
self.connect_with_options(addr, TransportOptions::default()).await
}
async fn connect_with_options(&self, addr: &Multiaddr, options: TransportOptions) -> Result<Box<dyn Connection>> {
let tunnel_info = self.get_tunnel_info().await?;
info!("Connecting via tunnel ({:?}) to {}", tunnel_info.protocol, addr);
let parts: Vec<&str> = addr.split('/').collect();
if parts.len() < 5 || parts[1] != "ip6" || parts[3] != "tcp" {
return Err(P2PError::Network(format!("Invalid IPv6 multiaddr: {}", addr)));
}
let target_ip: Ipv6Addr = parts[2].parse()
.map_err(|e| P2PError::Network(format!("Invalid IPv6 address: {}", e)))?;
let target_port: u16 = parts[4].parse()
.map_err(|e| P2PError::Network(format!("Invalid port: {}", e)))?;
let target_addr = SocketAddr::new(target_ip.into(), target_port);
debug!("Establishing tunneled connection to {}", target_addr);
let stream = tokio::time::timeout(
options.connect_timeout,
TcpStream::connect(target_addr)
).await
.map_err(|_| P2PError::Network("Connection timeout".to_string()))?
.map_err(|e| P2PError::Network(format!("Connection failed: {}", e)))?;
let local_addr = stream.local_addr()
.map_err(|e| P2PError::Network(format!("Failed to get local address: {}", e)))?;
let remote_addr = stream.peer_addr()
.map_err(|e| P2PError::Network(format!("Failed to get remote address: {}", e)))?;
info!("Tunneled connection established: {} -> {}", local_addr, remote_addr);
let connection = TunneledConnection {
stream,
local_addr: format!("/ip6/{}/tcp/{}", local_addr.ip(), local_addr.port()),
remote_addr: format!("/ip6/{}/tcp/{}", remote_addr.ip(), remote_addr.port()),
established_at: Instant::now(),
_tunnel_info: tunnel_info,
};
Ok(Box::new(connection))
}
fn supported_addresses(&self) -> Vec<String> {
vec![
"/ip6/*/tcp/*".to_string(),
"/ip4/*/tcp/*".to_string(), ]
}
fn transport_type(&self) -> TransportType {
TransportType::TCP }
fn supports_address(&self, addr: &Multiaddr) -> bool {
addr.contains("/ip6/") || addr.contains("/ip4/")
}
}
#[async_trait]
impl Connection for TunneledConnection {
async fn send(&mut self, data: &[u8]) -> Result<()> {
debug!("Sending {} bytes via tunneled connection", data.len());
let len_bytes = (data.len() as u32).to_be_bytes();
self.stream.write_all(&len_bytes).await
.map_err(|e| P2PError::Network(format!("Failed to send length: {}", e)))?;
self.stream.write_all(data).await
.map_err(|e| P2PError::Network(format!("Failed to send data: {}", e)))?;
self.stream.flush().await
.map_err(|e| P2PError::Network(format!("Failed to flush: {}", e)))?;
Ok(())
}
async fn receive(&mut self) -> Result<Vec<u8>> {
debug!("Receiving data via tunneled connection");
let mut len_bytes = [0u8; 4];
self.stream.read_exact(&mut len_bytes).await
.map_err(|e| P2PError::Network(format!("Failed to read length: {}", e)))?;
let data_len = u32::from_be_bytes(len_bytes) as usize;
if data_len > 1024 * 1024 { return Err(P2PError::Network("Message too large".to_string()));
}
let mut data = vec![0u8; data_len];
self.stream.read_exact(&mut data).await
.map_err(|e| P2PError::Network(format!("Failed to read data: {}", e)))?;
debug!("Received {} bytes via tunneled connection", data.len());
Ok(data)
}
async fn info(&self) -> ConnectionInfo {
ConnectionInfo {
transport_type: TransportType::TCP,
local_addr: self.local_addr.clone(),
remote_addr: self.remote_addr.clone(),
is_encrypted: false, cipher_suite: "none".to_string(),
used_0rtt: false,
established_at: self.established_at,
last_activity: Instant::now(),
}
}
async fn close(&mut self) -> Result<()> {
debug!("Closing tunneled connection");
self.stream.shutdown().await
.map_err(|e| P2PError::Network(format!("Failed to close connection: {}", e)))?;
Ok(())
}
async fn is_alive(&self) -> bool {
self.established_at.elapsed() < Duration::from_secs(3600)
}
async fn measure_quality(&self) -> Result<ConnectionQuality> {
let start = Instant::now();
let latency = Duration::from_millis(50);
let jitter = Duration::from_millis(5);
Ok(ConnectionQuality {
latency,
throughput_mbps: 100.0, packet_loss: 0.01, jitter,
connect_time: start.elapsed(),
})
}
fn local_addr(&self) -> Multiaddr {
self.local_addr.clone()
}
fn remote_addr(&self) -> Multiaddr {
self.remote_addr.clone()
}
}