1use super::{Tunnel, TunnelConfig, TunnelMetrics, TunnelState, TunnelProtocol};
28use crate::{P2PError, Result};
29use async_trait::async_trait;
30use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6};
31use std::time::{Duration, Instant};
32use tokio::sync::RwLock;
33use tokio::net::UdpSocket;
34use tracing::{debug, info, warn, error};
35
/// Value written into (and expected in) the IPv6 Next Header byte of tunneled
/// packets. 4 is the IANA protocol number for IPv4 encapsulation.
const IPV4_IN_IPV6_PROTOCOL: u8 = 4;

/// Well-known IPv4 address 192.0.0.1.
///
/// NOTE(review): RFC 6333 reserves 192.0.0.1 for the AFTR and 192.0.0.2 for the
/// B4 (client) element; this file returns this constant from `local_ipv4_addr`.
/// Confirm that is intended.
const AFTR_WELL_KNOWN_IPV4: Ipv4Addr = Ipv4Addr::new(192, 0, 0, 1);

/// Default MTU for the tunnel, in bytes.
///
/// Not referenced by the code visible here, hence the `dead_code` allowance.
/// NOTE(review): the rationale for 1520 is not visible in this file — confirm.
#[allow(dead_code)]
const DEFAULT_DSLITE_MTU: u16 = 1520;
45
46pub struct DsLiteTunnel {
48 config: TunnelConfig,
50 state: RwLock<TunnelState>,
52 metrics: RwLock<TunnelMetrics>,
54 aftr_address: RwLock<Option<Ipv6Addr>>,
56 #[allow(dead_code)]
58 local_ipv6: Option<Ipv6Addr>,
59 socket: RwLock<Option<UdpSocket>>,
61 established_at: RwLock<Option<Instant>>,
63}
64
65impl DsLiteTunnel {
66 pub fn new(config: TunnelConfig) -> Result<Self> {
68 if config.protocol != TunnelProtocol::DsLite {
69 return Err(P2PError::Network(
70 "Invalid protocol for DS-Lite tunnel".to_string()
71 ).into());
72 }
73
74 Ok(Self {
75 config,
76 state: RwLock::new(TunnelState::Disconnected),
77 metrics: RwLock::new(TunnelMetrics::default()),
78 aftr_address: RwLock::new(None),
79 local_ipv6: None,
80 socket: RwLock::new(None),
81 established_at: RwLock::new(None),
82 })
83 }
84
85 pub async fn discover_aftr(&self) -> Result<Ipv6Addr> {
87 if let Some(aftr_addr) = self.config.aftr_ipv6 {
89 debug!("Using configured AFTR address: {}", aftr_addr);
90 return Ok(aftr_addr);
91 }
92
93 if let Some(aftr_name) = &self.config.aftr_name {
95 debug!("Attempting DNS resolution for AFTR: {}", aftr_name);
96 match self.resolve_aftr_dns(aftr_name).await {
97 Ok(addr) => {
98 info!("Resolved AFTR {} to {}", aftr_name, addr);
99 return Ok(addr);
100 }
101 Err(e) => {
102 warn!("Failed to resolve AFTR via DNS: {}", e);
103 }
104 }
105 }
106
107 let well_known_names = [
109 "aftr.example.com",
110 "dslite.example.com",
111 "b4.example.com",
112 ];
113
114 for name in &well_known_names {
115 debug!("Trying well-known AFTR name: {}", name);
116 match self.resolve_aftr_dns(name).await {
117 Ok(addr) => {
118 info!("Resolved well-known AFTR {} to {}", name, addr);
119 return Ok(addr);
120 }
121 Err(e) => {
122 debug!("Failed to resolve {}: {}", name, e);
123 }
124 }
125 }
126
127 Err(P2PError::Network(
128 "Could not discover AFTR address through any mechanism".to_string()
129 ).into())
130 }
131
132 async fn resolve_aftr_dns(&self, hostname: &str) -> Result<Ipv6Addr> {
134 use tokio::net::lookup_host;
135
136 let addresses: Vec<SocketAddr> = lookup_host(format!("{}:0", hostname))
137 .await
138 .map_err(|e| P2PError::Network(format!("DNS resolution failed: {}", e)))?
139 .collect();
140
141 for addr in addresses {
143 if let SocketAddr::V6(addr_v6) = addr {
144 return Ok(*addr_v6.ip());
145 }
146 }
147
148 Err(P2PError::Network(
149 format!("No IPv6 addresses found for AFTR {}", hostname)
150 ).into())
151 }
152
153 async fn get_local_ipv6(&self) -> Result<Ipv6Addr> {
155 if let Some(configured_ipv6) = self.config.ipv6_prefix {
162 Ok(configured_ipv6)
163 } else {
164 Ok("fe80::1".parse().unwrap())
166 }
167 }
168
169 pub fn encapsulate_ipv4_in_ipv6(&self, ipv4_packet: &[u8], aftr_addr: Ipv6Addr, local_ipv6: Ipv6Addr) -> Result<Vec<u8>> {
171 let mut ipv6_packet = Vec::with_capacity(40 + ipv4_packet.len());
173
174 ipv6_packet.push(0x60); ipv6_packet.push(0x00); ipv6_packet.push(0x00); ipv6_packet.push(0x00); let payload_len = ipv4_packet.len() as u16;
183 ipv6_packet.extend_from_slice(&payload_len.to_be_bytes());
184
185 ipv6_packet.push(IPV4_IN_IPV6_PROTOCOL);
187
188 ipv6_packet.push(64);
190
191 ipv6_packet.extend_from_slice(&local_ipv6.octets());
193
194 ipv6_packet.extend_from_slice(&aftr_addr.octets());
196
197 ipv6_packet.extend_from_slice(ipv4_packet);
199
200 debug!("Encapsulated {} byte IPv4 packet in {} byte IPv6 packet",
201 ipv4_packet.len(), ipv6_packet.len());
202
203 Ok(ipv6_packet)
204 }
205
206 pub fn decapsulate_ipv6_to_ipv4(&self, ipv6_packet: &[u8]) -> Result<Vec<u8>> {
208 if ipv6_packet.len() < 40 {
209 return Err(P2PError::Network("IPv6 packet too short".to_string()).into());
210 }
211
212 if (ipv6_packet[0] >> 4) != 6 {
214 return Err(P2PError::Network("Not an IPv6 packet".to_string()).into());
215 }
216
217 if ipv6_packet[6] != IPV4_IN_IPV6_PROTOCOL {
219 return Err(P2PError::Network(
220 format!("Unexpected protocol in IPv6 header: {}", ipv6_packet[6])
221 ).into());
222 }
223
224 let payload_len = u16::from_be_bytes([ipv6_packet[4], ipv6_packet[5]]) as usize;
226
227 if ipv6_packet.len() < 40 + payload_len {
228 return Err(P2PError::Network("IPv6 packet truncated".to_string()).into());
229 }
230
231 let ipv4_packet = ipv6_packet[40..40 + payload_len].to_vec();
233
234 debug!("Decapsulated {} byte IPv6 packet to {} byte IPv4 packet",
235 ipv6_packet.len(), ipv4_packet.len());
236
237 Ok(ipv4_packet)
238 }
239
240 async fn update_metrics(&self, bytes_sent: u64, bytes_received: u64) {
242 let mut metrics = self.metrics.write().await;
243 metrics.bytes_sent += bytes_sent;
244 metrics.bytes_received += bytes_received;
245 metrics.last_activity = Instant::now();
246 if bytes_sent > 0 {
247 metrics.packets_sent += 1;
248 }
249 if bytes_received > 0 {
250 metrics.packets_received += 1;
251 }
252 }
253}
254
255
256#[async_trait]
257impl Tunnel for DsLiteTunnel {
258 fn protocol(&self) -> TunnelProtocol {
259 TunnelProtocol::DsLite
260 }
261
262 fn config(&self) -> &TunnelConfig {
263 &self.config
264 }
265
266 async fn state(&self) -> TunnelState {
267 self.state.read().await.clone()
268 }
269
270 async fn metrics(&self) -> TunnelMetrics {
271 self.metrics.read().await.clone()
272 }
273
274 async fn connect(&mut self) -> Result<()> {
275 info!("Establishing DS-Lite tunnel connection");
276 *self.state.write().await = TunnelState::Connecting;
277
278 let start_time = Instant::now();
279
280 let aftr_addr = match self.discover_aftr().await {
282 Ok(addr) => {
283 info!("AFTR discovery successful: {}", addr);
284 addr
285 }
286 Err(e) => {
287 error!("AFTR discovery failed: {}", e);
288 *self.state.write().await = TunnelState::Failed(format!("AFTR discovery failed: {}", e));
289 return Err(e);
290 }
291 };
292
293 *self.aftr_address.write().await = Some(aftr_addr);
294
295 let local_ipv6 = match self.get_local_ipv6().await {
297 Ok(addr) => {
298 debug!("Using local IPv6 address: {}", addr);
299 addr
300 }
301 Err(e) => {
302 error!("Failed to get local IPv6 address: {}", e);
303 *self.state.write().await = TunnelState::Failed(format!("Local IPv6 setup failed: {}", e));
304 return Err(e);
305 }
306 };
307
308 let socket_addr = SocketAddrV6::new(local_ipv6, 0, 0, 0);
310 let socket = match UdpSocket::bind(socket_addr).await {
311 Ok(sock) => {
312 debug!("Created DS-Lite tunnel socket on {}", socket_addr);
313 sock
314 }
315 Err(e) => {
316 error!("Failed to create tunnel socket: {}", e);
317 *self.state.write().await = TunnelState::Failed(format!("Socket creation failed: {}", e));
318 return Err(P2PError::Network(format!("Socket creation failed: {}", e)).into());
319 }
320 };
321
322 *self.socket.write().await = Some(socket);
323 *self.established_at.write().await = Some(start_time);
324
325 let establishment_time = start_time.elapsed();
327 self.metrics.write().await.establishment_time = establishment_time;
328
329 *self.state.write().await = TunnelState::Connected;
330 info!("DS-Lite tunnel established successfully in {:?}", establishment_time);
331
332 Ok(())
333 }
334
335 async fn disconnect(&mut self) -> Result<()> {
336 info!("Disconnecting DS-Lite tunnel");
337 *self.state.write().await = TunnelState::Disconnecting;
338
339 *self.socket.write().await = None;
341 *self.aftr_address.write().await = None;
342 *self.established_at.write().await = None;
343
344 *self.state.write().await = TunnelState::Disconnected;
345 info!("DS-Lite tunnel disconnected");
346
347 Ok(())
348 }
349
350 async fn is_active(&self) -> bool {
351 matches!(*self.state.read().await, TunnelState::Connected)
352 }
353
354 async fn encapsulate(&self, _ipv6_packet: &[u8]) -> Result<Vec<u8>> {
355 return Err(P2PError::Network(
356 "DS-Lite does not encapsulate IPv6 packets - it encapsulates IPv4 packets in IPv6".to_string()
357 ).into());
358 }
359
360 async fn decapsulate(&self, _ipv4_packet: &[u8]) -> Result<Vec<u8>> {
361 return Err(P2PError::Network(
362 "DS-Lite does not decapsulate to IPv6 - it decapsulates IPv6 packets to IPv4".to_string()
363 ).into());
364 }
365
366 async fn send(&mut self, packet: &[u8]) -> Result<()> {
367 if !self.is_active().await {
368 return Err(P2PError::Network("Tunnel not connected".to_string()).into());
369 }
370
371 let aftr_addr = self.aftr_address.read().await
372 .ok_or_else(|| P2PError::Network("AFTR address not available".to_string()))?;
373
374 let local_ipv6 = self.get_local_ipv6().await?;
375
376 let ipv6_packet = self.encapsulate_ipv4_in_ipv6(packet, aftr_addr, local_ipv6)?;
378
379 let socket_guard = self.socket.read().await;
381 if let Some(socket) = socket_guard.as_ref() {
382 let aftr_socket_addr = SocketAddrV6::new(aftr_addr, 0, 0, 0);
383 socket.send_to(&ipv6_packet, aftr_socket_addr).await
384 .map_err(|e| P2PError::Network(format!("Send failed: {}", e)))?;
385
386 self.update_metrics(ipv6_packet.len() as u64, 0).await;
388
389 debug!("Sent {} byte packet through DS-Lite tunnel", packet.len());
390 Ok(())
391 } else {
392 Err(P2PError::Network("Socket not available".to_string()).into())
393 }
394 }
395
396 async fn receive(&mut self) -> Result<Vec<u8>> {
397 if !self.is_active().await {
398 return Err(P2PError::Network("Tunnel not connected".to_string()).into());
399 }
400
401 let socket_guard = self.socket.read().await;
402 if let Some(socket) = socket_guard.as_ref() {
403 let mut buffer = vec![0u8; 65536]; let (len, _addr) = socket.recv_from(&mut buffer).await
405 .map_err(|e| P2PError::Network(format!("Receive failed: {}", e)))?;
406
407 buffer.truncate(len);
408
409 let ipv4_packet = self.decapsulate_ipv6_to_ipv4(&buffer)?;
411
412 self.update_metrics(0, len as u64).await;
414
415 debug!("Received {} byte packet through DS-Lite tunnel", ipv4_packet.len());
416 Ok(ipv4_packet)
417 } else {
418 Err(P2PError::Network("Socket not available".to_string()).into())
419 }
420 }
421
422 async fn maintain(&mut self) -> Result<()> {
423 debug!("DS-Lite tunnel maintenance - no action required");
427 Ok(())
428 }
429
430 async fn local_ipv6_addr(&self) -> Result<Ipv6Addr> {
431 self.get_local_ipv6().await
432 }
433
434 async fn local_ipv4_addr(&self) -> Result<Ipv4Addr> {
435 Ok(AFTR_WELL_KNOWN_IPV4)
437 }
438
439 async fn ping(&mut self, timeout: Duration) -> Result<Duration> {
440 let start = Instant::now();
441
442 if let Some(aftr_addr) = *self.aftr_address.read().await {
444 let socket_addr = SocketAddrV6::new(aftr_addr, 0, 0, 0);
446
447 tokio::time::timeout(timeout, async {
449 let test_socket = UdpSocket::bind("[::]:0").await
451 .map_err(|e| P2PError::Network(format!("Ping test failed: {}", e)))?;
452
453 test_socket.connect(socket_addr).await
454 .map_err(|e| P2PError::Network(format!("AFTR unreachable: {}", e)))?;
455
456 Ok::<(), P2PError>(())
457 }).await
458 .map_err(|_| P2PError::Network("Ping timeout".to_string()))??;
459
460 let rtt = start.elapsed();
461 self.metrics.write().await.rtt = Some(rtt);
462 Ok(rtt)
463 } else {
464 Err(P2PError::Network("AFTR address not available for ping".to_string()).into())
465 }
466 }
467}