Skip to main content

zlayer_overlay/
transport.rs

1//! Encrypted overlay transport layer
2//!
3//! Uses boringtun (userspace `WireGuard`) to create TUN-based encrypted tunnels.
4//! No kernel `WireGuard` module or `wg` binary required.
5//!
6//! On Linux, creates TUN interfaces via `/dev/net/tun`.
7//! On macOS, creates utun interfaces via the kernel control socket.
8//! On Windows, creates a Wintun adapter (see [`crate::tun::windows`]),
9//! configures it via IP Helper (see [`crate::interface::windows`]), and
10//! drives the `WireGuard` noise pipeline directly via
11//! `boringtun::noise::Tunn` paired with a userspace UDP socket. Three
12//! async tasks shuttle packets: `ingress` (UDP → decap → TUN), `egress`
13//! (TUN → encap → UDP), and `timers` (per-peer `update_timers` tick to
14//! emit keepalives / re-initiate handshakes).
15
16use crate::interface::platform_ops;
17use crate::{config::OverlayConfig, PeerInfo};
18#[cfg(not(windows))]
19use boringtun::device::{DeviceConfig, DeviceHandle};
20use std::fmt::Write;
21#[cfg(not(windows))]
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23#[cfg(not(windows))]
24use tokio::net::UnixStream;
25
26#[cfg(windows)]
27use crate::tun::WindowsTun;
28#[cfg(windows)]
29use boringtun::noise::{Tunn, TunnResult};
30#[cfg(windows)]
31use dashmap::DashMap;
32#[cfg(windows)]
33use parking_lot::RwLock;
34#[cfg(windows)]
35use std::net::{IpAddr, SocketAddr};
36#[cfg(windows)]
37use std::sync::atomic::{AtomicU64, Ordering};
38#[cfg(windows)]
39use std::sync::Arc;
40#[cfg(windows)]
41use std::time::{Duration, SystemTime, UNIX_EPOCH};
42#[cfg(windows)]
43use tokio::net::UdpSocket;
44#[cfg(windows)]
45use tokio::sync::Mutex as AsyncMutex;
46#[cfg(windows)]
47use tokio::task::JoinHandle;
48
49// ---------------------------------------------------------------------------
50// UAPI helpers (Linux/macOS only — Windows drives boringtun::noise::Tunn
51// directly without going through a Unix socket)
52// ---------------------------------------------------------------------------
53
54/// Convert a base64-encoded `WireGuard` key to hex (UAPI requires hex-encoded keys).
55#[cfg(not(windows))]
56fn key_to_hex(base64_key: &str) -> Result<String, Box<dyn std::error::Error>> {
57    use base64::{engine::general_purpose::STANDARD, Engine as _};
58    let bytes = STANDARD.decode(base64_key)?;
59    if bytes.len() != 32 {
60        return Err(format!("Invalid key length: expected 32 bytes, got {}", bytes.len()).into());
61    }
62    Ok(hex::encode(bytes))
63}
64
65/// Send a UAPI `set` command to the boringtun device.
66///
67/// The body should contain newline-delimited `key=value` pairs (without the
68/// leading `set=1\n` — that is prepended automatically).
69#[cfg(not(windows))]
70async fn uapi_set(sock_path: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
71    let mut stream = UnixStream::connect(sock_path).await?;
72    let msg = format!("set=1\n{body}\n");
73    stream.write_all(msg.as_bytes()).await?;
74    stream.shutdown().await?;
75    let mut response = String::new();
76    stream.read_to_string(&mut response).await?;
77    if response.contains("errno=0") {
78        Ok(())
79    } else {
80        Err(format!("UAPI set failed: {}", response.trim()).into())
81    }
82}
83
84/// Send a UAPI `get` command and return the raw response.
85#[cfg(not(windows))]
86async fn uapi_get(sock_path: &str) -> Result<String, Box<dyn std::error::Error>> {
87    let mut stream = UnixStream::connect(sock_path).await?;
88    stream.write_all(b"get=1\n\n").await?;
89    stream.shutdown().await?;
90    let mut response = String::new();
91    stream.read_to_string(&mut response).await?;
92    Ok(response)
93}
94
95// ---------------------------------------------------------------------------
96// Windows-only helpers (packet parsing, key decoding, Tunn construction)
97// ---------------------------------------------------------------------------
98
/// Per-peer state shared by the Windows ingress / egress / timers tasks.
///
/// Clones are cheap and alias the same state: every field except the
/// `Copy` keepalive interval sits behind an `Arc`.
#[cfg(windows)]
#[derive(Clone)]
struct WindowsPeerState {
    /// Noise session. Guarded by an async mutex because ingress, egress and
    /// timers all need `&mut Tunn` (`decapsulate` / `encapsulate` /
    /// `update_timers`).
    tunn: Arc<AsyncMutex<Tunn>>,
    /// Current UDP endpoint of the peer. A `parking_lot::RwLock` because
    /// reads (egress path, timers) dominate and writes (NAT endpoint
    /// switches seen on ingress) are rare.
    endpoint: Arc<RwLock<Option<SocketAddr>>>,
    /// Monotonic-ish unix-seconds timestamp, updated from the ingress path.
    last_handshake_sec: Arc<AtomicU64>,
    /// Allowed IP ranges; immutable once `add_peer` has inserted the peer.
    allowed_ips: Arc<Vec<ipnet::IpNet>>,
    /// Keepalive interval in seconds; `None` disables keepalives.
    persistent_keepalive: Option<u16>,
}
117
/// Turn a base64 `WireGuard` key into its raw 32 bytes.
///
/// The Windows backend hands keys straight to `Tunn`, so it needs the raw
/// bytes instead of the hex form used by UAPI. Input that does not decode
/// to exactly 32 bytes is rejected.
#[cfg(windows)]
fn decode_key_b64(b64: &str) -> Result<[u8; 32], Box<dyn std::error::Error>> {
    use base64::{engine::general_purpose::STANDARD, Engine as _};
    let decoded = STANDARD.decode(b64)?;
    let Ok(key) = <[u8; 32]>::try_from(decoded.as_slice()) else {
        return Err(format!(
            "invalid WireGuard key length: expected 32 bytes, got {}",
            decoded.len()
        )
        .into());
    };
    Ok(key)
}
137
/// Read the destination address out of a raw IP packet.
///
/// The IP version is the high nibble of the first byte. For IPv4 the
/// destination occupies bytes `16..20`; for IPv6 it occupies bytes `24..40`.
/// Empty packets, packets too short to hold the destination, and non-IP
/// version nibbles all yield `None`.
#[cfg(windows)]
fn parse_dst_ip(packet: &[u8]) -> Option<IpAddr> {
    let version = *packet.first()? >> 4;
    match version {
        4 => {
            let dst: [u8; 4] = packet.get(16..20)?.try_into().ok()?;
            Some(IpAddr::from(dst))
        }
        6 => {
            let dst: [u8; 16] = packet.get(24..40)?.try_into().ok()?;
            Some(IpAddr::from(dst))
        }
        _ => None,
    }
}
160
/// Build a new `Tunn` (boringtun noise session) from raw key material.
///
/// Construction cannot fail: `Tunn::new` returns `Self` directly in
/// boringtun 0.7, so there is no error for callers to map.
///
/// - `our_priv` / `peer_pub`: raw 32-byte X25519 keys.
/// - `preshared`: optional 32-byte preshared key.
/// - `persistent_keepalive`: keepalive interval in seconds, `None` to disable.
///
/// Every tunnel is built with `index = 0` and `rate_limiter = None`
/// (presumably boringtun's per-tunnel default limiter — confirm).
/// NOTE(review): boringtun appears to derive a tunnel's local session
/// indices from `index`, so with every peer sharing 0 those indices are
/// likely not unique across peers. The Windows ingress loop tries each peer
/// in turn, which tolerates that; verify before relying on index-based
/// dispatch.
#[cfg(windows)]
fn build_tunn(
    our_priv: &[u8; 32],
    peer_pub: &[u8; 32],
    preshared: Option<[u8; 32]>,
    persistent_keepalive: Option<u16>,
) -> Tunn {
    let priv_secret = boringtun::x25519::StaticSecret::from(*our_priv);
    let peer_pub_key = boringtun::x25519::PublicKey::from(*peer_pub);
    Tunn::new(
        priv_secret,
        peer_pub_key,
        preshared,
        persistent_keepalive,
        0,
        None,
    )
}
186
187// ---------------------------------------------------------------------------
188// OverlayTransport
189// ---------------------------------------------------------------------------
190
191/// Encrypted overlay transport layer.
192///
193/// Uses boringtun (userspace `WireGuard`) to create TUN-based encrypted tunnels.
194/// No kernel `WireGuard` module required.
195///
196/// **Important:** This struct holds the boringtun [`DeviceHandle`] (or, on
197/// Windows, the Wintun adapter + spawned loop tasks). Dropping the struct
198/// tears down the TUN device. Callers **must** keep this struct alive for
199/// the entire overlay network lifetime.
200pub struct OverlayTransport {
201    config: OverlayConfig,
202    interface_name: String,
203    /// boringtun-managed TUN device handle (Linux/macOS).
204    /// On Windows we own the adapter via `wintun` instead — see `wintun_dev`.
205    #[cfg(not(windows))]
206    device: Option<DeviceHandle>,
207    /// Wintun adapter + session handle (Windows only).
208    ///
209    /// Wrapped in `Arc` so ingress / egress tasks can hold references
210    /// while the transport itself remains owned by the caller. Dropping
211    /// the last Arc tears down the Wintun session and removes the
212    /// adapter, mirroring the `DeviceHandle::drop` semantics on Unix.
213    #[cfg(windows)]
214    wintun_dev: Option<Arc<WindowsTun>>,
215    /// UDP socket servicing the `WireGuard` protocol for this overlay
216    /// (Windows only). Bound to `OverlayConfig::local_endpoint` in
217    /// `configure`; shared across ingress / egress / timers tasks.
218    #[cfg(windows)]
219    udp: Option<Arc<UdpSocket>>,
220    /// Per-peer Noise state + metadata, keyed by raw 32-byte public key.
221    /// `DashMap` lets the ingress task (write to endpoint / handshake
222    /// timestamp), the egress task (read endpoint), and the timers task
223    /// (encapsulate keepalives) all mutate independent shards without
224    /// contending on a global lock.
225    #[cfg(windows)]
226    peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
227    /// Ingress task: UDP → `Tunn::decapsulate` → Wintun send.
228    #[cfg(windows)]
229    ingress_task: Option<JoinHandle<()>>,
230    /// Egress task: Wintun recv → `Tunn::encapsulate` → UDP send.
231    #[cfg(windows)]
232    egress_task: Option<JoinHandle<()>>,
233    /// Timers task: ~250 ms tick driving `Tunn::update_timers` per peer
234    /// to fire keepalives and handshake re-initiations.
235    #[cfg(windows)]
236    timers_task: Option<JoinHandle<()>>,
237}
238
239impl OverlayTransport {
240    /// Create a new overlay transport (device is not started yet).
241    #[must_use]
242    pub fn new(config: OverlayConfig, interface_name: String) -> Self {
243        Self {
244            config,
245            interface_name,
246            #[cfg(not(windows))]
247            device: None,
248            #[cfg(windows)]
249            wintun_dev: None,
250            #[cfg(windows)]
251            udp: None,
252            #[cfg(windows)]
253            peers: Arc::new(DashMap::new()),
254            #[cfg(windows)]
255            ingress_task: None,
256            #[cfg(windows)]
257            egress_task: None,
258            #[cfg(windows)]
259            timers_task: None,
260        }
261    }
262
263    /// Get the resolved interface name.
264    ///
265    /// On macOS, this returns the kernel-assigned `utunN` name after
266    /// [`create_interface`] has been called. Before that, it returns the
267    /// name passed to [`new`].
268    #[must_use]
269    pub fn interface_name(&self) -> &str {
270        &self.interface_name
271    }
272
273    /// Path to the UAPI Unix socket for this interface.
274    ///
275    /// Linux/macOS only — Windows does not use UAPI.
276    ///
277    /// Derived from [`OverlayConfig::uapi_sock_dir`] so callers running
278    /// with a non-default `--data-dir` can scope their UAPI sockets to
279    /// their own data directory (avoids collisions with a system-wide
280    /// zlayer install that owns `/var/run/wireguard`).
281    #[cfg(not(windows))]
282    fn uapi_sock_path(&self) -> String {
283        self.config
284            .uapi_sock_dir
285            .join(format!("{}.sock", self.interface_name))
286            .to_string_lossy()
287            .into_owned()
288    }
289
290    /// Create the TUN interface.
291    ///
292    /// On Linux/macOS this spawns boringtun worker threads that manage
293    /// the TUN device. On Windows it instantiates a Wintun adapter (no
294    /// boringtun threads — the Windows packet loop is driven directly
295    /// via `boringtun::noise::Tunn` in a follow-up task; today this
296    /// only stands the adapter up so IP configuration can run).
297    ///
298    /// The device is torn down when this struct is dropped (or
299    /// [`shutdown`] is called).
300    ///
301    /// On Linux, creates a named TUN interface (requires `CAP_NET_ADMIN`).
302    /// On macOS, creates a kernel-assigned `utunN` interface (requires `sudo`).
303    /// On Windows, creates a Wintun adapter (requires Administrator and
304    /// `wintun.dll` on disk — see [`crate::tun::windows`]).
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if the TUN device cannot be created or required
309    /// privileges are unavailable.
310    pub async fn create_interface(&mut self) -> Result<(), Box<dyn std::error::Error>> {
311        #[cfg(windows)]
312        {
313            self.create_interface_windows().await
314        }
315        #[cfg(not(windows))]
316        {
317            self.create_interface_unix().await
318        }
319    }
320
321    /// Linux / macOS implementation of [`Self::create_interface`].
322    #[cfg(not(windows))]
323    #[allow(clippy::too_many_lines)]
324    async fn create_interface_unix(&mut self) -> Result<(), Box<dyn std::error::Error>> {
325        // On Linux, validate interface name length (IFNAMSIZ = 15).
326        // On macOS, the kernel auto-assigns utunN names so validation is skipped.
327        #[cfg(not(target_os = "macos"))]
328        if self.interface_name.len() > 15 {
329            return Err(format!(
330                "Interface name '{}' exceeds 15 character limit",
331                self.interface_name
332            )
333            .into());
334        }
335
336        // Ensure the UAPI socket directory exists (data-dir-aware — see
337        // `OverlayConfig::uapi_sock_dir`).
338        tokio::fs::create_dir_all(&self.config.uapi_sock_dir).await?;
339
340        // On Linux, refuse to silently delete an existing kernel link. Stale
341        // interfaces from a previous crashed daemon are swept by the
342        // boot-time cleanup in `bin/zlayer/src/commands/serve.rs::cleanup_stale_daemon`.
343        // If a link with this name still exists when we get here, that's a
344        // real duplicate-name bug (or a foreign owner) and silently deleting
345        // it would yank the TUN out from under another live boringtun
346        // worker, producing the upstream "Fatal read error on tun interface:
347        // Os { code: 77 }" (EBADFD) symptom.
348        //
349        // macOS utun devices are kernel-managed and auto-destroyed when the
350        // owning socket closes, so this check is Linux-only.
351        #[cfg(target_os = "linux")]
352        {
353            let iface_ops = platform_ops();
354            match iface_ops.link_exists(&self.interface_name).await {
355                Ok(true) => {
356                    return Err(format!(
357                        "Kernel link '{}' already exists; refusing to delete it. \
358                         If this is a stale interface from a previous crash, restart \
359                         the daemon (its boot-time sweep clears stale zl-* / veth-* \
360                         links). If this fires during normal operation, there is a \
361                         duplicate-name bug somewhere in the overlay setup path.",
362                        self.interface_name
363                    )
364                    .into());
365                }
366                Ok(false) => {}
367                Err(e) => {
368                    tracing::warn!(
369                        interface = %self.interface_name,
370                        error = %e,
371                        "failed to probe for existing overlay interface; proceeding"
372                    );
373                }
374            }
375        }
376
377        // Clean up stale UAPI socket left behind by a crashed process.
378        let sock_path = self
379            .config
380            .uapi_sock_dir
381            .join(format!("{}.sock", self.interface_name));
382        if tokio::fs::try_exists(&sock_path).await.unwrap_or(false) {
383            tracing::warn!(path = %sock_path.display(), "removing stale UAPI socket");
384            let _ = tokio::fs::remove_file(&sock_path).await;
385        }
386
387        // On macOS, snapshot existing UAPI sockets so we can discover the
388        // kernel-assigned utunN name after device creation.
389        #[cfg(target_os = "macos")]
390        let existing_socks = {
391            let mut set = std::collections::HashSet::new();
392            if let Ok(mut entries) = tokio::fs::read_dir(&self.config.uapi_sock_dir).await {
393                while let Ok(Some(entry)) = entries.next_entry().await {
394                    set.insert(entry.file_name().to_string_lossy().to_string());
395                }
396            }
397            set
398        };
399
400        // On macOS, pass "utun" to let the kernel auto-assign a utunN device.
401        #[cfg(target_os = "macos")]
402        let name = "utun".to_string();
403        #[cfg(not(target_os = "macos"))]
404        let name = self.interface_name.clone();
405
406        let cfg = DeviceConfig {
407            n_threads: 2,
408            use_connected_socket: true,
409            #[cfg(target_os = "linux")]
410            use_multi_queue: false,
411            #[cfg(target_os = "linux")]
412            uapi_fd: -1,
413        };
414
415        let iface_name_for_err = self.interface_name.clone();
416
417        // DeviceHandle::new() blocks (spawns threads), so run on the blocking
418        // thread pool.
419        let handle = tokio::task::spawn_blocking(move || DeviceHandle::new(&name, cfg))
420            .await
421            .map_err(|e| format!("spawn_blocking join error: {e}"))?
422            .map_err(|e| {
423                #[cfg(target_os = "macos")]
424                let hint = "Requires root. Run with sudo or install as a system service (zlayer daemon install).";
425                #[cfg(not(target_os = "macos"))]
426                let hint = "Ensure CAP_NET_ADMIN capability is available.";
427                format!("Failed to create boringtun device '{iface_name_for_err}': {e}. {hint}")
428            })?;
429
430        self.device = Some(handle);
431
432        // On macOS, discover the actual utunN interface name by finding the
433        // newly created UAPI socket.
434        #[cfg(target_os = "macos")]
435        {
436            // Small delay to let boringtun finish socket setup
437            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
438            if let Ok(mut entries) = tokio::fs::read_dir(&self.config.uapi_sock_dir).await {
439                while let Ok(Some(entry)) = entries.next_entry().await {
440                    let fname = entry.file_name().to_string_lossy().to_string();
441                    if !existing_socks.contains(&fname)
442                        && fname.starts_with("utun")
443                        && std::path::Path::new(&fname)
444                            .extension()
445                            .is_some_and(|ext| ext.eq_ignore_ascii_case("sock"))
446                    {
447                        self.interface_name = fname.trim_end_matches(".sock").to_string();
448                        break;
449                    }
450                }
451            }
452        }
453
454        tracing::info!(
455            interface = %self.interface_name,
456            "Created boringtun overlay transport"
457        );
458        Ok(())
459    }
460
461    /// Windows implementation of [`Self::create_interface`].
462    ///
463    /// Stands up a Wintun adapter named `self.interface_name`. The
464    /// packet-forwarding loop (UDP ↔ Tunn ↔ Wintun) is started later
465    /// from [`Self::configure`] once the peer set + listen port are
466    /// known.
467    #[cfg(windows)]
468    async fn create_interface_windows(&mut self) -> Result<(), Box<dyn std::error::Error>> {
469        // Wintun has no IFNAMSIZ-style limit, but very long names are
470        // a sign of a misconfigured caller and look terrible in the UI.
471        if self.interface_name.len() > 64 {
472            return Err(format!(
473                "Wintun adapter name '{}' exceeds 64 character limit",
474                self.interface_name
475            )
476            .into());
477        }
478
479        let iface_name = self.interface_name.clone();
480        let mtu = 1420; // Standard WireGuard MTU. IP Helper can override later.
481
482        // WindowsTun::new is synchronous — wrap in spawn_blocking so we
483        // don't stall the runtime if Wintun's load_from_path / adapter
484        // creation is slow on a busy host.
485        let dev = tokio::task::spawn_blocking(move || WindowsTun::new(&iface_name, mtu))
486            .await
487            .map_err(|e| format!("spawn_blocking join error: {e}"))??;
488
489        tracing::info!(
490            interface = %self.interface_name,
491            luid = dev.luid_value(),
492            "Created Wintun overlay adapter"
493        );
494
495        self.wintun_dev = Some(Arc::new(dev));
496        Ok(())
497    }
498
499    /// Configure the transport with private key, listen port, and peers.
500    ///
501    /// On Linux/macOS this writes the `WireGuard` configuration via UAPI
502    /// to boringtun's per-interface socket, then assigns the overlay IP
503    /// and brings the interface up via [`InterfaceOps`].
504    ///
505    /// On Windows this binds the UDP listener, creates a
506    /// `boringtun::noise::Tunn` instance for each configured peer,
507    /// spawns the ingress / egress / timers tasks that drive the noise
508    /// protocol against the Wintun adapter, and configures the IP layer.
509    ///
510    /// # Errors
511    ///
512    /// On Linux/macOS, returns an error if UAPI configuration or IP
513    /// assignment fails. On Windows, returns an error if the Wintun
514    /// adapter is missing, the UDP socket cannot be bound, key decoding
515    /// fails, or the IP layer cannot be configured.
516    pub async fn configure(
517        &mut self,
518        peers: &[PeerInfo],
519    ) -> Result<(), Box<dyn std::error::Error>> {
520        #[cfg(not(windows))]
521        {
522            let sock = self.uapi_sock_path();
523
524            // Build the UAPI set body
525            let private_key_hex = key_to_hex(&self.config.private_key)?;
526            let mut body = format!(
527                "private_key={}\nlisten_port={}\n",
528                private_key_hex,
529                self.config.local_endpoint.port(),
530            );
531
532            for peer in peers {
533                let pub_hex = key_to_hex(&peer.public_key)?;
534                let _ = writeln!(body, "public_key={pub_hex}");
535                let _ = writeln!(body, "endpoint={}", peer.endpoint);
536                let _ = writeln!(body, "allowed_ip={}", peer.allowed_ips);
537                let _ = writeln!(
538                    body,
539                    "persistent_keepalive_interval={}",
540                    peer.persistent_keepalive_interval.as_secs()
541                );
542            }
543
544            uapi_set(&sock, &body).await?;
545            tracing::debug!(interface = %self.interface_name, "Applied UAPI configuration");
546
547            // Assign overlay IP address and bring interface up
548            self.configure_interface().await?;
549
550            tracing::info!(interface = %self.interface_name, "Overlay transport configured and up");
551            Ok(())
552        }
553
554        #[cfg(windows)]
555        {
556            self.configure_windows(peers).await
557        }
558    }
559
560    /// Windows implementation of [`Self::configure`].
561    ///
562    /// Responsibilities (in order):
563    /// 1. Configure the IP/route layer so host state is consistent.
564    /// 2. Bind the `WireGuard` UDP socket on `local_endpoint`.
565    /// 3. Build a `Tunn` per peer and seed `self.peers`.
566    /// 4. Spawn the three loop tasks (ingress, egress, timers).
567    #[cfg(windows)]
568    async fn configure_windows(
569        &mut self,
570        peers: &[PeerInfo],
571    ) -> Result<(), Box<dyn std::error::Error>> {
572        // IP layer first.
573        self.configure_interface().await?;
574
575        // Install a catch-all host route for the full cluster CIDR via the
576        // Wintun adapter. HCN auto-installs the more specific per-node /28 →
577        // vSwitch route, so longest-prefix-match sends local-slice traffic to
578        // the vSwitch and remote-slice traffic to Wintun (where boringtun's
579        // egress loop picks it up, encapsulates, and forwards to the owning
580        // peer). Without this route, packets destined for remote container
581        // IPs leak out of the default gateway instead of the overlay.
582        //
583        // Failure is logged as a warning but not fatal — the route may
584        // already exist from a previous run, and we want adapter bringup to
585        // remain idempotent.
586        if let Some(ref cluster_cidr_str) = self.config.cluster_cidr {
587            match cluster_cidr_str.parse::<ipnet::IpNet>() {
588                Ok(net) => {
589                    use crate::interface::windows::WindowsIpHelperOps;
590                    use crate::interface::InterfaceOps;
591                    let ops = WindowsIpHelperOps::new();
592                    let adapter_name = self.interface_name.clone();
593                    match ops
594                        .add_route_via_dev(net.network(), net.prefix_len(), &adapter_name)
595                        .await
596                    {
597                        Ok(()) => {
598                            tracing::info!(
599                                cidr = %net,
600                                adapter = %adapter_name,
601                                "Installed cluster-CIDR host route via Wintun adapter"
602                            );
603                        }
604                        Err(e) => {
605                            tracing::warn!(
606                                error = %e,
607                                cidr = %net,
608                                adapter = %adapter_name,
609                                "Failed to install cluster-CIDR host route via Wintun (overlay traffic may not route across nodes); route may already exist"
610                            );
611                        }
612                    }
613                }
614                Err(e) => {
615                    tracing::warn!(
616                        error = %e,
617                        cidr = %cluster_cidr_str,
618                        "cluster_cidr unparseable; skipping Wintun route install"
619                    );
620                }
621            }
622        } else {
623            tracing::warn!(
624                "cluster_cidr not set in OverlayConfig; skipping Wintun route install (cross-node overlay traffic may not route)"
625            );
626        }
627
628        // Wintun adapter must already be up via `create_interface`.
629        let tun = self
630            .wintun_dev
631            .as_ref()
632            .ok_or("Wintun adapter not initialized — call create_interface first")?
633            .clone();
634
635        // Bind the WireGuard UDP socket. Use the configured local
636        // endpoint so IPv4 / IPv6 family matches the overlay.
637        let listen = self.config.local_endpoint;
638        let udp = Arc::new(
639            UdpSocket::bind(listen)
640                .await
641                .map_err(|e| format!("failed to bind WireGuard UDP socket on {listen}: {e}"))?,
642        );
643        self.udp = Some(udp.clone());
644
645        // Seed peers from the initial config.
646        let priv_bytes = decode_key_b64(&self.config.private_key)?;
647        for peer in peers {
648            self.add_peer_windows(&priv_bytes, peer)?;
649        }
650
651        // Spawn the three driver tasks. They hold Arc clones of the
652        // shared state so they outlive individual peer inserts /
653        // removes; aborted during `shutdown`.
654        let peers_ingress = self.peers.clone();
655        let udp_ingress = udp.clone();
656        let tun_ingress = tun.clone();
657        self.ingress_task = Some(tokio::spawn(async move {
658            Self::ingress_loop(udp_ingress, tun_ingress, peers_ingress).await;
659        }));
660
661        let peers_egress = self.peers.clone();
662        let udp_egress = udp.clone();
663        let tun_egress = tun.clone();
664        self.egress_task = Some(tokio::spawn(async move {
665            Self::egress_loop(tun_egress, udp_egress, peers_egress).await;
666        }));
667
668        let peers_timers = self.peers.clone();
669        let udp_timers = udp.clone();
670        self.timers_task = Some(tokio::spawn(async move {
671            Self::timers_loop(udp_timers, peers_timers).await;
672        }));
673
674        tracing::info!(
675            interface = %self.interface_name,
676            peer_count = peers.len(),
677            listen = %listen,
678            "Windows overlay transport configured (Tunn pipeline online)"
679        );
680        Ok(())
681    }
682
683    /// Insert (or replace) a peer in the Windows peer map.
684    ///
685    /// Used both by `configure_windows` at bootstrap time and by the
686    /// public `add_peer` entry point. Assumes `self.config.private_key`
687    /// has already been decoded (the caller passes the raw bytes to
688    /// avoid re-decoding per peer during bulk seeding).
689    #[cfg(windows)]
690    fn add_peer_windows(
691        &self,
692        our_priv: &[u8; 32],
693        peer: &PeerInfo,
694    ) -> Result<(), Box<dyn std::error::Error>> {
695        let peer_pub = decode_key_b64(&peer.public_key)?;
696        let allowed: ipnet::IpNet = peer
697            .allowed_ips
698            .parse()
699            .map_err(|e| format!("invalid allowed_ips '{}': {e}", peer.allowed_ips))?;
700        // WireGuard persistent_keepalive_interval of 0 means "disabled",
701        // which boringtun models as `None`.
702        let keepalive = {
703            let secs = peer.persistent_keepalive_interval.as_secs();
704            if secs == 0 {
705                None
706            } else {
707                u16::try_from(secs).ok()
708            }
709        };
710
711        let tunn = build_tunn(our_priv, &peer_pub, None, keepalive);
712        let state = WindowsPeerState {
713            tunn: Arc::new(AsyncMutex::new(tunn)),
714            endpoint: Arc::new(RwLock::new(Some(peer.endpoint))),
715            last_handshake_sec: Arc::new(AtomicU64::new(0)),
716            allowed_ips: Arc::new(vec![allowed]),
717            persistent_keepalive: keepalive,
718        };
719        self.peers.insert(peer_pub, state);
720        tracing::debug!(
721            peer_key = %peer.public_key,
722            endpoint = %peer.endpoint,
723            allowed = %peer.allowed_ips,
724            "Added peer to Windows overlay peer map"
725        );
726        Ok(())
727    }
728
729    /// UDP → decapsulate → Wintun loop.
730    ///
731    /// For each inbound datagram we linear-scan the peer map and hand
732    /// the packet to each `Tunn::decapsulate` until one returns a
733    /// non-error result. This is O(N peers) per packet; for the
734    /// small-cluster overlays `ZLayer` targets it is fine. A future
735    /// optimization can cache `src_addr → pubkey` once sessions are
736    /// established.
737    ///
738    /// `decapsulate` returns:
739    /// - `WriteToTunnelV4` / `WriteToTunnelV6` — cleartext IP packet to
740    ///   inject into Wintun.
741    /// - `WriteToNetwork` — an auto-generated WG reply (cookie / 2nd
742    ///   handshake message) to echo back to the remote.
743    /// - `Done` / `Err` — not our peer, keep scanning.
744    ///
745    /// After a successful decap, we loop on an empty-datagram call
746    /// to drain any queued packets boringtun buffered during the
747    /// handshake (documented behavior of `decapsulate`).
748    #[cfg(windows)]
749    async fn ingress_loop(
750        udp: Arc<UdpSocket>,
751        tun: Arc<WindowsTun>,
752        peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
753    ) {
754        // 65536 covers the largest possible IPv4/IPv6 datagram.
755        let mut inbuf = vec![0u8; 65536];
756        loop {
757            let (n, src) = match udp.recv_from(&mut inbuf).await {
758                Ok(p) => p,
759                Err(e) => {
760                    tracing::error!(error = %e, "UDP recv failed; ingress loop exiting");
761                    break;
762                }
763            };
764
765            // Snapshot (pubkey, state) pairs so we release the DashMap
766            // shard lock before awaiting on the async per-peer Mutex.
767            let snapshot: Vec<([u8; 32], WindowsPeerState)> = peers
768                .iter()
769                .map(|e| (*e.key(), e.value().clone()))
770                .collect();
771
772            for (pk, state) in snapshot {
773                let mut out = vec![0u8; 65536];
774                let mut handled = false;
775                {
776                    let mut tunn = state.tunn.lock().await;
777                    match tunn.decapsulate(Some(src.ip()), &inbuf[..n], &mut out) {
778                        TunnResult::WriteToTunnelV4(pkt, _)
779                        | TunnResult::WriteToTunnelV6(pkt, _) => {
780                            let pkt_owned = pkt.to_vec();
781                            drop(tunn);
782                            if let Err(e) = tun.send(&pkt_owned).await {
783                                tracing::warn!(error = %e, "Wintun send failed");
784                            }
785                            *state.endpoint.write() = Some(src);
786                            state.last_handshake_sec.store(
787                                SystemTime::now()
788                                    .duration_since(UNIX_EPOCH)
789                                    .unwrap_or_default()
790                                    .as_secs(),
791                                Ordering::Relaxed,
792                            );
793                            handled = true;
794                        }
795                        TunnResult::WriteToNetwork(resp) => {
796                            let resp_owned = resp.to_vec();
797                            drop(tunn);
798                            if let Err(e) = udp.send_to(&resp_owned, src).await {
799                                tracing::warn!(error = %e, "UDP reply send failed");
800                            }
801                            *state.endpoint.write() = Some(src);
802                            handled = true;
803                        }
804                        TunnResult::Done | TunnResult::Err(_) => {
805                            // Not this peer — try the next.
806                        }
807                    }
808                }
809                if handled {
810                    // Drain queued packets: boringtun buffers data
811                    // packets that arrived before the handshake
812                    // completed; passing an empty datagram releases
813                    // them one at a time until `Done`.
814                    loop {
815                        let mut drain = vec![0u8; 65536];
816                        let mut tunn = state.tunn.lock().await;
817                        match tunn.decapsulate(None, &[], &mut drain) {
818                            TunnResult::WriteToNetwork(resp) => {
819                                let resp_owned = resp.to_vec();
820                                drop(tunn);
821                                if let Err(e) = udp.send_to(&resp_owned, src).await {
822                                    tracing::warn!(error = %e, "UDP drain send failed");
823                                }
824                            }
825                            TunnResult::WriteToTunnelV4(pkt, _)
826                            | TunnResult::WriteToTunnelV6(pkt, _) => {
827                                let pkt_owned = pkt.to_vec();
828                                drop(tunn);
829                                if let Err(e) = tun.send(&pkt_owned).await {
830                                    tracing::warn!(error = %e, "Wintun drain send failed");
831                                }
832                            }
833                            TunnResult::Done | TunnResult::Err(_) => break,
834                        }
835                    }
836                    let _ = pk; // peer matched; stop scanning.
837                    break;
838                }
839            }
840        }
841    }
842
843    /// Wintun → encapsulate → UDP loop.
844    ///
845    /// Parses the destination IP from the outbound clear packet,
846    /// matches it against each peer's `allowed_ips`, encapsulates with
847    /// that peer's `Tunn`, and writes the ciphertext to UDP. If no
848    /// endpoint is known yet the packet is dropped silently — callers
849    /// typically retry at a higher layer, and `update_timers` will be
850    /// firing handshake initiations independently.
851    #[cfg(windows)]
852    async fn egress_loop(
853        tun: Arc<WindowsTun>,
854        udp: Arc<UdpSocket>,
855        peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
856    ) {
857        let mut buf = vec![0u8; 65536];
858        loop {
859            let n = match tun.recv(&mut buf).await {
860                Ok(n) => n,
861                Err(e) => {
862                    tracing::error!(error = %e, "Wintun recv failed; egress loop exiting");
863                    break;
864                }
865            };
866
867            let Some(dst_ip) = parse_dst_ip(&buf[..n]) else {
868                continue;
869            };
870
871            // Find the first peer whose allowed_ips contains dst_ip.
872            let state = peers.iter().find_map(|entry| {
873                if entry
874                    .value()
875                    .allowed_ips
876                    .iter()
877                    .any(|net| net.contains(&dst_ip))
878                {
879                    Some(entry.value().clone())
880                } else {
881                    None
882                }
883            });
884            let Some(state) = state else {
885                tracing::trace!(%dst_ip, "no matching overlay peer");
886                continue;
887            };
888
889            let endpoint = *state.endpoint.read();
890            let Some(endpoint) = endpoint else {
891                tracing::trace!(%dst_ip, "peer has no endpoint yet; dropping");
892                continue;
893            };
894
895            // `encapsulate` requires dst ≥ src.len() + 32 and ≥ 148.
896            // We size to 64 KiB + 32 to cover any legal IP packet plus
897            // the WG overhead.
898            let mut out = vec![0u8; 65536 + 32];
899            let mut tunn = state.tunn.lock().await;
900            match tunn.encapsulate(&buf[..n], &mut out) {
901                TunnResult::WriteToNetwork(pkt) => {
902                    let pkt_owned = pkt.to_vec();
903                    drop(tunn);
904                    if let Err(e) = udp.send_to(&pkt_owned, endpoint).await {
905                        tracing::warn!(error = %e, "UDP send failed");
906                    }
907                }
908                TunnResult::Done
909                | TunnResult::WriteToTunnelV4(_, _)
910                | TunnResult::WriteToTunnelV6(_, _) => {
911                    // `Done`: packet queued inside boringtun pending
912                    // handshake; nothing to emit right now.
913                    // `WriteToTunnel*`: encapsulate never produces
914                    // TUN-ward results, but we treat them as no-ops for
915                    // exhaustiveness.
916                }
917                TunnResult::Err(e) => {
918                    tracing::warn!(?e, "encapsulate error");
919                }
920            }
921        }
922    }
923
924    /// Per-peer periodic `update_timers` tick.
925    ///
926    /// Fires every 250 ms (the cadence boringtun's reference
927    /// implementation uses) to emit keepalives and re-initiate stale
928    /// handshakes. `update_timers` writes at most 148 bytes (max WG
929    /// handshake init length), so the scratch buffer is sized
930    /// accordingly.
931    #[cfg(windows)]
932    async fn timers_loop(udp: Arc<UdpSocket>, peers: Arc<DashMap<[u8; 32], WindowsPeerState>>) {
933        let mut interval = tokio::time::interval(Duration::from_millis(250));
934        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
935        loop {
936            interval.tick().await;
937            let snapshot: Vec<WindowsPeerState> = peers.iter().map(|e| e.value().clone()).collect();
938            for state in snapshot {
939                let endpoint = *state.endpoint.read();
940                let mut out = vec![0u8; 148];
941                let mut tunn = state.tunn.lock().await;
942                match tunn.update_timers(&mut out) {
943                    TunnResult::WriteToNetwork(pkt) => {
944                        let pkt_owned = pkt.to_vec();
945                        drop(tunn);
946                        if let Some(ep) = endpoint {
947                            if let Err(e) = udp.send_to(&pkt_owned, ep).await {
948                                tracing::debug!(error = %e, "timers UDP send failed");
949                            }
950                        }
951                    }
952                    TunnResult::Done
953                    | TunnResult::WriteToTunnelV4(_, _)
954                    | TunnResult::WriteToTunnelV6(_, _) => {}
955                    TunnResult::Err(e) => {
956                        tracing::debug!(?e, "update_timers error");
957                    }
958                }
959            }
960        }
961    }
962
963    /// Platform-agnostic interface IP assignment and bring-up.
964    ///
965    /// Supports both IPv4 and IPv6 overlay CIDRs. The per-OS details
966    /// (RTNETLINK on Linux, `ifconfig` / `route` shell-outs on macOS)
967    /// are hidden behind the [`InterfaceOps`] trait.
968    ///
969    /// Idempotency behavior is preserved byte-for-byte:
970    /// - `add_address` failures whose message contains `"File exists"`
971    ///   or `"EEXIST"` (Linux RTNETLINK EEXIST on re-add) are swallowed.
972    /// - `add_route_via_dev` failures whose message contains
973    ///   `"File exists"` / `"EEXIST"` (Linux) or `"already in table"`
974    ///   (BSD route) are swallowed inside the macOS implementation; the
975    ///   Linux implementation returns the error, and we swallow it here
976    ///   via the same text-match gate used by the original code.
977    async fn configure_interface(&self) -> Result<(), Box<dyn std::error::Error>> {
978        let cidr: ipnet::IpNet = self.config.overlay_cidr.parse().map_err(|e| {
979            format!(
980                "Failed to parse overlay CIDR '{}': {e}",
981                self.config.overlay_cidr
982            )
983        })?;
984        let overlay_addr = cidr.addr();
985        let prefix_len = cidr.prefix_len();
986        let net_addr = cidr.network();
987
988        let iface_ops = platform_ops();
989
990        // Assign overlay IP address — handles both IPv4 and IPv6.
991        // Preserve original idempotency: swallow EEXIST / "File exists"
992        // since a previous run may have left the address configured on
993        // a still-live TUN device.
994        if let Err(e) = iface_ops
995            .add_address(&self.interface_name, overlay_addr, prefix_len)
996            .await
997        {
998            let msg = e.to_string();
999            if !msg.contains("File exists") && !msg.contains("EEXIST") {
1000                return Err(format!("Failed to assign IP: {msg}").into());
1001            }
1002        }
1003
1004        // Bring interface up. On macOS this is redundant with the
1005        // `up` token passed into ifconfig during `add_address`, but
1006        // harmless.
1007        iface_ops
1008            .set_link_up(&self.interface_name)
1009            .await
1010            .map_err(|e| format!("Failed to bring up interface: {e}"))?;
1011
1012        // Add explicit route for the overlay subnet. Preserve original
1013        // idempotency: swallow EEXIST since the kernel may auto-install
1014        // a connected route when the address is assigned.
1015        if let Err(e) = iface_ops
1016            .add_route_via_dev(net_addr, prefix_len, &self.interface_name)
1017            .await
1018        {
1019            let msg = e.to_string();
1020            if !msg.contains("File exists")
1021                && !msg.contains("EEXIST")
1022                && !msg.contains("already in table")
1023            {
1024                return Err(format!("Failed to add route: {msg}").into());
1025            }
1026        }
1027
1028        Ok(())
1029    }
1030
1031    /// Add a peer dynamically.
1032    ///
1033    /// On Linux/macOS this writes to boringtun's UAPI socket. On
1034    /// Windows it returns an error until the per-peer
1035    /// `boringtun::noise::Tunn` map is wired (Phase D3.x).
1036    ///
1037    /// # Errors
1038    ///
1039    /// Returns an error if the key conversion or UAPI command fails on
1040    /// Linux/macOS, or always on Windows (until the packet loop lands).
1041    #[cfg_attr(windows, allow(clippy::unused_async))]
1042    pub async fn add_peer(&self, peer: &PeerInfo) -> Result<(), Box<dyn std::error::Error>> {
1043        #[cfg(not(windows))]
1044        {
1045            let sock = self.uapi_sock_path();
1046            let pub_hex = key_to_hex(&peer.public_key)?;
1047
1048            let body = format!(
1049                "public_key={}\nendpoint={}\nallowed_ip={}\npersistent_keepalive_interval={}\n",
1050                pub_hex,
1051                peer.endpoint,
1052                peer.allowed_ips,
1053                peer.persistent_keepalive_interval.as_secs(),
1054            );
1055
1056            uapi_set(&sock, &body).await?;
1057            tracing::debug!(
1058                peer_key = %peer.public_key,
1059                interface = %self.interface_name,
1060                "Added peer via UAPI"
1061            );
1062            Ok(())
1063        }
1064        #[cfg(windows)]
1065        {
1066            let priv_bytes = decode_key_b64(&self.config.private_key)?;
1067            self.add_peer_windows(&priv_bytes, peer)?;
1068            Ok(())
1069        }
1070    }
1071
1072    /// Remove a peer.
1073    ///
1074    /// On Linux/macOS this writes to boringtun's UAPI socket. On
1075    /// Windows it returns an error until the per-peer
1076    /// `boringtun::noise::Tunn` map is wired (Phase D3.x).
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the key conversion or UAPI command fails on
1081    /// Linux/macOS, or always on Windows (until the packet loop lands).
1082    #[cfg_attr(windows, allow(clippy::unused_async))]
1083    pub async fn remove_peer(&self, public_key: &str) -> Result<(), Box<dyn std::error::Error>> {
1084        #[cfg(not(windows))]
1085        {
1086            let sock = self.uapi_sock_path();
1087            let pub_hex = key_to_hex(public_key)?;
1088
1089            let body = format!("public_key={pub_hex}\nremove=true\n");
1090
1091            uapi_set(&sock, &body).await?;
1092            tracing::debug!(
1093                peer_key = %public_key,
1094                interface = %self.interface_name,
1095                "Removed peer via UAPI"
1096            );
1097            Ok(())
1098        }
1099        #[cfg(windows)]
1100        {
1101            let pk = decode_key_b64(public_key)?;
1102            self.peers.remove(&pk);
1103            tracing::debug!(
1104                peer_key = %public_key,
1105                interface = %self.interface_name,
1106                "Removed peer from Windows overlay"
1107            );
1108            Ok(())
1109        }
1110    }
1111
1112    /// Query interface status.
1113    ///
1114    /// On Linux/macOS this reads boringtun's UAPI socket. On Windows it
1115    /// returns an error until the packet loop lands (since there is no
1116    /// running `WireGuard` stack to query yet).
1117    ///
1118    /// # Errors
1119    ///
1120    /// Returns an error if the UAPI query fails on Linux/macOS, or
1121    /// always on Windows.
1122    #[cfg_attr(windows, allow(clippy::unused_async))]
1123    pub async fn status(&self) -> Result<String, Box<dyn std::error::Error>> {
1124        #[cfg(not(windows))]
1125        {
1126            let sock = self.uapi_sock_path();
1127            let response = uapi_get(&sock).await?;
1128            Ok(response)
1129        }
1130        #[cfg(windows)]
1131        {
1132            // Mimic the `wg show`-style key=value newline-delimited
1133            // dump that the Linux/macOS UAPI surface produces.
1134            use base64::{engine::general_purpose::STANDARD, Engine as _};
1135            let mut out = String::new();
1136            let priv_bytes = decode_key_b64(&self.config.private_key).unwrap_or([0u8; 32]);
1137            let _ = writeln!(out, "private_key={}", hex::encode(priv_bytes));
1138            let _ = writeln!(out, "listen_port={}", self.config.local_endpoint.port());
1139            for entry in self.peers.iter() {
1140                let pk_b64 = STANDARD.encode(entry.key());
1141                let _ = writeln!(out, "public_key={}", hex::encode(entry.key()));
1142                let _ = writeln!(out, "public_key_b64={pk_b64}");
1143                if let Some(ep) = *entry.value().endpoint.read() {
1144                    let _ = writeln!(out, "endpoint={ep}");
1145                }
1146                for net in entry.value().allowed_ips.iter() {
1147                    let _ = writeln!(out, "allowed_ip={net}");
1148                }
1149                if let Some(k) = entry.value().persistent_keepalive {
1150                    let _ = writeln!(out, "persistent_keepalive_interval={k}");
1151                }
1152                let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1153                let _ = writeln!(out, "last_handshake_time_sec={last}");
1154            }
1155            let _ = writeln!(out, "errno=0");
1156            Ok(out)
1157        }
1158    }
1159
1160    /// Generate an overlay keypair using native Rust crypto (x25519-dalek).
1161    ///
1162    /// No external binary is required. Returns `(private_key, public_key)` in
1163    /// base64 encoding.
1164    ///
1165    /// # Errors
1166    ///
1167    /// This method currently always succeeds but returns `Result` for API consistency.
1168    #[allow(clippy::unused_async)]
1169    pub async fn generate_keys() -> Result<(String, String), Box<dyn std::error::Error>> {
1170        use base64::{engine::general_purpose::STANDARD, Engine as _};
1171        use x25519_dalek::{PublicKey, StaticSecret};
1172
1173        let secret = StaticSecret::random();
1174        let public = PublicKey::from(&secret);
1175
1176        let private_key = STANDARD.encode(secret.to_bytes());
1177        let public_key = STANDARD.encode(public.as_bytes());
1178
1179        Ok((private_key, public_key))
1180    }
1181
1182    /// Update a peer's endpoint address via UAPI.
1183    ///
1184    /// Used by NAT traversal to switch endpoints after discovery (e.g. from a
1185    /// relay to a direct reflexive address after hole punching succeeds).
1186    ///
1187    /// # Errors
1188    ///
1189    /// Returns an error if key conversion or UAPI command fails.
1190    #[cfg(feature = "nat")]
1191    #[cfg_attr(windows, allow(clippy::unused_async))]
1192    pub async fn update_peer_endpoint(
1193        &self,
1194        public_key: &str,
1195        new_endpoint: std::net::SocketAddr,
1196    ) -> Result<(), Box<dyn std::error::Error>> {
1197        #[cfg(not(windows))]
1198        {
1199            let sock = self.uapi_sock_path();
1200            let pub_hex = key_to_hex(public_key)?;
1201            let body = format!("public_key={pub_hex}\nendpoint={new_endpoint}\n");
1202            uapi_set(&sock, &body).await?;
1203            tracing::debug!(
1204                peer_key = %public_key,
1205                endpoint = %new_endpoint,
1206                "Updated peer endpoint"
1207            );
1208            Ok(())
1209        }
1210        #[cfg(windows)]
1211        {
1212            let pk = decode_key_b64(public_key)?;
1213            let entry = self
1214                .peers
1215                .get(&pk)
1216                .ok_or_else(|| format!("peer not found: {public_key}"))?;
1217            *entry.value().endpoint.write() = Some(new_endpoint);
1218            tracing::debug!(
1219                peer_key = %public_key,
1220                endpoint = %new_endpoint,
1221                "Updated peer endpoint (Windows)"
1222            );
1223            Ok(())
1224        }
1225    }
1226
1227    /// Check if a peer has completed a `WireGuard` handshake since a given timestamp.
1228    ///
1229    /// Returns `true` if `last_handshake_time_sec >= since` (and is non-zero).
1230    /// Used by NAT traversal to verify connectivity after switching endpoints.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns an error if the UAPI query fails.
1235    #[cfg(feature = "nat")]
1236    #[cfg_attr(windows, allow(clippy::unused_async))]
1237    pub async fn check_peer_handshake(
1238        &self,
1239        public_key: &str,
1240        since: u64,
1241    ) -> Result<bool, Box<dyn std::error::Error>> {
1242        #[cfg(not(windows))]
1243        {
1244            let sock = self.uapi_sock_path();
1245            let response = uapi_get(&sock).await?;
1246            let target_hex = key_to_hex(public_key)?;
1247
1248            let mut in_target = false;
1249            for line in response.lines() {
1250                let line = line.trim();
1251                if line.is_empty() || line.starts_with("errno=") {
1252                    continue;
1253                }
1254                let Some((key, value)) = line.split_once('=') else {
1255                    continue;
1256                };
1257                match key {
1258                    "public_key" => {
1259                        in_target = value == target_hex;
1260                    }
1261                    "last_handshake_time_sec" if in_target => {
1262                        if let Ok(t) = value.parse::<u64>() {
1263                            return Ok(t > 0 && t >= since);
1264                        }
1265                    }
1266                    _ => {}
1267                }
1268            }
1269            Ok(false)
1270        }
1271        #[cfg(windows)]
1272        {
1273            let pk = decode_key_b64(public_key)?;
1274            let entry = self
1275                .peers
1276                .get(&pk)
1277                .ok_or_else(|| format!("peer not found: {public_key}"))?;
1278            let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1279            Ok(last > 0 && last >= since)
1280        }
1281    }
1282
1283    /// Shut down the overlay transport, destroying the TUN device.
1284    ///
1285    /// On Linux/macOS this takes the boringtun [`DeviceHandle`] and
1286    /// drops it, which triggers boringtun's cleanup logic (signal exit +
1287    /// join worker threads + remove socket).
1288    ///
1289    /// On Windows it takes the Wintun adapter handle and drops it, which
1290    /// ends the session and removes the adapter from the Windows device
1291    /// tree.
1292    pub fn shutdown(&mut self) {
1293        #[cfg(not(windows))]
1294        if let Some(device) = self.device.take() {
1295            tracing::info!(
1296                interface = %self.interface_name,
1297                "Shutting down overlay transport"
1298            );
1299            // DeviceHandle::drop triggers exit + cleanup
1300            drop(device);
1301        }
1302        #[cfg(windows)]
1303        {
1304            if let Some(h) = self.ingress_task.take() {
1305                h.abort();
1306            }
1307            if let Some(h) = self.egress_task.take() {
1308                h.abort();
1309            }
1310            if let Some(h) = self.timers_task.take() {
1311                h.abort();
1312            }
1313            // Drop the UDP socket — any in-flight recv on the ingress
1314            // task will already have been aborted above, but the Arc
1315            // count must drop to zero for the kernel handle to close.
1316            self.udp.take();
1317            self.peers.clear();
1318            if let Some(dev) = self.wintun_dev.take() {
1319                tracing::info!(
1320                    interface = %self.interface_name,
1321                    "Shutting down Wintun overlay transport"
1322                );
1323                drop(dev);
1324            }
1325        }
1326    }
1327}
1328
1329impl Drop for OverlayTransport {
1330    fn drop(&mut self) {
1331        self.shutdown();
1332    }
1333}
1334
1335// ---------------------------------------------------------------------------
1336// Tests
1337// ---------------------------------------------------------------------------
1338
1339#[cfg(test)]
1340mod tests {
1341    use super::*;
1342    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
1343    use std::time::Duration;
1344
1345    #[test]
1346    fn test_peer_info_to_config() {
1347        let peer = PeerInfo::new(
1348            "test_public_key".to_string(),
1349            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 51820),
1350            "10.0.0.2/32",
1351            Duration::from_secs(25),
1352        );
1353
1354        let config = peer.to_peer_config();
1355        assert!(config.contains("PublicKey = test_public_key"));
1356        assert!(config.contains("Endpoint = 10.0.0.1:51820"));
1357    }
1358
1359    // -----------------------------------------------------------------
1360    // Windows-only helper tests
1361    // -----------------------------------------------------------------
1362
1363    #[cfg(windows)]
1364    #[test]
1365    fn test_parse_dst_ip_v4() {
1366        // Minimal IPv4 header: version=4 (top nibble), header length=5,
1367        // dst IP = 10.0.0.7 in bytes 16..20.
1368        let mut pkt = vec![0u8; 20];
1369        pkt[0] = 0x45;
1370        pkt[16..20].copy_from_slice(&[10, 0, 0, 7]);
1371        assert_eq!(
1372            super::parse_dst_ip(&pkt),
1373            Some(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)))
1374        );
1375    }
1376
1377    #[cfg(windows)]
1378    #[test]
1379    fn test_parse_dst_ip_v6() {
1380        // IPv6: version=6 (top nibble), dst IP = fd00::1 in bytes 24..40.
1381        let mut pkt = vec![0u8; 40];
1382        pkt[0] = 0x60;
1383        pkt[24] = 0xfd;
1384        pkt[25] = 0x00;
1385        pkt[39] = 0x01;
1386        let expected = IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1));
1387        assert_eq!(super::parse_dst_ip(&pkt), Some(expected));
1388    }
1389
1390    #[cfg(windows)]
1391    #[test]
1392    fn test_parse_dst_ip_truncated_returns_none() {
1393        let pkt = vec![0x45u8; 10];
1394        assert_eq!(super::parse_dst_ip(&pkt), None);
1395        assert_eq!(super::parse_dst_ip(&[]), None);
1396    }
1397
1398    #[cfg(windows)]
1399    #[test]
1400    fn test_parse_dst_ip_unknown_version_returns_none() {
1401        let pkt = vec![0x70u8; 64];
1402        assert_eq!(super::parse_dst_ip(&pkt), None);
1403    }
1404
1405    #[cfg(windows)]
1406    #[test]
1407    fn test_decode_key_b64_roundtrip() {
1408        use base64::{engine::general_purpose::STANDARD, Engine as _};
1409        let raw = [0x42u8; 32];
1410        let b64 = STANDARD.encode(raw);
1411        let decoded = super::decode_key_b64(&b64).expect("decode");
1412        assert_eq!(decoded, raw);
1413    }
1414
1415    #[cfg(windows)]
1416    #[test]
1417    fn test_decode_key_b64_wrong_length_errors() {
1418        use base64::{engine::general_purpose::STANDARD, Engine as _};
1419        let short = STANDARD.encode([0u8; 16]);
1420        assert!(super::decode_key_b64(&short).is_err());
1421    }
1422
1423    #[test]
1424    fn test_peer_info_ipv6_to_config() {
1425        let peer = PeerInfo::new(
1426            "test_public_key_v6".to_string(),
1427            SocketAddr::new(
1428                IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
1429                51820,
1430            ),
1431            "fd00::2/128",
1432            Duration::from_secs(25),
1433        );
1434
1435        let config = peer.to_peer_config();
1436        assert!(config.contains("PublicKey = test_public_key_v6"));
1437        // SocketAddr for IPv6 uses bracket notation: [fd00::1]:51820
1438        assert!(
1439            config.contains("Endpoint = [fd00::1]:51820"),
1440            "IPv6 endpoint should use bracket notation, got: {config}"
1441        );
1442        assert!(config.contains("AllowedIPs = fd00::2/128"));
1443    }
1444
1445    #[test]
1446    fn test_overlay_cidr_parses_ipv4() {
1447        let cidr: ipnet::IpNet = "10.200.0.1/24".parse().unwrap();
1448        assert!(cidr.addr().is_ipv4());
1449        assert_eq!(cidr.prefix_len(), 24);
1450        assert_eq!(cidr.network().to_string(), "10.200.0.0");
1451    }
1452
1453    #[test]
1454    fn test_overlay_cidr_parses_ipv6() {
1455        let cidr: ipnet::IpNet = "fd00::1/48".parse().unwrap();
1456        assert!(cidr.addr().is_ipv6());
1457        assert_eq!(cidr.prefix_len(), 48);
1458        assert_eq!(cidr.network().to_string(), "fd00::");
1459    }
1460
1461    #[test]
1462    fn test_overlay_cidr_ipv6_host_address() {
1463        // Verify /128 single-host prefix works (used in allowed_ips)
1464        let cidr: ipnet::IpNet = "fd00::5/128".parse().unwrap();
1465        assert!(cidr.addr().is_ipv6());
1466        assert_eq!(cidr.prefix_len(), 128);
1467        assert_eq!(cidr.addr().to_string(), "fd00::5");
1468    }
1469
1470    #[test]
1471    fn test_peer_info_ipv6_allowed_ips_format() {
1472        // PeerInfo.allowed_ips is a String — verify both formats are valid
1473        let peer_v4 = PeerInfo::new(
1474            "key_v4".to_string(),
1475            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
1476            "10.200.0.5/32",
1477            Duration::from_secs(25),
1478        );
1479        assert_eq!(peer_v4.allowed_ips, "10.200.0.5/32");
1480
1481        let peer_v6 = PeerInfo::new(
1482            "key_v6".to_string(),
1483            SocketAddr::new(
1484                IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 5)),
1485                51820,
1486            ),
1487            "fd00::5/128",
1488            Duration::from_secs(25),
1489        );
1490        assert_eq!(peer_v6.allowed_ips, "fd00::5/128");
1491    }
1492
1493    #[test]
1494    fn test_uapi_body_format_ipv6_peer() {
1495        // Verify that formatting an IPv6 SocketAddr for UAPI produces correct output.
1496        // WireGuard UAPI expects [ipv6]:port format for endpoints.
1497        let endpoint = SocketAddr::new(
1498            IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
1499            51820,
1500        );
1501        let formatted = format!("endpoint={endpoint}");
1502        assert_eq!(formatted, "endpoint=[fd00::1]:51820");
1503    }
1504
1505    #[tokio::test]
1506    async fn test_generate_keys_native() {
1507        use base64::{engine::general_purpose::STANDARD, Engine as _};
1508        use x25519_dalek::{PublicKey, StaticSecret};
1509
1510        let (private_key, public_key) = OverlayTransport::generate_keys().await.unwrap();
1511
1512        assert_eq!(
1513            private_key.len(),
1514            44,
1515            "Private key should be 44 chars base64"
1516        );
1517        assert_eq!(public_key.len(), 44, "Public key should be 44 chars base64");
1518
1519        let priv_bytes = STANDARD.decode(&private_key).unwrap();
1520        let pub_bytes = STANDARD.decode(&public_key).unwrap();
1521        assert_eq!(priv_bytes.len(), 32);
1522        assert_eq!(pub_bytes.len(), 32);
1523
1524        let secret = StaticSecret::from(<[u8; 32]>::try_from(priv_bytes.as_slice()).unwrap());
1525        let expected_public = PublicKey::from(&secret);
1526        assert_eq!(pub_bytes.as_slice(), expected_public.as_bytes());
1527    }
1528
1529    #[tokio::test]
1530    async fn test_generate_keys_unique() {
1531        let (key1, _) = OverlayTransport::generate_keys().await.unwrap();
1532        let (key2, _) = OverlayTransport::generate_keys().await.unwrap();
1533        assert_ne!(
1534            key1, key2,
1535            "Sequential key generation should produce unique keys"
1536        );
1537    }
1538
1539    #[cfg(not(windows))]
1540    #[test]
1541    fn test_key_to_hex() {
1542        use base64::{engine::general_purpose::STANDARD, Engine as _};
1543
1544        // Create a known 32-byte key and encode it as base64
1545        let key_bytes = [0xABu8; 32];
1546        let base64_key = STANDARD.encode(key_bytes);
1547        let hex_key = key_to_hex(&base64_key).unwrap();
1548
1549        assert_eq!(hex_key, "ab".repeat(32));
1550        assert_eq!(hex_key.len(), 64, "Hex key should be 64 chars");
1551    }
1552
1553    #[cfg(not(windows))]
1554    #[test]
1555    fn test_key_to_hex_invalid_length() {
1556        use base64::{engine::general_purpose::STANDARD, Engine as _};
1557
1558        let short_bytes = [0xABu8; 16];
1559        let base64_key = STANDARD.encode(short_bytes);
1560        let result = key_to_hex(&base64_key);
1561        assert!(result.is_err());
1562        assert!(result
1563            .unwrap_err()
1564            .to_string()
1565            .contains("Invalid key length"));
1566    }
1567
1568    #[tokio::test]
1569    #[ignore = "Requires root/CAP_NET_ADMIN"]
1570    async fn test_create_interface_boringtun() {
1571        let config = OverlayConfig {
1572            overlay_cidr: "10.42.0.1/24".to_string(),
1573            cluster_cidr: None,
1574            private_key: "test_key".to_string(),
1575            public_key: "test_pub".to_string(),
1576            local_endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
1577            peer_discovery_interval: Duration::from_secs(30),
1578            #[cfg(feature = "nat")]
1579            nat: crate::nat::NatConfig::default(),
1580            uapi_sock_dir: std::path::PathBuf::from("/var/run/wireguard"),
1581        };
1582
1583        // On macOS, boringtun uses "utun" and the kernel assigns utunN.
1584        // On Linux, we use a custom interface name.
1585        #[cfg(target_os = "macos")]
1586        let iface_name = "utun".to_string();
1587        #[cfg(not(target_os = "macos"))]
1588        let iface_name = "zl-bt-test0".to_string();
1589
1590        let mut transport = OverlayTransport::new(config, iface_name);
1591        let result = transport.create_interface().await;
1592
1593        match result {
1594            Ok(()) => {
1595                #[cfg(target_os = "macos")]
1596                assert!(
1597                    transport.interface_name().starts_with("utun"),
1598                    "macOS interface should be utunN, got: {}",
1599                    transport.interface_name()
1600                );
1601                transport.shutdown();
1602            }
1603            Err(e) => {
1604                let msg = e.to_string();
1605                assert!(
1606                    !msg.contains("Attribute failed policy validation"),
1607                    "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1608                );
1609                assert!(
1610                    msg.contains("boringtun")
1611                        || msg.contains("CAP_NET_ADMIN")
1612                        || msg.contains("sudo"),
1613                    "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1614                );
1615            }
1616        }
1617    }
1618
1619    #[tokio::test]
1620    #[ignore = "Requires root/CAP_NET_ADMIN"]
1621    async fn test_create_interface_boringtun_ipv6() {
1622        let config = OverlayConfig {
1623            overlay_cidr: "fd00::1/48".to_string(),
1624            cluster_cidr: None,
1625            private_key: "test_key".to_string(),
1626            public_key: "test_pub".to_string(),
1627            local_endpoint: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 51820),
1628            peer_discovery_interval: Duration::from_secs(30),
1629            #[cfg(feature = "nat")]
1630            nat: crate::nat::NatConfig::default(),
1631            uapi_sock_dir: std::path::PathBuf::from("/var/run/wireguard"),
1632        };
1633
1634        #[cfg(target_os = "macos")]
1635        let iface_name = "utun".to_string();
1636        #[cfg(not(target_os = "macos"))]
1637        let iface_name = "zl-bt6-test0".to_string();
1638
1639        let mut transport = OverlayTransport::new(config, iface_name);
1640        let result = transport.create_interface().await;
1641
1642        match result {
1643            Ok(()) => {
1644                #[cfg(target_os = "macos")]
1645                assert!(
1646                    transport.interface_name().starts_with("utun"),
1647                    "macOS interface should be utunN, got: {}",
1648                    transport.interface_name()
1649                );
1650                transport.shutdown();
1651            }
1652            Err(e) => {
1653                let msg = e.to_string();
1654                assert!(
1655                    !msg.contains("Attribute failed policy validation"),
1656                    "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1657                );
1658                assert!(
1659                    msg.contains("boringtun")
1660                        || msg.contains("CAP_NET_ADMIN")
1661                        || msg.contains("sudo"),
1662                    "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1663                );
1664            }
1665        }
1666    }
1667}