1use crate::interface::platform_ops;
17use crate::{config::OverlayConfig, PeerInfo};
18#[cfg(not(windows))]
19use boringtun::device::{DeviceConfig, DeviceHandle};
20use std::fmt::Write;
21#[cfg(not(windows))]
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23#[cfg(not(windows))]
24use tokio::net::UnixStream;
25
26#[cfg(windows)]
27use crate::tun::WindowsTun;
28#[cfg(windows)]
29use boringtun::noise::{Tunn, TunnResult};
30#[cfg(windows)]
31use dashmap::DashMap;
32#[cfg(windows)]
33use parking_lot::RwLock;
34#[cfg(windows)]
35use std::net::{IpAddr, SocketAddr};
36#[cfg(windows)]
37use std::sync::atomic::{AtomicU64, Ordering};
38#[cfg(windows)]
39use std::sync::Arc;
40#[cfg(windows)]
41use std::time::{Duration, SystemTime, UNIX_EPOCH};
42#[cfg(windows)]
43use tokio::net::UdpSocket;
44#[cfg(windows)]
45use tokio::sync::Mutex as AsyncMutex;
46#[cfg(windows)]
47use tokio::task::JoinHandle;
48
49#[cfg(not(windows))]
56fn key_to_hex(base64_key: &str) -> Result<String, Box<dyn std::error::Error>> {
57 use base64::{engine::general_purpose::STANDARD, Engine as _};
58 let bytes = STANDARD.decode(base64_key)?;
59 if bytes.len() != 32 {
60 return Err(format!("Invalid key length: expected 32 bytes, got {}", bytes.len()).into());
61 }
62 Ok(hex::encode(bytes))
63}
64
65#[cfg(not(windows))]
70async fn uapi_set(sock_path: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
71 let mut stream = UnixStream::connect(sock_path).await?;
72 let msg = format!("set=1\n{body}\n");
73 stream.write_all(msg.as_bytes()).await?;
74 stream.shutdown().await?;
75 let mut response = String::new();
76 stream.read_to_string(&mut response).await?;
77 if response.contains("errno=0") {
78 Ok(())
79 } else {
80 Err(format!("UAPI set failed: {}", response.trim()).into())
81 }
82}
83
84#[cfg(not(windows))]
86async fn uapi_get(sock_path: &str) -> Result<String, Box<dyn std::error::Error>> {
87 let mut stream = UnixStream::connect(sock_path).await?;
88 stream.write_all(b"get=1\n\n").await?;
89 stream.shutdown().await?;
90 let mut response = String::new();
91 stream.read_to_string(&mut response).await?;
92 Ok(response)
93}
94
#[cfg(windows)]
/// Per-peer state used by the Windows userspace data path.
///
/// Every field except `persistent_keepalive` sits behind an `Arc`, so clones
/// share the same underlying state; the background loops work on cloned
/// snapshots of these values.
#[derive(Clone)]
struct WindowsPeerState {
    /// The peer's `Tunn` instance, guarded by an async mutex.
    tunn: Arc<AsyncMutex<Tunn>>,
    /// Address that outbound datagrams for this peer are sent to. Written by
    /// the ingress loop and by `update_peer_endpoint`.
    endpoint: Arc<RwLock<Option<SocketAddr>>>,
    /// Unix timestamp in seconds recorded by the ingress loop; starts at 0,
    /// which `check_peer_handshake` treats as "nothing recorded yet".
    last_handshake_sec: Arc<AtomicU64>,
    /// Networks whose destination addresses are routed to this peer by the
    /// egress loop.
    allowed_ips: Arc<Vec<ipnet::IpNet>>,
    /// Keepalive interval in seconds as passed to `build_tunn`; `None` when
    /// no interval was configured.
    persistent_keepalive: Option<u16>,
}
117
#[cfg(windows)]
/// Decodes a base64-encoded key into a fixed 32-byte array.
///
/// # Errors
/// Fails when `b64` is not valid standard base64, or when it decodes to
/// anything other than exactly 32 bytes.
fn decode_key_b64(b64: &str) -> Result<[u8; 32], Box<dyn std::error::Error>> {
    use base64::{engine::general_purpose::STANDARD, Engine as _};

    let decoded = STANDARD.decode(b64)?;
    // The array conversion succeeds only for slices of exactly 32 bytes.
    let Ok(key) = <[u8; 32]>::try_from(decoded.as_slice()) else {
        return Err(format!(
            "invalid WireGuard key length: expected 32 bytes, got {}",
            decoded.len()
        )
        .into());
    };
    Ok(key)
}
137
138#[cfg(windows)]
/// Extracts the destination address from a raw IP packet.
///
/// The version is taken from the high nibble of the first byte. For version 4
/// the destination is bytes 16..20; for version 6 it is bytes 24..40.
///
/// Returns `None` for an empty slice, an unknown version nibble, or a packet
/// too short to contain the destination field.
fn parse_dst_ip(packet: &[u8]) -> Option<IpAddr> {
    let version = *packet.first()? >> 4;
    match version {
        4 => {
            let dst: [u8; 4] = packet.get(16..20)?.try_into().ok()?;
            Some(IpAddr::from(dst))
        }
        6 => {
            let dst: [u8; 16] = packet.get(24..40)?.try_into().ok()?;
            Some(IpAddr::from(dst))
        }
        _ => None,
    }
}
160
#[cfg(windows)]
/// Builds the `Tunn` for one peer from raw key material.
///
/// `our_priv` is the local static private key, `peer_pub` the peer's static
/// public key. `preshared` and `persistent_keepalive` are forwarded as given.
/// The last two constructor arguments are always `0` and `None`.
// NOTE(review): the `0` looks like the tunnel index and `None` the optional
// rate limiter, which would give every peer the same index — confirm against
// the boringtun version in use.
fn build_tunn(
    our_priv: &[u8; 32],
    peer_pub: &[u8; 32],
    preshared: Option<[u8; 32]>,
    persistent_keepalive: Option<u16>,
) -> Tunn {
    Tunn::new(
        boringtun::x25519::StaticSecret::from(*our_priv),
        boringtun::x25519::PublicKey::from(*peer_pub),
        preshared,
        persistent_keepalive,
        0,
        None,
    )
}
186
/// Overlay transport for a single interface.
///
/// On non-Windows targets this holds a boringtun `DeviceHandle` and is
/// configured through the device's UAPI socket. On Windows it holds a Wintun
/// adapter, a UDP socket and three background tasks that move packets between
/// them through per-peer `Tunn` instances.
pub struct OverlayTransport {
    /// Overlay configuration (keys, listen endpoint, CIDRs, UAPI socket dir).
    config: OverlayConfig,
    /// Interface / adapter name. On macOS `create_interface` may replace it
    /// with the name derived from the newly created `utun*.sock` file.
    interface_name: String,
    #[cfg(not(windows))]
    /// boringtun device handle; `Some` once `create_interface` succeeded and
    /// taken (dropped) again by `shutdown`.
    device: Option<DeviceHandle>,
    #[cfg(windows)]
    /// Wintun adapter; `Some` once `create_interface` succeeded.
    wintun_dev: Option<Arc<WindowsTun>>,
    #[cfg(windows)]
    /// UDP socket bound to `config.local_endpoint` by `configure`.
    udp: Option<Arc<UdpSocket>>,
    #[cfg(windows)]
    /// Peer table keyed by the peer's decoded 32-byte public key; shared with
    /// the background tasks.
    peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
    #[cfg(windows)]
    /// Handle of the task running `ingress_loop` (UDP -> Wintun).
    ingress_task: Option<JoinHandle<()>>,
    #[cfg(windows)]
    /// Handle of the task running `egress_loop` (Wintun -> UDP).
    egress_task: Option<JoinHandle<()>>,
    #[cfg(windows)]
    /// Handle of the task running `timers_loop` (periodic `update_timers`).
    timers_task: Option<JoinHandle<()>>,
}
238
239impl OverlayTransport {
240 #[must_use]
242 pub fn new(config: OverlayConfig, interface_name: String) -> Self {
243 Self {
244 config,
245 interface_name,
246 #[cfg(not(windows))]
247 device: None,
248 #[cfg(windows)]
249 wintun_dev: None,
250 #[cfg(windows)]
251 udp: None,
252 #[cfg(windows)]
253 peers: Arc::new(DashMap::new()),
254 #[cfg(windows)]
255 ingress_task: None,
256 #[cfg(windows)]
257 egress_task: None,
258 #[cfg(windows)]
259 timers_task: None,
260 }
261 }
262
263 #[must_use]
269 pub fn interface_name(&self) -> &str {
270 &self.interface_name
271 }
272
273 #[cfg(not(windows))]
282 fn uapi_sock_path(&self) -> String {
283 self.config
284 .uapi_sock_dir
285 .join(format!("{}.sock", self.interface_name))
286 .to_string_lossy()
287 .into_owned()
288 }
289
    /// Creates the underlying tunnel device for this transport.
    ///
    /// Dispatches to the platform-specific implementation: the Wintun adapter
    /// on Windows, the boringtun device everywhere else.
    ///
    /// # Errors
    /// Propagates whatever the platform-specific implementation returns.
    pub async fn create_interface(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        #[cfg(windows)]
        {
            self.create_interface_windows().await
        }
        #[cfg(not(windows))]
        {
            self.create_interface_unix().await
        }
    }
320
321 #[cfg(not(windows))]
323 #[allow(clippy::too_many_lines)]
324 async fn create_interface_unix(&mut self) -> Result<(), Box<dyn std::error::Error>> {
325 #[cfg(not(target_os = "macos"))]
328 if self.interface_name.len() > 15 {
329 return Err(format!(
330 "Interface name '{}' exceeds 15 character limit",
331 self.interface_name
332 )
333 .into());
334 }
335
336 tokio::fs::create_dir_all(&self.config.uapi_sock_dir).await?;
339
340 #[cfg(target_os = "linux")]
352 {
353 let iface_ops = platform_ops();
354 match iface_ops.link_exists(&self.interface_name).await {
355 Ok(true) => {
356 return Err(format!(
357 "Kernel link '{}' already exists; refusing to delete it. \
358 If this is a stale interface from a previous crash, restart \
359 the daemon (its boot-time sweep clears stale zl-* / veth-* \
360 links). If this fires during normal operation, there is a \
361 duplicate-name bug somewhere in the overlay setup path.",
362 self.interface_name
363 )
364 .into());
365 }
366 Ok(false) => {}
367 Err(e) => {
368 tracing::warn!(
369 interface = %self.interface_name,
370 error = %e,
371 "failed to probe for existing overlay interface; proceeding"
372 );
373 }
374 }
375 }
376
377 let sock_path = self
379 .config
380 .uapi_sock_dir
381 .join(format!("{}.sock", self.interface_name));
382 if tokio::fs::try_exists(&sock_path).await.unwrap_or(false) {
383 tracing::warn!(path = %sock_path.display(), "removing stale UAPI socket");
384 let _ = tokio::fs::remove_file(&sock_path).await;
385 }
386
387 #[cfg(target_os = "macos")]
390 let existing_socks = {
391 let mut set = std::collections::HashSet::new();
392 if let Ok(mut entries) = tokio::fs::read_dir(&self.config.uapi_sock_dir).await {
393 while let Ok(Some(entry)) = entries.next_entry().await {
394 set.insert(entry.file_name().to_string_lossy().to_string());
395 }
396 }
397 set
398 };
399
400 #[cfg(target_os = "macos")]
402 let name = "utun".to_string();
403 #[cfg(not(target_os = "macos"))]
404 let name = self.interface_name.clone();
405
406 let cfg = DeviceConfig {
407 n_threads: 2,
408 use_connected_socket: true,
409 #[cfg(target_os = "linux")]
410 use_multi_queue: false,
411 #[cfg(target_os = "linux")]
412 uapi_fd: -1,
413 };
414
415 let iface_name_for_err = self.interface_name.clone();
416
417 let handle = tokio::task::spawn_blocking(move || DeviceHandle::new(&name, cfg))
420 .await
421 .map_err(|e| format!("spawn_blocking join error: {e}"))?
422 .map_err(|e| {
423 #[cfg(target_os = "macos")]
424 let hint = "Requires root. Run with sudo or install as a system service (zlayer daemon install).";
425 #[cfg(not(target_os = "macos"))]
426 let hint = "Ensure CAP_NET_ADMIN capability is available.";
427 format!("Failed to create boringtun device '{iface_name_for_err}': {e}. {hint}")
428 })?;
429
430 self.device = Some(handle);
431
432 #[cfg(target_os = "macos")]
435 {
436 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
438 if let Ok(mut entries) = tokio::fs::read_dir(&self.config.uapi_sock_dir).await {
439 while let Ok(Some(entry)) = entries.next_entry().await {
440 let fname = entry.file_name().to_string_lossy().to_string();
441 if !existing_socks.contains(&fname)
442 && fname.starts_with("utun")
443 && std::path::Path::new(&fname)
444 .extension()
445 .is_some_and(|ext| ext.eq_ignore_ascii_case("sock"))
446 {
447 self.interface_name = fname.trim_end_matches(".sock").to_string();
448 break;
449 }
450 }
451 }
452 }
453
454 tracing::info!(
455 interface = %self.interface_name,
456 "Created boringtun overlay transport"
457 );
458 Ok(())
459 }
460
461 #[cfg(windows)]
468 async fn create_interface_windows(&mut self) -> Result<(), Box<dyn std::error::Error>> {
469 if self.interface_name.len() > 64 {
472 return Err(format!(
473 "Wintun adapter name '{}' exceeds 64 character limit",
474 self.interface_name
475 )
476 .into());
477 }
478
479 let iface_name = self.interface_name.clone();
480 let mtu = 1420; let dev = tokio::task::spawn_blocking(move || WindowsTun::new(&iface_name, mtu))
486 .await
487 .map_err(|e| format!("spawn_blocking join error: {e}"))??;
488
489 tracing::info!(
490 interface = %self.interface_name,
491 luid = dev.luid_value(),
492 "Created Wintun overlay adapter"
493 );
494
495 self.wintun_dev = Some(Arc::new(dev));
496 Ok(())
497 }
498
499 pub async fn configure(
517 &mut self,
518 peers: &[PeerInfo],
519 ) -> Result<(), Box<dyn std::error::Error>> {
520 #[cfg(not(windows))]
521 {
522 let sock = self.uapi_sock_path();
523
524 let private_key_hex = key_to_hex(&self.config.private_key)?;
526 let mut body = format!(
527 "private_key={}\nlisten_port={}\n",
528 private_key_hex,
529 self.config.local_endpoint.port(),
530 );
531
532 for peer in peers {
533 let pub_hex = key_to_hex(&peer.public_key)?;
534 let _ = writeln!(body, "public_key={pub_hex}");
535 let _ = writeln!(body, "endpoint={}", peer.endpoint);
536 let _ = writeln!(body, "allowed_ip={}", peer.allowed_ips);
537 let _ = writeln!(
538 body,
539 "persistent_keepalive_interval={}",
540 peer.persistent_keepalive_interval.as_secs()
541 );
542 }
543
544 uapi_set(&sock, &body).await?;
545 tracing::debug!(interface = %self.interface_name, "Applied UAPI configuration");
546
547 self.configure_interface().await?;
549
550 tracing::info!(interface = %self.interface_name, "Overlay transport configured and up");
551 Ok(())
552 }
553
554 #[cfg(windows)]
555 {
556 self.configure_windows(peers).await
557 }
558 }
559
    #[cfg(windows)]
    /// Windows implementation of `configure`.
    ///
    /// Steps, in order:
    /// 1. assign the overlay address / route via `configure_interface`;
    /// 2. best effort: install a route for `config.cluster_cidr` via the
    ///    adapter (failures and a missing/unparseable CIDR are only logged);
    /// 3. bind the UDP socket on `config.local_endpoint`;
    /// 4. register every entry of `peers` in the peer table;
    /// 5. spawn the ingress, egress and timers tasks.
    ///
    /// # Errors
    /// Fails when `configure_interface` fails, when `create_interface` has not
    /// been called yet, when the UDP socket cannot be bound, or when a key or
    /// `allowed_ips` value cannot be parsed.
    async fn configure_windows(
        &mut self,
        peers: &[PeerInfo],
    ) -> Result<(), Box<dyn std::error::Error>> {
        self.configure_interface().await?;

        if let Some(ref cluster_cidr_str) = self.config.cluster_cidr {
            match cluster_cidr_str.parse::<ipnet::IpNet>() {
                Ok(net) => {
                    use crate::interface::windows::WindowsIpHelperOps;
                    use crate::interface::InterfaceOps;
                    let ops = WindowsIpHelperOps::new();
                    let adapter_name = self.interface_name.clone();
                    match ops
                        .add_route_via_dev(net.network(), net.prefix_len(), &adapter_name)
                        .await
                    {
                        Ok(()) => {
                            tracing::info!(
                                cidr = %net,
                                adapter = %adapter_name,
                                "Installed cluster-CIDR host route via Wintun adapter"
                            );
                        }
                        Err(e) => {
                            // Not fatal: the route may already be present.
                            tracing::warn!(
                                error = %e,
                                cidr = %net,
                                adapter = %adapter_name,
                                "Failed to install cluster-CIDR host route via Wintun (overlay traffic may not route across nodes); route may already exist"
                            );
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        cidr = %cluster_cidr_str,
                        "cluster_cidr unparseable; skipping Wintun route install"
                    );
                }
            }
        } else {
            tracing::warn!(
                "cluster_cidr not set in OverlayConfig; skipping Wintun route install (cross-node overlay traffic may not route)"
            );
        }

        // The adapter must exist before any task can be wired to it.
        let tun = self
            .wintun_dev
            .as_ref()
            .ok_or("Wintun adapter not initialized — call create_interface first")?
            .clone();

        let listen = self.config.local_endpoint;
        let udp = Arc::new(
            UdpSocket::bind(listen)
                .await
                .map_err(|e| format!("failed to bind WireGuard UDP socket on {listen}: {e}"))?,
        );
        self.udp = Some(udp.clone());

        // Populate the peer table before the tasks start reading it.
        let priv_bytes = decode_key_b64(&self.config.private_key)?;
        for peer in peers {
            self.add_peer_windows(&priv_bytes, peer)?;
        }

        // UDP -> Wintun.
        let peers_ingress = self.peers.clone();
        let udp_ingress = udp.clone();
        let tun_ingress = tun.clone();
        self.ingress_task = Some(tokio::spawn(async move {
            Self::ingress_loop(udp_ingress, tun_ingress, peers_ingress).await;
        }));

        // Wintun -> UDP.
        let peers_egress = self.peers.clone();
        let udp_egress = udp.clone();
        let tun_egress = tun.clone();
        self.egress_task = Some(tokio::spawn(async move {
            Self::egress_loop(tun_egress, udp_egress, peers_egress).await;
        }));

        // Periodic `update_timers` for every peer.
        let peers_timers = self.peers.clone();
        let udp_timers = udp.clone();
        self.timers_task = Some(tokio::spawn(async move {
            Self::timers_loop(udp_timers, peers_timers).await;
        }));

        tracing::info!(
            interface = %self.interface_name,
            peer_count = peers.len(),
            listen = %listen,
            "Windows overlay transport configured (Tunn pipeline online)"
        );
        Ok(())
    }
682
683 #[cfg(windows)]
690 fn add_peer_windows(
691 &self,
692 our_priv: &[u8; 32],
693 peer: &PeerInfo,
694 ) -> Result<(), Box<dyn std::error::Error>> {
695 let peer_pub = decode_key_b64(&peer.public_key)?;
696 let allowed: ipnet::IpNet = peer
697 .allowed_ips
698 .parse()
699 .map_err(|e| format!("invalid allowed_ips '{}': {e}", peer.allowed_ips))?;
700 let keepalive = {
703 let secs = peer.persistent_keepalive_interval.as_secs();
704 if secs == 0 {
705 None
706 } else {
707 u16::try_from(secs).ok()
708 }
709 };
710
711 let tunn = build_tunn(our_priv, &peer_pub, None, keepalive);
712 let state = WindowsPeerState {
713 tunn: Arc::new(AsyncMutex::new(tunn)),
714 endpoint: Arc::new(RwLock::new(Some(peer.endpoint))),
715 last_handshake_sec: Arc::new(AtomicU64::new(0)),
716 allowed_ips: Arc::new(vec![allowed]),
717 persistent_keepalive: keepalive,
718 };
719 self.peers.insert(peer_pub, state);
720 tracing::debug!(
721 peer_key = %peer.public_key,
722 endpoint = %peer.endpoint,
723 allowed = %peer.allowed_ips,
724 "Added peer to Windows overlay peer map"
725 );
726 Ok(())
727 }
728
729 #[cfg(windows)]
749 async fn ingress_loop(
750 udp: Arc<UdpSocket>,
751 tun: Arc<WindowsTun>,
752 peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
753 ) {
754 let mut inbuf = vec![0u8; 65536];
756 loop {
757 let (n, src) = match udp.recv_from(&mut inbuf).await {
758 Ok(p) => p,
759 Err(e) => {
760 tracing::error!(error = %e, "UDP recv failed; ingress loop exiting");
761 break;
762 }
763 };
764
765 let snapshot: Vec<([u8; 32], WindowsPeerState)> = peers
768 .iter()
769 .map(|e| (*e.key(), e.value().clone()))
770 .collect();
771
772 for (pk, state) in snapshot {
773 let mut out = vec![0u8; 65536];
774 let mut handled = false;
775 {
776 let mut tunn = state.tunn.lock().await;
777 match tunn.decapsulate(Some(src.ip()), &inbuf[..n], &mut out) {
778 TunnResult::WriteToTunnelV4(pkt, _)
779 | TunnResult::WriteToTunnelV6(pkt, _) => {
780 let pkt_owned = pkt.to_vec();
781 drop(tunn);
782 if let Err(e) = tun.send(&pkt_owned).await {
783 tracing::warn!(error = %e, "Wintun send failed");
784 }
785 *state.endpoint.write() = Some(src);
786 state.last_handshake_sec.store(
787 SystemTime::now()
788 .duration_since(UNIX_EPOCH)
789 .unwrap_or_default()
790 .as_secs(),
791 Ordering::Relaxed,
792 );
793 handled = true;
794 }
795 TunnResult::WriteToNetwork(resp) => {
796 let resp_owned = resp.to_vec();
797 drop(tunn);
798 if let Err(e) = udp.send_to(&resp_owned, src).await {
799 tracing::warn!(error = %e, "UDP reply send failed");
800 }
801 *state.endpoint.write() = Some(src);
802 handled = true;
803 }
804 TunnResult::Done | TunnResult::Err(_) => {
805 }
807 }
808 }
809 if handled {
810 loop {
815 let mut drain = vec![0u8; 65536];
816 let mut tunn = state.tunn.lock().await;
817 match tunn.decapsulate(None, &[], &mut drain) {
818 TunnResult::WriteToNetwork(resp) => {
819 let resp_owned = resp.to_vec();
820 drop(tunn);
821 if let Err(e) = udp.send_to(&resp_owned, src).await {
822 tracing::warn!(error = %e, "UDP drain send failed");
823 }
824 }
825 TunnResult::WriteToTunnelV4(pkt, _)
826 | TunnResult::WriteToTunnelV6(pkt, _) => {
827 let pkt_owned = pkt.to_vec();
828 drop(tunn);
829 if let Err(e) = tun.send(&pkt_owned).await {
830 tracing::warn!(error = %e, "Wintun drain send failed");
831 }
832 }
833 TunnResult::Done | TunnResult::Err(_) => break,
834 }
835 }
836 let _ = pk; break;
838 }
839 }
840 }
841 }
842
843 #[cfg(windows)]
852 async fn egress_loop(
853 tun: Arc<WindowsTun>,
854 udp: Arc<UdpSocket>,
855 peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
856 ) {
857 let mut buf = vec![0u8; 65536];
858 loop {
859 let n = match tun.recv(&mut buf).await {
860 Ok(n) => n,
861 Err(e) => {
862 tracing::error!(error = %e, "Wintun recv failed; egress loop exiting");
863 break;
864 }
865 };
866
867 let Some(dst_ip) = parse_dst_ip(&buf[..n]) else {
868 continue;
869 };
870
871 let state = peers.iter().find_map(|entry| {
873 if entry
874 .value()
875 .allowed_ips
876 .iter()
877 .any(|net| net.contains(&dst_ip))
878 {
879 Some(entry.value().clone())
880 } else {
881 None
882 }
883 });
884 let Some(state) = state else {
885 tracing::trace!(%dst_ip, "no matching overlay peer");
886 continue;
887 };
888
889 let endpoint = *state.endpoint.read();
890 let Some(endpoint) = endpoint else {
891 tracing::trace!(%dst_ip, "peer has no endpoint yet; dropping");
892 continue;
893 };
894
895 let mut out = vec![0u8; 65536 + 32];
899 let mut tunn = state.tunn.lock().await;
900 match tunn.encapsulate(&buf[..n], &mut out) {
901 TunnResult::WriteToNetwork(pkt) => {
902 let pkt_owned = pkt.to_vec();
903 drop(tunn);
904 if let Err(e) = udp.send_to(&pkt_owned, endpoint).await {
905 tracing::warn!(error = %e, "UDP send failed");
906 }
907 }
908 TunnResult::Done
909 | TunnResult::WriteToTunnelV4(_, _)
910 | TunnResult::WriteToTunnelV6(_, _) => {
911 }
917 TunnResult::Err(e) => {
918 tracing::warn!(?e, "encapsulate error");
919 }
920 }
921 }
922 }
923
924 #[cfg(windows)]
932 async fn timers_loop(udp: Arc<UdpSocket>, peers: Arc<DashMap<[u8; 32], WindowsPeerState>>) {
933 let mut interval = tokio::time::interval(Duration::from_millis(250));
934 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
935 loop {
936 interval.tick().await;
937 let snapshot: Vec<WindowsPeerState> = peers.iter().map(|e| e.value().clone()).collect();
938 for state in snapshot {
939 let endpoint = *state.endpoint.read();
940 let mut out = vec![0u8; 148];
941 let mut tunn = state.tunn.lock().await;
942 match tunn.update_timers(&mut out) {
943 TunnResult::WriteToNetwork(pkt) => {
944 let pkt_owned = pkt.to_vec();
945 drop(tunn);
946 if let Some(ep) = endpoint {
947 if let Err(e) = udp.send_to(&pkt_owned, ep).await {
948 tracing::debug!(error = %e, "timers UDP send failed");
949 }
950 }
951 }
952 TunnResult::Done
953 | TunnResult::WriteToTunnelV4(_, _)
954 | TunnResult::WriteToTunnelV6(_, _) => {}
955 TunnResult::Err(e) => {
956 tracing::debug!(?e, "update_timers error");
957 }
958 }
959 }
960 }
961 }
962
963 async fn configure_interface(&self) -> Result<(), Box<dyn std::error::Error>> {
978 let cidr: ipnet::IpNet = self.config.overlay_cidr.parse().map_err(|e| {
979 format!(
980 "Failed to parse overlay CIDR '{}': {e}",
981 self.config.overlay_cidr
982 )
983 })?;
984 let overlay_addr = cidr.addr();
985 let prefix_len = cidr.prefix_len();
986 let net_addr = cidr.network();
987
988 let iface_ops = platform_ops();
989
990 if let Err(e) = iface_ops
995 .add_address(&self.interface_name, overlay_addr, prefix_len)
996 .await
997 {
998 let msg = e.to_string();
999 if !msg.contains("File exists") && !msg.contains("EEXIST") {
1000 return Err(format!("Failed to assign IP: {msg}").into());
1001 }
1002 }
1003
1004 iface_ops
1008 .set_link_up(&self.interface_name)
1009 .await
1010 .map_err(|e| format!("Failed to bring up interface: {e}"))?;
1011
1012 if let Err(e) = iface_ops
1016 .add_route_via_dev(net_addr, prefix_len, &self.interface_name)
1017 .await
1018 {
1019 let msg = e.to_string();
1020 if !msg.contains("File exists")
1021 && !msg.contains("EEXIST")
1022 && !msg.contains("already in table")
1023 {
1024 return Err(format!("Failed to add route: {msg}").into());
1025 }
1026 }
1027
1028 Ok(())
1029 }
1030
1031 #[cfg_attr(windows, allow(clippy::unused_async))]
1042 pub async fn add_peer(&self, peer: &PeerInfo) -> Result<(), Box<dyn std::error::Error>> {
1043 #[cfg(not(windows))]
1044 {
1045 let sock = self.uapi_sock_path();
1046 let pub_hex = key_to_hex(&peer.public_key)?;
1047
1048 let body = format!(
1049 "public_key={}\nendpoint={}\nallowed_ip={}\npersistent_keepalive_interval={}\n",
1050 pub_hex,
1051 peer.endpoint,
1052 peer.allowed_ips,
1053 peer.persistent_keepalive_interval.as_secs(),
1054 );
1055
1056 uapi_set(&sock, &body).await?;
1057 tracing::debug!(
1058 peer_key = %peer.public_key,
1059 interface = %self.interface_name,
1060 "Added peer via UAPI"
1061 );
1062 Ok(())
1063 }
1064 #[cfg(windows)]
1065 {
1066 let priv_bytes = decode_key_b64(&self.config.private_key)?;
1067 self.add_peer_windows(&priv_bytes, peer)?;
1068 Ok(())
1069 }
1070 }
1071
1072 #[cfg_attr(windows, allow(clippy::unused_async))]
1083 pub async fn remove_peer(&self, public_key: &str) -> Result<(), Box<dyn std::error::Error>> {
1084 #[cfg(not(windows))]
1085 {
1086 let sock = self.uapi_sock_path();
1087 let pub_hex = key_to_hex(public_key)?;
1088
1089 let body = format!("public_key={pub_hex}\nremove=true\n");
1090
1091 uapi_set(&sock, &body).await?;
1092 tracing::debug!(
1093 peer_key = %public_key,
1094 interface = %self.interface_name,
1095 "Removed peer via UAPI"
1096 );
1097 Ok(())
1098 }
1099 #[cfg(windows)]
1100 {
1101 let pk = decode_key_b64(public_key)?;
1102 self.peers.remove(&pk);
1103 tracing::debug!(
1104 peer_key = %public_key,
1105 interface = %self.interface_name,
1106 "Removed peer from Windows overlay"
1107 );
1108 Ok(())
1109 }
1110 }
1111
1112 #[cfg_attr(windows, allow(clippy::unused_async))]
1123 pub async fn status(&self) -> Result<String, Box<dyn std::error::Error>> {
1124 #[cfg(not(windows))]
1125 {
1126 let sock = self.uapi_sock_path();
1127 let response = uapi_get(&sock).await?;
1128 Ok(response)
1129 }
1130 #[cfg(windows)]
1131 {
1132 use base64::{engine::general_purpose::STANDARD, Engine as _};
1135 let mut out = String::new();
1136 let priv_bytes = decode_key_b64(&self.config.private_key).unwrap_or([0u8; 32]);
1137 let _ = writeln!(out, "private_key={}", hex::encode(priv_bytes));
1138 let _ = writeln!(out, "listen_port={}", self.config.local_endpoint.port());
1139 for entry in self.peers.iter() {
1140 let pk_b64 = STANDARD.encode(entry.key());
1141 let _ = writeln!(out, "public_key={}", hex::encode(entry.key()));
1142 let _ = writeln!(out, "public_key_b64={pk_b64}");
1143 if let Some(ep) = *entry.value().endpoint.read() {
1144 let _ = writeln!(out, "endpoint={ep}");
1145 }
1146 for net in entry.value().allowed_ips.iter() {
1147 let _ = writeln!(out, "allowed_ip={net}");
1148 }
1149 if let Some(k) = entry.value().persistent_keepalive {
1150 let _ = writeln!(out, "persistent_keepalive_interval={k}");
1151 }
1152 let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1153 let _ = writeln!(out, "last_handshake_time_sec={last}");
1154 }
1155 let _ = writeln!(out, "errno=0");
1156 Ok(out)
1157 }
1158 }
1159
1160 #[allow(clippy::unused_async)]
1169 pub async fn generate_keys() -> Result<(String, String), Box<dyn std::error::Error>> {
1170 use base64::{engine::general_purpose::STANDARD, Engine as _};
1171 use x25519_dalek::{PublicKey, StaticSecret};
1172
1173 let secret = StaticSecret::random();
1174 let public = PublicKey::from(&secret);
1175
1176 let private_key = STANDARD.encode(secret.to_bytes());
1177 let public_key = STANDARD.encode(public.as_bytes());
1178
1179 Ok((private_key, public_key))
1180 }
1181
1182 #[cfg(feature = "nat")]
1191 #[cfg_attr(windows, allow(clippy::unused_async))]
1192 pub async fn update_peer_endpoint(
1193 &self,
1194 public_key: &str,
1195 new_endpoint: std::net::SocketAddr,
1196 ) -> Result<(), Box<dyn std::error::Error>> {
1197 #[cfg(not(windows))]
1198 {
1199 let sock = self.uapi_sock_path();
1200 let pub_hex = key_to_hex(public_key)?;
1201 let body = format!("public_key={pub_hex}\nendpoint={new_endpoint}\n");
1202 uapi_set(&sock, &body).await?;
1203 tracing::debug!(
1204 peer_key = %public_key,
1205 endpoint = %new_endpoint,
1206 "Updated peer endpoint"
1207 );
1208 Ok(())
1209 }
1210 #[cfg(windows)]
1211 {
1212 let pk = decode_key_b64(public_key)?;
1213 let entry = self
1214 .peers
1215 .get(&pk)
1216 .ok_or_else(|| format!("peer not found: {public_key}"))?;
1217 *entry.value().endpoint.write() = Some(new_endpoint);
1218 tracing::debug!(
1219 peer_key = %public_key,
1220 endpoint = %new_endpoint,
1221 "Updated peer endpoint (Windows)"
1222 );
1223 Ok(())
1224 }
1225 }
1226
1227 #[cfg(feature = "nat")]
1236 #[cfg_attr(windows, allow(clippy::unused_async))]
1237 pub async fn check_peer_handshake(
1238 &self,
1239 public_key: &str,
1240 since: u64,
1241 ) -> Result<bool, Box<dyn std::error::Error>> {
1242 #[cfg(not(windows))]
1243 {
1244 let sock = self.uapi_sock_path();
1245 let response = uapi_get(&sock).await?;
1246 let target_hex = key_to_hex(public_key)?;
1247
1248 let mut in_target = false;
1249 for line in response.lines() {
1250 let line = line.trim();
1251 if line.is_empty() || line.starts_with("errno=") {
1252 continue;
1253 }
1254 let Some((key, value)) = line.split_once('=') else {
1255 continue;
1256 };
1257 match key {
1258 "public_key" => {
1259 in_target = value == target_hex;
1260 }
1261 "last_handshake_time_sec" if in_target => {
1262 if let Ok(t) = value.parse::<u64>() {
1263 return Ok(t > 0 && t >= since);
1264 }
1265 }
1266 _ => {}
1267 }
1268 }
1269 Ok(false)
1270 }
1271 #[cfg(windows)]
1272 {
1273 let pk = decode_key_b64(public_key)?;
1274 let entry = self
1275 .peers
1276 .get(&pk)
1277 .ok_or_else(|| format!("peer not found: {public_key}"))?;
1278 let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1279 Ok(last > 0 && last >= since)
1280 }
1281 }
1282
1283 pub fn shutdown(&mut self) {
1293 #[cfg(not(windows))]
1294 if let Some(device) = self.device.take() {
1295 tracing::info!(
1296 interface = %self.interface_name,
1297 "Shutting down overlay transport"
1298 );
1299 drop(device);
1301 }
1302 #[cfg(windows)]
1303 {
1304 if let Some(h) = self.ingress_task.take() {
1305 h.abort();
1306 }
1307 if let Some(h) = self.egress_task.take() {
1308 h.abort();
1309 }
1310 if let Some(h) = self.timers_task.take() {
1311 h.abort();
1312 }
1313 self.udp.take();
1317 self.peers.clear();
1318 if let Some(dev) = self.wintun_dev.take() {
1319 tracing::info!(
1320 interface = %self.interface_name,
1321 "Shutting down Wintun overlay transport"
1322 );
1323 drop(dev);
1324 }
1325 }
1326 }
1327}
1328
/// Dropping the transport performs the same teardown as an explicit
/// `shutdown()` call.
impl Drop for OverlayTransport {
    fn drop(&mut self) {
        // `shutdown` takes every resource out of its `Option`, so this is
        // harmless if the caller already shut the transport down.
        self.shutdown();
    }
}
1334
1335#[cfg(test)]
1340mod tests {
1341 use super::*;
1342 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
1343 use std::time::Duration;
1344
    // An IPv4 peer's rendered config contains its public key and endpoint.
    #[test]
    fn test_peer_info_to_config() {
        let peer = PeerInfo::new(
            "test_public_key".to_string(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 51820),
            "10.0.0.2/32",
            Duration::from_secs(25),
        );

        let config = peer.to_peer_config();
        assert!(config.contains("PublicKey = test_public_key"));
        assert!(config.contains("Endpoint = 10.0.0.1:51820"));
    }
1358
    // A 20-byte packet with version nibble 4 yields bytes 16..20 as the
    // destination address.
    #[cfg(windows)]
    #[test]
    fn test_parse_dst_ip_v4() {
        let mut pkt = vec![0u8; 20];
        pkt[0] = 0x45;
        pkt[16..20].copy_from_slice(&[10, 0, 0, 7]);
        assert_eq!(
            super::parse_dst_ip(&pkt),
            Some(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)))
        );
    }
1376
    // A 40-byte packet with version nibble 6 yields bytes 24..40 as the
    // destination address (here fd00::1).
    #[cfg(windows)]
    #[test]
    fn test_parse_dst_ip_v6() {
        let mut pkt = vec![0u8; 40];
        pkt[0] = 0x60;
        pkt[24] = 0xfd;
        pkt[25] = 0x00;
        pkt[39] = 0x01;
        let expected = IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1));
        assert_eq!(super::parse_dst_ip(&pkt), Some(expected));
    }
1389
    // Packets too short to hold the destination field, and the empty slice,
    // produce `None`.
    #[cfg(windows)]
    #[test]
    fn test_parse_dst_ip_truncated_returns_none() {
        let pkt = vec![0x45u8; 10];
        assert_eq!(super::parse_dst_ip(&pkt), None);
        assert_eq!(super::parse_dst_ip(&[]), None);
    }
1397
    // A version nibble other than 4 or 6 produces `None`, even for a packet
    // that is long enough.
    #[cfg(windows)]
    #[test]
    fn test_parse_dst_ip_unknown_version_returns_none() {
        let pkt = vec![0x70u8; 64];
        assert_eq!(super::parse_dst_ip(&pkt), None);
    }
1404
    // Base64-encoding 32 bytes and decoding them again returns the same bytes.
    #[cfg(windows)]
    #[test]
    fn test_decode_key_b64_roundtrip() {
        use base64::{engine::general_purpose::STANDARD, Engine as _};
        let raw = [0x42u8; 32];
        let b64 = STANDARD.encode(raw);
        let decoded = super::decode_key_b64(&b64).expect("decode");
        assert_eq!(decoded, raw);
    }
1414
    // Valid base64 that decodes to 16 bytes instead of 32 is rejected.
    #[cfg(windows)]
    #[test]
    fn test_decode_key_b64_wrong_length_errors() {
        use base64::{engine::general_purpose::STANDARD, Engine as _};
        let short = STANDARD.encode([0u8; 16]);
        assert!(super::decode_key_b64(&short).is_err());
    }
1422
    // An IPv6 peer's rendered config uses bracket notation for the endpoint
    // and carries the IPv6 allowed-IPs value unchanged.
    #[test]
    fn test_peer_info_ipv6_to_config() {
        let peer = PeerInfo::new(
            "test_public_key_v6".to_string(),
            SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
                51820,
            ),
            "fd00::2/128",
            Duration::from_secs(25),
        );

        let config = peer.to_peer_config();
        assert!(config.contains("PublicKey = test_public_key_v6"));
        assert!(
            config.contains("Endpoint = [fd00::1]:51820"),
            "IPv6 endpoint should use bracket notation, got: {config}"
        );
        assert!(config.contains("AllowedIPs = fd00::2/128"));
    }
1444
    // An IPv4 CIDR with host bits set parses; the network address has the
    // host bits cleared.
    #[test]
    fn test_overlay_cidr_parses_ipv4() {
        let cidr: ipnet::IpNet = "10.200.0.1/24".parse().unwrap();
        assert!(cidr.addr().is_ipv4());
        assert_eq!(cidr.prefix_len(), 24);
        assert_eq!(cidr.network().to_string(), "10.200.0.0");
    }
1452
    // An IPv6 CIDR with host bits set parses; the network address has the
    // host bits cleared.
    #[test]
    fn test_overlay_cidr_parses_ipv6() {
        let cidr: ipnet::IpNet = "fd00::1/48".parse().unwrap();
        assert!(cidr.addr().is_ipv6());
        assert_eq!(cidr.prefix_len(), 48);
        assert_eq!(cidr.network().to_string(), "fd00::");
    }
1460
    // A /128 IPv6 CIDR keeps the full host address.
    #[test]
    fn test_overlay_cidr_ipv6_host_address() {
        let cidr: ipnet::IpNet = "fd00::5/128".parse().unwrap();
        assert!(cidr.addr().is_ipv6());
        assert_eq!(cidr.prefix_len(), 128);
        assert_eq!(cidr.addr().to_string(), "fd00::5");
    }
1469
    // `PeerInfo::new` stores the allowed-IPs string verbatim for both IPv4
    // and IPv6 values.
    #[test]
    fn test_peer_info_ipv6_allowed_ips_format() {
        let peer_v4 = PeerInfo::new(
            "key_v4".to_string(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
            "10.200.0.5/32",
            Duration::from_secs(25),
        );
        assert_eq!(peer_v4.allowed_ips, "10.200.0.5/32");

        let peer_v6 = PeerInfo::new(
            "key_v6".to_string(),
            SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 5)),
                51820,
            ),
            "fd00::5/128",
            Duration::from_secs(25),
        );
        assert_eq!(peer_v6.allowed_ips, "fd00::5/128");
    }
1492
    // `SocketAddr`'s `Display` output, which the UAPI bodies embed in their
    // `endpoint=` lines, uses bracket notation for IPv6.
    #[test]
    fn test_uapi_body_format_ipv6_peer() {
        let endpoint = SocketAddr::new(
            IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
            51820,
        );
        let formatted = format!("endpoint={endpoint}");
        assert_eq!(formatted, "endpoint=[fd00::1]:51820");
    }
1504
    // Generated keys are 44-character base64 strings that decode to 32 bytes,
    // and the public key is the one derived from the private key.
    #[tokio::test]
    async fn test_generate_keys_native() {
        use base64::{engine::general_purpose::STANDARD, Engine as _};
        use x25519_dalek::{PublicKey, StaticSecret};

        let (private_key, public_key) = OverlayTransport::generate_keys().await.unwrap();

        assert_eq!(
            private_key.len(),
            44,
            "Private key should be 44 chars base64"
        );
        assert_eq!(public_key.len(), 44, "Public key should be 44 chars base64");

        let priv_bytes = STANDARD.decode(&private_key).unwrap();
        let pub_bytes = STANDARD.decode(&public_key).unwrap();
        assert_eq!(priv_bytes.len(), 32);
        assert_eq!(pub_bytes.len(), 32);

        // Re-derive the public key from the private key and compare.
        let secret = StaticSecret::from(<[u8; 32]>::try_from(priv_bytes.as_slice()).unwrap());
        let expected_public = PublicKey::from(&secret);
        assert_eq!(pub_bytes.as_slice(), expected_public.as_bytes());
    }
1528
1529 #[tokio::test]
1530 async fn test_generate_keys_unique() {
1531 let (key1, _) = OverlayTransport::generate_keys().await.unwrap();
1532 let (key2, _) = OverlayTransport::generate_keys().await.unwrap();
1533 assert_ne!(
1534 key1, key2,
1535 "Sequential key generation should produce unique keys"
1536 );
1537 }
1538
/// `key_to_hex` turns a base64-encoded 32-byte key into its 64-character
/// lowercase hex form.
#[cfg(not(windows))]
#[test]
fn test_key_to_hex() {
    use base64::{engine::general_purpose::STANDARD, Engine as _};

    // 32 bytes of 0xAB, so every byte must come out as "ab".
    let raw_key = [0xABu8; 32];
    let encoded = STANDARD.encode(raw_key);

    let hex_key = key_to_hex(&encoded).unwrap();

    assert_eq!(hex_key, "ab".repeat(32));
    assert_eq!(hex_key.len(), 64, "Hex key should be 64 chars");
}
1552
/// `key_to_hex` rejects a key that decodes to something other than 32 bytes
/// and reports it as an invalid key length.
#[cfg(not(windows))]
#[test]
fn test_key_to_hex_invalid_length() {
    use base64::{engine::general_purpose::STANDARD, Engine as _};

    // Valid base64, but only 16 bytes once decoded.
    let encoded = STANDARD.encode([0xABu8; 16]);

    let outcome = key_to_hex(&encoded);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Invalid key length"));
}
1567
1568 #[tokio::test]
1569 #[ignore = "Requires root/CAP_NET_ADMIN"]
1570 async fn test_create_interface_boringtun() {
1571 let config = OverlayConfig {
1572 overlay_cidr: "10.42.0.1/24".to_string(),
1573 cluster_cidr: None,
1574 private_key: "test_key".to_string(),
1575 public_key: "test_pub".to_string(),
1576 local_endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
1577 peer_discovery_interval: Duration::from_secs(30),
1578 #[cfg(feature = "nat")]
1579 nat: crate::nat::NatConfig::default(),
1580 uapi_sock_dir: std::path::PathBuf::from("/var/run/wireguard"),
1581 };
1582
1583 #[cfg(target_os = "macos")]
1586 let iface_name = "utun".to_string();
1587 #[cfg(not(target_os = "macos"))]
1588 let iface_name = "zl-bt-test0".to_string();
1589
1590 let mut transport = OverlayTransport::new(config, iface_name);
1591 let result = transport.create_interface().await;
1592
1593 match result {
1594 Ok(()) => {
1595 #[cfg(target_os = "macos")]
1596 assert!(
1597 transport.interface_name().starts_with("utun"),
1598 "macOS interface should be utunN, got: {}",
1599 transport.interface_name()
1600 );
1601 transport.shutdown();
1602 }
1603 Err(e) => {
1604 let msg = e.to_string();
1605 assert!(
1606 !msg.contains("Attribute failed policy validation"),
1607 "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1608 );
1609 assert!(
1610 msg.contains("boringtun")
1611 || msg.contains("CAP_NET_ADMIN")
1612 || msg.contains("sudo"),
1613 "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1614 );
1615 }
1616 }
1617 }
1618
1619 #[tokio::test]
1620 #[ignore = "Requires root/CAP_NET_ADMIN"]
1621 async fn test_create_interface_boringtun_ipv6() {
1622 let config = OverlayConfig {
1623 overlay_cidr: "fd00::1/48".to_string(),
1624 cluster_cidr: None,
1625 private_key: "test_key".to_string(),
1626 public_key: "test_pub".to_string(),
1627 local_endpoint: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 51820),
1628 peer_discovery_interval: Duration::from_secs(30),
1629 #[cfg(feature = "nat")]
1630 nat: crate::nat::NatConfig::default(),
1631 uapi_sock_dir: std::path::PathBuf::from("/var/run/wireguard"),
1632 };
1633
1634 #[cfg(target_os = "macos")]
1635 let iface_name = "utun".to_string();
1636 #[cfg(not(target_os = "macos"))]
1637 let iface_name = "zl-bt6-test0".to_string();
1638
1639 let mut transport = OverlayTransport::new(config, iface_name);
1640 let result = transport.create_interface().await;
1641
1642 match result {
1643 Ok(()) => {
1644 #[cfg(target_os = "macos")]
1645 assert!(
1646 transport.interface_name().starts_with("utun"),
1647 "macOS interface should be utunN, got: {}",
1648 transport.interface_name()
1649 );
1650 transport.shutdown();
1651 }
1652 Err(e) => {
1653 let msg = e.to_string();
1654 assert!(
1655 !msg.contains("Attribute failed policy validation"),
1656 "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1657 );
1658 assert!(
1659 msg.contains("boringtun")
1660 || msg.contains("CAP_NET_ADMIN")
1661 || msg.contains("sudo"),
1662 "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1663 );
1664 }
1665 }
1666 }
1667}