1use crate::interface::platform_ops;
17use crate::{config::OverlayConfig, PeerInfo};
18#[cfg(not(windows))]
19use boringtun::device::{DeviceConfig, DeviceHandle};
20use std::fmt::Write;
21#[cfg(not(windows))]
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23#[cfg(not(windows))]
24use tokio::net::UnixStream;
25
26#[cfg(windows)]
27use crate::tun::WindowsTun;
28#[cfg(windows)]
29use boringtun::noise::{Tunn, TunnResult};
30#[cfg(windows)]
31use dashmap::DashMap;
32#[cfg(windows)]
33use parking_lot::RwLock;
34#[cfg(windows)]
35use std::net::{IpAddr, SocketAddr};
36#[cfg(windows)]
37use std::sync::atomic::{AtomicU64, Ordering};
38#[cfg(windows)]
39use std::sync::Arc;
40#[cfg(windows)]
41use std::time::{Duration, SystemTime, UNIX_EPOCH};
42#[cfg(windows)]
43use tokio::net::UdpSocket;
44#[cfg(windows)]
45use tokio::sync::Mutex as AsyncMutex;
46#[cfg(windows)]
47use tokio::task::JoinHandle;
48
49#[cfg(not(windows))]
56fn key_to_hex(base64_key: &str) -> Result<String, Box<dyn std::error::Error>> {
57 use base64::{engine::general_purpose::STANDARD, Engine as _};
58 let bytes = STANDARD.decode(base64_key)?;
59 if bytes.len() != 32 {
60 return Err(format!("Invalid key length: expected 32 bytes, got {}", bytes.len()).into());
61 }
62 Ok(hex::encode(bytes))
63}
64
65#[cfg(not(windows))]
70async fn uapi_set(sock_path: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
71 let mut stream = UnixStream::connect(sock_path).await?;
72 let msg = format!("set=1\n{body}\n");
73 stream.write_all(msg.as_bytes()).await?;
74 stream.shutdown().await?;
75 let mut response = String::new();
76 stream.read_to_string(&mut response).await?;
77 if response.contains("errno=0") {
78 Ok(())
79 } else {
80 Err(format!("UAPI set failed: {}", response.trim()).into())
81 }
82}
83
84#[cfg(not(windows))]
86async fn uapi_get(sock_path: &str) -> Result<String, Box<dyn std::error::Error>> {
87 let mut stream = UnixStream::connect(sock_path).await?;
88 stream.write_all(b"get=1\n\n").await?;
89 stream.shutdown().await?;
90 let mut response = String::new();
91 stream.read_to_string(&mut response).await?;
92 Ok(response)
93}
94
95#[cfg(windows)]
/// Per-peer state for the Windows data plane.
///
/// Every field that can change is behind an `Arc`, so cloning this struct
/// (the packet loops do so to take snapshots of the peer map) yields handles
/// to the same underlying state rather than copies of it.
#[derive(Clone)]
struct WindowsPeerState {
    /// WireGuard tunnel state machine for this peer; locked by whichever loop
    /// is currently encrypting, decrypting or ticking timers.
    tunn: Arc<AsyncMutex<Tunn>>,
    /// UDP address packets for this peer are sent to. Overwritten with the
    /// datagram source whenever the ingress loop accepts a packet for the
    /// peer, and by `update_peer_endpoint`.
    endpoint: Arc<RwLock<Option<SocketAddr>>>,
    /// Unix time in seconds, `0` until first set. The ingress loop stores the
    /// current time here each time it delivers a decrypted packet to the tunnel.
    last_handshake_sec: Arc<AtomicU64>,
    /// Networks routed to this peer; the egress loop matches destination
    /// addresses against them.
    allowed_ips: Arc<Vec<ipnet::IpNet>>,
    /// Keepalive interval in seconds handed to the tunnel at construction and
    /// reported by `status`; `None` when disabled.
    persistent_keepalive: Option<u16>,
}
117
118#[cfg(windows)]
123fn decode_key_b64(b64: &str) -> Result<[u8; 32], Box<dyn std::error::Error>> {
124 use base64::{engine::general_purpose::STANDARD, Engine as _};
125 let bytes = STANDARD.decode(b64)?;
126 if bytes.len() != 32 {
127 return Err(format!(
128 "invalid WireGuard key length: expected 32 bytes, got {}",
129 bytes.len()
130 )
131 .into());
132 }
133 let mut out = [0u8; 32];
134 out.copy_from_slice(&bytes);
135 Ok(out)
136}
137
138#[cfg(windows)]
/// Extracts the destination address from a raw IP packet.
///
/// The version is read from the high nibble of the first byte. For version 4
/// the address is bytes 16..20 (the packet must be at least 20 bytes); for
/// version 6 it is bytes 24..40 (at least 40 bytes).
///
/// Returns `None` for an empty packet, an unknown version, or a packet too
/// short to contain the destination field.
fn parse_dst_ip(packet: &[u8]) -> Option<IpAddr> {
    let version = *packet.first()? >> 4;
    match version {
        4 => {
            // `get` yields `None` when the packet is shorter than 20 bytes.
            let octets: [u8; 4] = packet.get(16..20)?.try_into().ok()?;
            Some(IpAddr::from(octets))
        }
        6 => {
            // `get` yields `None` when the packet is shorter than 40 bytes.
            let octets: [u8; 16] = packet.get(24..40)?.try_into().ok()?;
            Some(IpAddr::from(octets))
        }
        _ => None,
    }
}
160
161#[cfg(windows)]
168fn build_tunn(
169 our_priv: &[u8; 32],
170 peer_pub: &[u8; 32],
171 preshared: Option<[u8; 32]>,
172 persistent_keepalive: Option<u16>,
173) -> Tunn {
174 let priv_secret = boringtun::x25519::StaticSecret::from(*our_priv);
175 let peer_pub_key = boringtun::x25519::PublicKey::from(*peer_pub);
176 Tunn::new(
178 priv_secret,
179 peer_pub_key,
180 preshared,
181 persistent_keepalive,
182 0,
183 None,
184 )
185}
186
/// Overlay network transport that owns the platform-specific WireGuard state.
///
/// On non-Windows targets the only platform state is a boringtun device
/// handle. On Windows the transport owns a Wintun adapter, a UDP socket, a
/// peer map and three background tasks (ingress, egress, timers).
pub struct OverlayTransport {
    /// Overlay settings; this type reads the private key, local endpoint,
    /// overlay CIDR and (on Windows) cluster CIDR from it.
    config: OverlayConfig,
    /// Name of the interface / adapter this transport manages. On macOS it is
    /// overwritten during interface creation with the discovered `utun*` name.
    interface_name: String,
    #[cfg(not(windows))]
    /// boringtun device handle; `None` until `create_interface` succeeds and
    /// again after `shutdown`.
    device: Option<DeviceHandle>,
    #[cfg(windows)]
    /// Wintun adapter; `None` until `create_interface` succeeds and again
    /// after `shutdown`.
    wintun_dev: Option<Arc<WindowsTun>>,
    #[cfg(windows)]
    /// UDP socket bound to the configured local endpoint by `configure`.
    udp: Option<Arc<UdpSocket>>,
    #[cfg(windows)]
    /// Peers keyed by their raw 32-byte public key; shared with the
    /// background loops.
    peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
    #[cfg(windows)]
    /// Handle of the UDP-to-adapter task, aborted by `shutdown`.
    ingress_task: Option<JoinHandle<()>>,
    #[cfg(windows)]
    /// Handle of the adapter-to-UDP task, aborted by `shutdown`.
    egress_task: Option<JoinHandle<()>>,
    #[cfg(windows)]
    /// Handle of the periodic tunnel-timer task, aborted by `shutdown`.
    timers_task: Option<JoinHandle<()>>,
}
238
239impl OverlayTransport {
240 #[must_use]
242 pub fn new(config: OverlayConfig, interface_name: String) -> Self {
243 Self {
244 config,
245 interface_name,
246 #[cfg(not(windows))]
247 device: None,
248 #[cfg(windows)]
249 wintun_dev: None,
250 #[cfg(windows)]
251 udp: None,
252 #[cfg(windows)]
253 peers: Arc::new(DashMap::new()),
254 #[cfg(windows)]
255 ingress_task: None,
256 #[cfg(windows)]
257 egress_task: None,
258 #[cfg(windows)]
259 timers_task: None,
260 }
261 }
262
263 #[must_use]
269 pub fn interface_name(&self) -> &str {
270 &self.interface_name
271 }
272
273 #[cfg(not(windows))]
277 fn uapi_sock_path(&self) -> String {
278 format!("/var/run/wireguard/{}.sock", self.interface_name)
279 }
280
281 pub async fn create_interface(&mut self) -> Result<(), Box<dyn std::error::Error>> {
302 #[cfg(windows)]
303 {
304 self.create_interface_windows().await
305 }
306 #[cfg(not(windows))]
307 {
308 self.create_interface_unix().await
309 }
310 }
311
312 #[cfg(not(windows))]
314 async fn create_interface_unix(&mut self) -> Result<(), Box<dyn std::error::Error>> {
315 #[cfg(not(target_os = "macos"))]
318 if self.interface_name.len() > 15 {
319 return Err(format!(
320 "Interface name '{}' exceeds 15 character limit",
321 self.interface_name
322 )
323 .into());
324 }
325
326 tokio::fs::create_dir_all("/var/run/wireguard").await?;
328
329 #[cfg(target_os = "linux")]
341 {
342 let iface_ops = platform_ops();
343 match iface_ops.link_exists(&self.interface_name).await {
344 Ok(true) => {
345 return Err(format!(
346 "Kernel link '{}' already exists; refusing to delete it. \
347 If this is a stale interface from a previous crash, restart \
348 the daemon (its boot-time sweep clears stale zl-* / veth-* \
349 links). If this fires during normal operation, there is a \
350 duplicate-name bug somewhere in the overlay setup path.",
351 self.interface_name
352 )
353 .into());
354 }
355 Ok(false) => {}
356 Err(e) => {
357 tracing::warn!(
358 interface = %self.interface_name,
359 error = %e,
360 "failed to probe for existing overlay interface; proceeding"
361 );
362 }
363 }
364 }
365
366 let sock_path = format!("/var/run/wireguard/{}.sock", self.interface_name);
368 if tokio::fs::try_exists(&sock_path).await.unwrap_or(false) {
369 tracing::warn!(path = %sock_path, "removing stale UAPI socket");
370 let _ = tokio::fs::remove_file(&sock_path).await;
371 }
372
373 #[cfg(target_os = "macos")]
376 let existing_socks = {
377 let mut set = std::collections::HashSet::new();
378 if let Ok(mut entries) = tokio::fs::read_dir("/var/run/wireguard").await {
379 while let Ok(Some(entry)) = entries.next_entry().await {
380 set.insert(entry.file_name().to_string_lossy().to_string());
381 }
382 }
383 set
384 };
385
386 #[cfg(target_os = "macos")]
388 let name = "utun".to_string();
389 #[cfg(not(target_os = "macos"))]
390 let name = self.interface_name.clone();
391
392 let cfg = DeviceConfig {
393 n_threads: 2,
394 use_connected_socket: true,
395 #[cfg(target_os = "linux")]
396 use_multi_queue: false,
397 #[cfg(target_os = "linux")]
398 uapi_fd: -1,
399 };
400
401 let iface_name_for_err = self.interface_name.clone();
402
403 let handle = tokio::task::spawn_blocking(move || DeviceHandle::new(&name, cfg))
406 .await
407 .map_err(|e| format!("spawn_blocking join error: {e}"))?
408 .map_err(|e| {
409 #[cfg(target_os = "macos")]
410 let hint = "Requires root. Run with sudo or install as a system service (zlayer daemon install).";
411 #[cfg(not(target_os = "macos"))]
412 let hint = "Ensure CAP_NET_ADMIN capability is available.";
413 format!("Failed to create boringtun device '{iface_name_for_err}': {e}. {hint}")
414 })?;
415
416 self.device = Some(handle);
417
418 #[cfg(target_os = "macos")]
421 {
422 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
424 if let Ok(mut entries) = tokio::fs::read_dir("/var/run/wireguard").await {
425 while let Ok(Some(entry)) = entries.next_entry().await {
426 let fname = entry.file_name().to_string_lossy().to_string();
427 if !existing_socks.contains(&fname)
428 && fname.starts_with("utun")
429 && std::path::Path::new(&fname)
430 .extension()
431 .is_some_and(|ext| ext.eq_ignore_ascii_case("sock"))
432 {
433 self.interface_name = fname.trim_end_matches(".sock").to_string();
434 break;
435 }
436 }
437 }
438 }
439
440 tracing::info!(
441 interface = %self.interface_name,
442 "Created boringtun overlay transport"
443 );
444 Ok(())
445 }
446
447 #[cfg(windows)]
454 async fn create_interface_windows(&mut self) -> Result<(), Box<dyn std::error::Error>> {
455 if self.interface_name.len() > 64 {
458 return Err(format!(
459 "Wintun adapter name '{}' exceeds 64 character limit",
460 self.interface_name
461 )
462 .into());
463 }
464
465 let iface_name = self.interface_name.clone();
466 let mtu = 1420; let dev = tokio::task::spawn_blocking(move || WindowsTun::new(&iface_name, mtu))
472 .await
473 .map_err(|e| format!("spawn_blocking join error: {e}"))??;
474
475 tracing::info!(
476 interface = %self.interface_name,
477 luid = dev.luid_value(),
478 "Created Wintun overlay adapter"
479 );
480
481 self.wintun_dev = Some(Arc::new(dev));
482 Ok(())
483 }
484
485 pub async fn configure(
503 &mut self,
504 peers: &[PeerInfo],
505 ) -> Result<(), Box<dyn std::error::Error>> {
506 #[cfg(not(windows))]
507 {
508 let sock = self.uapi_sock_path();
509
510 let private_key_hex = key_to_hex(&self.config.private_key)?;
512 let mut body = format!(
513 "private_key={}\nlisten_port={}\n",
514 private_key_hex,
515 self.config.local_endpoint.port(),
516 );
517
518 for peer in peers {
519 let pub_hex = key_to_hex(&peer.public_key)?;
520 let _ = writeln!(body, "public_key={pub_hex}");
521 let _ = writeln!(body, "endpoint={}", peer.endpoint);
522 let _ = writeln!(body, "allowed_ip={}", peer.allowed_ips);
523 let _ = writeln!(
524 body,
525 "persistent_keepalive_interval={}",
526 peer.persistent_keepalive_interval.as_secs()
527 );
528 }
529
530 uapi_set(&sock, &body).await?;
531 tracing::debug!(interface = %self.interface_name, "Applied UAPI configuration");
532
533 self.configure_interface().await?;
535
536 tracing::info!(interface = %self.interface_name, "Overlay transport configured and up");
537 Ok(())
538 }
539
540 #[cfg(windows)]
541 {
542 self.configure_windows(peers).await
543 }
544 }
545
546 #[cfg(windows)]
554 async fn configure_windows(
555 &mut self,
556 peers: &[PeerInfo],
557 ) -> Result<(), Box<dyn std::error::Error>> {
558 self.configure_interface().await?;
560
561 if let Some(ref cluster_cidr_str) = self.config.cluster_cidr {
573 match cluster_cidr_str.parse::<ipnet::IpNet>() {
574 Ok(net) => {
575 use crate::interface::windows::WindowsIpHelperOps;
576 use crate::interface::InterfaceOps;
577 let ops = WindowsIpHelperOps::new();
578 let adapter_name = self.interface_name.clone();
579 match ops
580 .add_route_via_dev(net.network(), net.prefix_len(), &adapter_name)
581 .await
582 {
583 Ok(()) => {
584 tracing::info!(
585 cidr = %net,
586 adapter = %adapter_name,
587 "Installed cluster-CIDR host route via Wintun adapter"
588 );
589 }
590 Err(e) => {
591 tracing::warn!(
592 error = %e,
593 cidr = %net,
594 adapter = %adapter_name,
595 "Failed to install cluster-CIDR host route via Wintun (overlay traffic may not route across nodes); route may already exist"
596 );
597 }
598 }
599 }
600 Err(e) => {
601 tracing::warn!(
602 error = %e,
603 cidr = %cluster_cidr_str,
604 "cluster_cidr unparseable; skipping Wintun route install"
605 );
606 }
607 }
608 } else {
609 tracing::warn!(
610 "cluster_cidr not set in OverlayConfig; skipping Wintun route install (cross-node overlay traffic may not route)"
611 );
612 }
613
614 let tun = self
616 .wintun_dev
617 .as_ref()
618 .ok_or("Wintun adapter not initialized — call create_interface first")?
619 .clone();
620
621 let listen = self.config.local_endpoint;
624 let udp = Arc::new(
625 UdpSocket::bind(listen)
626 .await
627 .map_err(|e| format!("failed to bind WireGuard UDP socket on {listen}: {e}"))?,
628 );
629 self.udp = Some(udp.clone());
630
631 let priv_bytes = decode_key_b64(&self.config.private_key)?;
633 for peer in peers {
634 self.add_peer_windows(&priv_bytes, peer)?;
635 }
636
637 let peers_ingress = self.peers.clone();
641 let udp_ingress = udp.clone();
642 let tun_ingress = tun.clone();
643 self.ingress_task = Some(tokio::spawn(async move {
644 Self::ingress_loop(udp_ingress, tun_ingress, peers_ingress).await;
645 }));
646
647 let peers_egress = self.peers.clone();
648 let udp_egress = udp.clone();
649 let tun_egress = tun.clone();
650 self.egress_task = Some(tokio::spawn(async move {
651 Self::egress_loop(tun_egress, udp_egress, peers_egress).await;
652 }));
653
654 let peers_timers = self.peers.clone();
655 let udp_timers = udp.clone();
656 self.timers_task = Some(tokio::spawn(async move {
657 Self::timers_loop(udp_timers, peers_timers).await;
658 }));
659
660 tracing::info!(
661 interface = %self.interface_name,
662 peer_count = peers.len(),
663 listen = %listen,
664 "Windows overlay transport configured (Tunn pipeline online)"
665 );
666 Ok(())
667 }
668
669 #[cfg(windows)]
676 fn add_peer_windows(
677 &self,
678 our_priv: &[u8; 32],
679 peer: &PeerInfo,
680 ) -> Result<(), Box<dyn std::error::Error>> {
681 let peer_pub = decode_key_b64(&peer.public_key)?;
682 let allowed: ipnet::IpNet = peer
683 .allowed_ips
684 .parse()
685 .map_err(|e| format!("invalid allowed_ips '{}': {e}", peer.allowed_ips))?;
686 let keepalive = {
689 let secs = peer.persistent_keepalive_interval.as_secs();
690 if secs == 0 {
691 None
692 } else {
693 u16::try_from(secs).ok()
694 }
695 };
696
697 let tunn = build_tunn(our_priv, &peer_pub, None, keepalive);
698 let state = WindowsPeerState {
699 tunn: Arc::new(AsyncMutex::new(tunn)),
700 endpoint: Arc::new(RwLock::new(Some(peer.endpoint))),
701 last_handshake_sec: Arc::new(AtomicU64::new(0)),
702 allowed_ips: Arc::new(vec![allowed]),
703 persistent_keepalive: keepalive,
704 };
705 self.peers.insert(peer_pub, state);
706 tracing::debug!(
707 peer_key = %peer.public_key,
708 endpoint = %peer.endpoint,
709 allowed = %peer.allowed_ips,
710 "Added peer to Windows overlay peer map"
711 );
712 Ok(())
713 }
714
715 #[cfg(windows)]
735 async fn ingress_loop(
736 udp: Arc<UdpSocket>,
737 tun: Arc<WindowsTun>,
738 peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
739 ) {
740 let mut inbuf = vec![0u8; 65536];
742 loop {
743 let (n, src) = match udp.recv_from(&mut inbuf).await {
744 Ok(p) => p,
745 Err(e) => {
746 tracing::error!(error = %e, "UDP recv failed; ingress loop exiting");
747 break;
748 }
749 };
750
751 let snapshot: Vec<([u8; 32], WindowsPeerState)> = peers
754 .iter()
755 .map(|e| (*e.key(), e.value().clone()))
756 .collect();
757
758 for (pk, state) in snapshot {
759 let mut out = vec![0u8; 65536];
760 let mut handled = false;
761 {
762 let mut tunn = state.tunn.lock().await;
763 match tunn.decapsulate(Some(src.ip()), &inbuf[..n], &mut out) {
764 TunnResult::WriteToTunnelV4(pkt, _)
765 | TunnResult::WriteToTunnelV6(pkt, _) => {
766 let pkt_owned = pkt.to_vec();
767 drop(tunn);
768 if let Err(e) = tun.send(&pkt_owned).await {
769 tracing::warn!(error = %e, "Wintun send failed");
770 }
771 *state.endpoint.write() = Some(src);
772 state.last_handshake_sec.store(
773 SystemTime::now()
774 .duration_since(UNIX_EPOCH)
775 .unwrap_or_default()
776 .as_secs(),
777 Ordering::Relaxed,
778 );
779 handled = true;
780 }
781 TunnResult::WriteToNetwork(resp) => {
782 let resp_owned = resp.to_vec();
783 drop(tunn);
784 if let Err(e) = udp.send_to(&resp_owned, src).await {
785 tracing::warn!(error = %e, "UDP reply send failed");
786 }
787 *state.endpoint.write() = Some(src);
788 handled = true;
789 }
790 TunnResult::Done | TunnResult::Err(_) => {
791 }
793 }
794 }
795 if handled {
796 loop {
801 let mut drain = vec![0u8; 65536];
802 let mut tunn = state.tunn.lock().await;
803 match tunn.decapsulate(None, &[], &mut drain) {
804 TunnResult::WriteToNetwork(resp) => {
805 let resp_owned = resp.to_vec();
806 drop(tunn);
807 if let Err(e) = udp.send_to(&resp_owned, src).await {
808 tracing::warn!(error = %e, "UDP drain send failed");
809 }
810 }
811 TunnResult::WriteToTunnelV4(pkt, _)
812 | TunnResult::WriteToTunnelV6(pkt, _) => {
813 let pkt_owned = pkt.to_vec();
814 drop(tunn);
815 if let Err(e) = tun.send(&pkt_owned).await {
816 tracing::warn!(error = %e, "Wintun drain send failed");
817 }
818 }
819 TunnResult::Done | TunnResult::Err(_) => break,
820 }
821 }
822 let _ = pk; break;
824 }
825 }
826 }
827 }
828
829 #[cfg(windows)]
838 async fn egress_loop(
839 tun: Arc<WindowsTun>,
840 udp: Arc<UdpSocket>,
841 peers: Arc<DashMap<[u8; 32], WindowsPeerState>>,
842 ) {
843 let mut buf = vec![0u8; 65536];
844 loop {
845 let n = match tun.recv(&mut buf).await {
846 Ok(n) => n,
847 Err(e) => {
848 tracing::error!(error = %e, "Wintun recv failed; egress loop exiting");
849 break;
850 }
851 };
852
853 let Some(dst_ip) = parse_dst_ip(&buf[..n]) else {
854 continue;
855 };
856
857 let state = peers.iter().find_map(|entry| {
859 if entry
860 .value()
861 .allowed_ips
862 .iter()
863 .any(|net| net.contains(&dst_ip))
864 {
865 Some(entry.value().clone())
866 } else {
867 None
868 }
869 });
870 let Some(state) = state else {
871 tracing::trace!(%dst_ip, "no matching overlay peer");
872 continue;
873 };
874
875 let endpoint = *state.endpoint.read();
876 let Some(endpoint) = endpoint else {
877 tracing::trace!(%dst_ip, "peer has no endpoint yet; dropping");
878 continue;
879 };
880
881 let mut out = vec![0u8; 65536 + 32];
885 let mut tunn = state.tunn.lock().await;
886 match tunn.encapsulate(&buf[..n], &mut out) {
887 TunnResult::WriteToNetwork(pkt) => {
888 let pkt_owned = pkt.to_vec();
889 drop(tunn);
890 if let Err(e) = udp.send_to(&pkt_owned, endpoint).await {
891 tracing::warn!(error = %e, "UDP send failed");
892 }
893 }
894 TunnResult::Done
895 | TunnResult::WriteToTunnelV4(_, _)
896 | TunnResult::WriteToTunnelV6(_, _) => {
897 }
903 TunnResult::Err(e) => {
904 tracing::warn!(?e, "encapsulate error");
905 }
906 }
907 }
908 }
909
910 #[cfg(windows)]
918 async fn timers_loop(udp: Arc<UdpSocket>, peers: Arc<DashMap<[u8; 32], WindowsPeerState>>) {
919 let mut interval = tokio::time::interval(Duration::from_millis(250));
920 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
921 loop {
922 interval.tick().await;
923 let snapshot: Vec<WindowsPeerState> = peers.iter().map(|e| e.value().clone()).collect();
924 for state in snapshot {
925 let endpoint = *state.endpoint.read();
926 let mut out = vec![0u8; 148];
927 let mut tunn = state.tunn.lock().await;
928 match tunn.update_timers(&mut out) {
929 TunnResult::WriteToNetwork(pkt) => {
930 let pkt_owned = pkt.to_vec();
931 drop(tunn);
932 if let Some(ep) = endpoint {
933 if let Err(e) = udp.send_to(&pkt_owned, ep).await {
934 tracing::debug!(error = %e, "timers UDP send failed");
935 }
936 }
937 }
938 TunnResult::Done
939 | TunnResult::WriteToTunnelV4(_, _)
940 | TunnResult::WriteToTunnelV6(_, _) => {}
941 TunnResult::Err(e) => {
942 tracing::debug!(?e, "update_timers error");
943 }
944 }
945 }
946 }
947 }
948
949 async fn configure_interface(&self) -> Result<(), Box<dyn std::error::Error>> {
964 let cidr: ipnet::IpNet = self.config.overlay_cidr.parse().map_err(|e| {
965 format!(
966 "Failed to parse overlay CIDR '{}': {e}",
967 self.config.overlay_cidr
968 )
969 })?;
970 let overlay_addr = cidr.addr();
971 let prefix_len = cidr.prefix_len();
972 let net_addr = cidr.network();
973
974 let iface_ops = platform_ops();
975
976 if let Err(e) = iface_ops
981 .add_address(&self.interface_name, overlay_addr, prefix_len)
982 .await
983 {
984 let msg = e.to_string();
985 if !msg.contains("File exists") && !msg.contains("EEXIST") {
986 return Err(format!("Failed to assign IP: {msg}").into());
987 }
988 }
989
990 iface_ops
994 .set_link_up(&self.interface_name)
995 .await
996 .map_err(|e| format!("Failed to bring up interface: {e}"))?;
997
998 if let Err(e) = iface_ops
1002 .add_route_via_dev(net_addr, prefix_len, &self.interface_name)
1003 .await
1004 {
1005 let msg = e.to_string();
1006 if !msg.contains("File exists")
1007 && !msg.contains("EEXIST")
1008 && !msg.contains("already in table")
1009 {
1010 return Err(format!("Failed to add route: {msg}").into());
1011 }
1012 }
1013
1014 Ok(())
1015 }
1016
1017 #[cfg_attr(windows, allow(clippy::unused_async))]
1028 pub async fn add_peer(&self, peer: &PeerInfo) -> Result<(), Box<dyn std::error::Error>> {
1029 #[cfg(not(windows))]
1030 {
1031 let sock = self.uapi_sock_path();
1032 let pub_hex = key_to_hex(&peer.public_key)?;
1033
1034 let body = format!(
1035 "public_key={}\nendpoint={}\nallowed_ip={}\npersistent_keepalive_interval={}\n",
1036 pub_hex,
1037 peer.endpoint,
1038 peer.allowed_ips,
1039 peer.persistent_keepalive_interval.as_secs(),
1040 );
1041
1042 uapi_set(&sock, &body).await?;
1043 tracing::debug!(
1044 peer_key = %peer.public_key,
1045 interface = %self.interface_name,
1046 "Added peer via UAPI"
1047 );
1048 Ok(())
1049 }
1050 #[cfg(windows)]
1051 {
1052 let priv_bytes = decode_key_b64(&self.config.private_key)?;
1053 self.add_peer_windows(&priv_bytes, peer)?;
1054 Ok(())
1055 }
1056 }
1057
1058 #[cfg_attr(windows, allow(clippy::unused_async))]
1069 pub async fn remove_peer(&self, public_key: &str) -> Result<(), Box<dyn std::error::Error>> {
1070 #[cfg(not(windows))]
1071 {
1072 let sock = self.uapi_sock_path();
1073 let pub_hex = key_to_hex(public_key)?;
1074
1075 let body = format!("public_key={pub_hex}\nremove=true\n");
1076
1077 uapi_set(&sock, &body).await?;
1078 tracing::debug!(
1079 peer_key = %public_key,
1080 interface = %self.interface_name,
1081 "Removed peer via UAPI"
1082 );
1083 Ok(())
1084 }
1085 #[cfg(windows)]
1086 {
1087 let pk = decode_key_b64(public_key)?;
1088 self.peers.remove(&pk);
1089 tracing::debug!(
1090 peer_key = %public_key,
1091 interface = %self.interface_name,
1092 "Removed peer from Windows overlay"
1093 );
1094 Ok(())
1095 }
1096 }
1097
1098 #[cfg_attr(windows, allow(clippy::unused_async))]
1109 pub async fn status(&self) -> Result<String, Box<dyn std::error::Error>> {
1110 #[cfg(not(windows))]
1111 {
1112 let sock = self.uapi_sock_path();
1113 let response = uapi_get(&sock).await?;
1114 Ok(response)
1115 }
1116 #[cfg(windows)]
1117 {
1118 use base64::{engine::general_purpose::STANDARD, Engine as _};
1121 let mut out = String::new();
1122 let priv_bytes = decode_key_b64(&self.config.private_key).unwrap_or([0u8; 32]);
1123 let _ = writeln!(out, "private_key={}", hex::encode(priv_bytes));
1124 let _ = writeln!(out, "listen_port={}", self.config.local_endpoint.port());
1125 for entry in self.peers.iter() {
1126 let pk_b64 = STANDARD.encode(entry.key());
1127 let _ = writeln!(out, "public_key={}", hex::encode(entry.key()));
1128 let _ = writeln!(out, "public_key_b64={pk_b64}");
1129 if let Some(ep) = *entry.value().endpoint.read() {
1130 let _ = writeln!(out, "endpoint={ep}");
1131 }
1132 for net in entry.value().allowed_ips.iter() {
1133 let _ = writeln!(out, "allowed_ip={net}");
1134 }
1135 if let Some(k) = entry.value().persistent_keepalive {
1136 let _ = writeln!(out, "persistent_keepalive_interval={k}");
1137 }
1138 let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1139 let _ = writeln!(out, "last_handshake_time_sec={last}");
1140 }
1141 let _ = writeln!(out, "errno=0");
1142 Ok(out)
1143 }
1144 }
1145
1146 #[allow(clippy::unused_async)]
1155 pub async fn generate_keys() -> Result<(String, String), Box<dyn std::error::Error>> {
1156 use base64::{engine::general_purpose::STANDARD, Engine as _};
1157 use x25519_dalek::{PublicKey, StaticSecret};
1158
1159 let secret = StaticSecret::random();
1160 let public = PublicKey::from(&secret);
1161
1162 let private_key = STANDARD.encode(secret.to_bytes());
1163 let public_key = STANDARD.encode(public.as_bytes());
1164
1165 Ok((private_key, public_key))
1166 }
1167
1168 #[cfg(feature = "nat")]
1177 #[cfg_attr(windows, allow(clippy::unused_async))]
1178 pub async fn update_peer_endpoint(
1179 &self,
1180 public_key: &str,
1181 new_endpoint: std::net::SocketAddr,
1182 ) -> Result<(), Box<dyn std::error::Error>> {
1183 #[cfg(not(windows))]
1184 {
1185 let sock = self.uapi_sock_path();
1186 let pub_hex = key_to_hex(public_key)?;
1187 let body = format!("public_key={pub_hex}\nendpoint={new_endpoint}\n");
1188 uapi_set(&sock, &body).await?;
1189 tracing::debug!(
1190 peer_key = %public_key,
1191 endpoint = %new_endpoint,
1192 "Updated peer endpoint"
1193 );
1194 Ok(())
1195 }
1196 #[cfg(windows)]
1197 {
1198 let pk = decode_key_b64(public_key)?;
1199 let entry = self
1200 .peers
1201 .get(&pk)
1202 .ok_or_else(|| format!("peer not found: {public_key}"))?;
1203 *entry.value().endpoint.write() = Some(new_endpoint);
1204 tracing::debug!(
1205 peer_key = %public_key,
1206 endpoint = %new_endpoint,
1207 "Updated peer endpoint (Windows)"
1208 );
1209 Ok(())
1210 }
1211 }
1212
1213 #[cfg(feature = "nat")]
1222 #[cfg_attr(windows, allow(clippy::unused_async))]
1223 pub async fn check_peer_handshake(
1224 &self,
1225 public_key: &str,
1226 since: u64,
1227 ) -> Result<bool, Box<dyn std::error::Error>> {
1228 #[cfg(not(windows))]
1229 {
1230 let sock = self.uapi_sock_path();
1231 let response = uapi_get(&sock).await?;
1232 let target_hex = key_to_hex(public_key)?;
1233
1234 let mut in_target = false;
1235 for line in response.lines() {
1236 let line = line.trim();
1237 if line.is_empty() || line.starts_with("errno=") {
1238 continue;
1239 }
1240 let Some((key, value)) = line.split_once('=') else {
1241 continue;
1242 };
1243 match key {
1244 "public_key" => {
1245 in_target = value == target_hex;
1246 }
1247 "last_handshake_time_sec" if in_target => {
1248 if let Ok(t) = value.parse::<u64>() {
1249 return Ok(t > 0 && t >= since);
1250 }
1251 }
1252 _ => {}
1253 }
1254 }
1255 Ok(false)
1256 }
1257 #[cfg(windows)]
1258 {
1259 let pk = decode_key_b64(public_key)?;
1260 let entry = self
1261 .peers
1262 .get(&pk)
1263 .ok_or_else(|| format!("peer not found: {public_key}"))?;
1264 let last = entry.value().last_handshake_sec.load(Ordering::Relaxed);
1265 Ok(last > 0 && last >= since)
1266 }
1267 }
1268
1269 pub fn shutdown(&mut self) {
1279 #[cfg(not(windows))]
1280 if let Some(device) = self.device.take() {
1281 tracing::info!(
1282 interface = %self.interface_name,
1283 "Shutting down overlay transport"
1284 );
1285 drop(device);
1287 }
1288 #[cfg(windows)]
1289 {
1290 if let Some(h) = self.ingress_task.take() {
1291 h.abort();
1292 }
1293 if let Some(h) = self.egress_task.take() {
1294 h.abort();
1295 }
1296 if let Some(h) = self.timers_task.take() {
1297 h.abort();
1298 }
1299 self.udp.take();
1303 self.peers.clear();
1304 if let Some(dev) = self.wintun_dev.take() {
1305 tracing::info!(
1306 interface = %self.interface_name,
1307 "Shutting down Wintun overlay transport"
1308 );
1309 drop(dev);
1310 }
1311 }
1312 }
1313}
1314
1315impl Drop for OverlayTransport {
1316 fn drop(&mut self) {
1317 self.shutdown();
1318 }
1319}
1320
1321#[cfg(test)]
1326mod tests {
1327 use super::*;
1328 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
1329 use std::time::Duration;
1330
1331 #[test]
1332 fn test_peer_info_to_config() {
1333 let peer = PeerInfo::new(
1334 "test_public_key".to_string(),
1335 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 51820),
1336 "10.0.0.2/32",
1337 Duration::from_secs(25),
1338 );
1339
1340 let config = peer.to_peer_config();
1341 assert!(config.contains("PublicKey = test_public_key"));
1342 assert!(config.contains("Endpoint = 10.0.0.1:51820"));
1343 }
1344
1345 #[cfg(windows)]
1350 #[test]
1351 fn test_parse_dst_ip_v4() {
1352 let mut pkt = vec![0u8; 20];
1355 pkt[0] = 0x45;
1356 pkt[16..20].copy_from_slice(&[10, 0, 0, 7]);
1357 assert_eq!(
1358 super::parse_dst_ip(&pkt),
1359 Some(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)))
1360 );
1361 }
1362
1363 #[cfg(windows)]
1364 #[test]
1365 fn test_parse_dst_ip_v6() {
1366 let mut pkt = vec![0u8; 40];
1368 pkt[0] = 0x60;
1369 pkt[24] = 0xfd;
1370 pkt[25] = 0x00;
1371 pkt[39] = 0x01;
1372 let expected = IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1));
1373 assert_eq!(super::parse_dst_ip(&pkt), Some(expected));
1374 }
1375
1376 #[cfg(windows)]
1377 #[test]
1378 fn test_parse_dst_ip_truncated_returns_none() {
1379 let pkt = vec![0x45u8; 10];
1380 assert_eq!(super::parse_dst_ip(&pkt), None);
1381 assert_eq!(super::parse_dst_ip(&[]), None);
1382 }
1383
1384 #[cfg(windows)]
1385 #[test]
1386 fn test_parse_dst_ip_unknown_version_returns_none() {
1387 let pkt = vec![0x70u8; 64];
1388 assert_eq!(super::parse_dst_ip(&pkt), None);
1389 }
1390
1391 #[cfg(windows)]
1392 #[test]
1393 fn test_decode_key_b64_roundtrip() {
1394 use base64::{engine::general_purpose::STANDARD, Engine as _};
1395 let raw = [0x42u8; 32];
1396 let b64 = STANDARD.encode(raw);
1397 let decoded = super::decode_key_b64(&b64).expect("decode");
1398 assert_eq!(decoded, raw);
1399 }
1400
1401 #[cfg(windows)]
1402 #[test]
1403 fn test_decode_key_b64_wrong_length_errors() {
1404 use base64::{engine::general_purpose::STANDARD, Engine as _};
1405 let short = STANDARD.encode([0u8; 16]);
1406 assert!(super::decode_key_b64(&short).is_err());
1407 }
1408
1409 #[test]
1410 fn test_peer_info_ipv6_to_config() {
1411 let peer = PeerInfo::new(
1412 "test_public_key_v6".to_string(),
1413 SocketAddr::new(
1414 IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
1415 51820,
1416 ),
1417 "fd00::2/128",
1418 Duration::from_secs(25),
1419 );
1420
1421 let config = peer.to_peer_config();
1422 assert!(config.contains("PublicKey = test_public_key_v6"));
1423 assert!(
1425 config.contains("Endpoint = [fd00::1]:51820"),
1426 "IPv6 endpoint should use bracket notation, got: {config}"
1427 );
1428 assert!(config.contains("AllowedIPs = fd00::2/128"));
1429 }
1430
1431 #[test]
1432 fn test_overlay_cidr_parses_ipv4() {
1433 let cidr: ipnet::IpNet = "10.200.0.1/24".parse().unwrap();
1434 assert!(cidr.addr().is_ipv4());
1435 assert_eq!(cidr.prefix_len(), 24);
1436 assert_eq!(cidr.network().to_string(), "10.200.0.0");
1437 }
1438
1439 #[test]
1440 fn test_overlay_cidr_parses_ipv6() {
1441 let cidr: ipnet::IpNet = "fd00::1/48".parse().unwrap();
1442 assert!(cidr.addr().is_ipv6());
1443 assert_eq!(cidr.prefix_len(), 48);
1444 assert_eq!(cidr.network().to_string(), "fd00::");
1445 }
1446
1447 #[test]
1448 fn test_overlay_cidr_ipv6_host_address() {
1449 let cidr: ipnet::IpNet = "fd00::5/128".parse().unwrap();
1451 assert!(cidr.addr().is_ipv6());
1452 assert_eq!(cidr.prefix_len(), 128);
1453 assert_eq!(cidr.addr().to_string(), "fd00::5");
1454 }
1455
1456 #[test]
1457 fn test_peer_info_ipv6_allowed_ips_format() {
1458 let peer_v4 = PeerInfo::new(
1460 "key_v4".to_string(),
1461 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
1462 "10.200.0.5/32",
1463 Duration::from_secs(25),
1464 );
1465 assert_eq!(peer_v4.allowed_ips, "10.200.0.5/32");
1466
1467 let peer_v6 = PeerInfo::new(
1468 "key_v6".to_string(),
1469 SocketAddr::new(
1470 IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 5)),
1471 51820,
1472 ),
1473 "fd00::5/128",
1474 Duration::from_secs(25),
1475 );
1476 assert_eq!(peer_v6.allowed_ips, "fd00::5/128");
1477 }
1478
1479 #[test]
1480 fn test_uapi_body_format_ipv6_peer() {
1481 let endpoint = SocketAddr::new(
1484 IpAddr::V6(Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1)),
1485 51820,
1486 );
1487 let formatted = format!("endpoint={endpoint}");
1488 assert_eq!(formatted, "endpoint=[fd00::1]:51820");
1489 }
1490
1491 #[tokio::test]
1492 async fn test_generate_keys_native() {
1493 use base64::{engine::general_purpose::STANDARD, Engine as _};
1494 use x25519_dalek::{PublicKey, StaticSecret};
1495
1496 let (private_key, public_key) = OverlayTransport::generate_keys().await.unwrap();
1497
1498 assert_eq!(
1499 private_key.len(),
1500 44,
1501 "Private key should be 44 chars base64"
1502 );
1503 assert_eq!(public_key.len(), 44, "Public key should be 44 chars base64");
1504
1505 let priv_bytes = STANDARD.decode(&private_key).unwrap();
1506 let pub_bytes = STANDARD.decode(&public_key).unwrap();
1507 assert_eq!(priv_bytes.len(), 32);
1508 assert_eq!(pub_bytes.len(), 32);
1509
1510 let secret = StaticSecret::from(<[u8; 32]>::try_from(priv_bytes.as_slice()).unwrap());
1511 let expected_public = PublicKey::from(&secret);
1512 assert_eq!(pub_bytes.as_slice(), expected_public.as_bytes());
1513 }
1514
1515 #[tokio::test]
1516 async fn test_generate_keys_unique() {
1517 let (key1, _) = OverlayTransport::generate_keys().await.unwrap();
1518 let (key2, _) = OverlayTransport::generate_keys().await.unwrap();
1519 assert_ne!(
1520 key1, key2,
1521 "Sequential key generation should produce unique keys"
1522 );
1523 }
1524
/// `key_to_hex` turns a base64-encoded 32-byte key into its 64-character
/// lowercase hex representation.
#[cfg(not(windows))]
#[test]
fn test_key_to_hex() {
    use base64::{engine::general_purpose::STANDARD, Engine as _};

    // Input key: 32 bytes, every one of them 0xAB.
    let encoded = STANDARD.encode([0xABu8; 32]);
    let hex_key = key_to_hex(&encoded).unwrap();

    // Each 0xAB byte is expected to render as "ab".
    let expected = "ab".repeat(32);
    assert_eq!(hex_key, expected);
    assert_eq!(hex_key.len(), 64, "Hex key should be 64 chars");
}
1538
/// `key_to_hex` rejects a key that decodes to 16 bytes instead of 32, and the
/// error text contains "Invalid key length".
#[cfg(not(windows))]
#[test]
fn test_key_to_hex_invalid_length() {
    use base64::{engine::general_purpose::STANDARD, Engine as _};

    // Valid base64, but only half the required number of bytes.
    let encoded = STANDARD.encode([0xABu8; 16]);
    let outcome = key_to_hex(&encoded);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Invalid key length"));
}
1553
1554 #[tokio::test]
1555 #[ignore = "Requires root/CAP_NET_ADMIN"]
1556 async fn test_create_interface_boringtun() {
1557 let config = OverlayConfig {
1558 overlay_cidr: "10.42.0.1/24".to_string(),
1559 cluster_cidr: None,
1560 private_key: "test_key".to_string(),
1561 public_key: "test_pub".to_string(),
1562 local_endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 51820),
1563 peer_discovery_interval: Duration::from_secs(30),
1564 #[cfg(feature = "nat")]
1565 nat: crate::nat::NatConfig::default(),
1566 };
1567
1568 #[cfg(target_os = "macos")]
1571 let iface_name = "utun".to_string();
1572 #[cfg(not(target_os = "macos"))]
1573 let iface_name = "zl-bt-test0".to_string();
1574
1575 let mut transport = OverlayTransport::new(config, iface_name);
1576 let result = transport.create_interface().await;
1577
1578 match result {
1579 Ok(()) => {
1580 #[cfg(target_os = "macos")]
1581 assert!(
1582 transport.interface_name().starts_with("utun"),
1583 "macOS interface should be utunN, got: {}",
1584 transport.interface_name()
1585 );
1586 transport.shutdown();
1587 }
1588 Err(e) => {
1589 let msg = e.to_string();
1590 assert!(
1591 !msg.contains("Attribute failed policy validation"),
1592 "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1593 );
1594 assert!(
1595 msg.contains("boringtun")
1596 || msg.contains("CAP_NET_ADMIN")
1597 || msg.contains("sudo"),
1598 "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1599 );
1600 }
1601 }
1602 }
1603
1604 #[tokio::test]
1605 #[ignore = "Requires root/CAP_NET_ADMIN"]
1606 async fn test_create_interface_boringtun_ipv6() {
1607 let config = OverlayConfig {
1608 overlay_cidr: "fd00::1/48".to_string(),
1609 cluster_cidr: None,
1610 private_key: "test_key".to_string(),
1611 public_key: "test_pub".to_string(),
1612 local_endpoint: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 51820),
1613 peer_discovery_interval: Duration::from_secs(30),
1614 #[cfg(feature = "nat")]
1615 nat: crate::nat::NatConfig::default(),
1616 };
1617
1618 #[cfg(target_os = "macos")]
1619 let iface_name = "utun".to_string();
1620 #[cfg(not(target_os = "macos"))]
1621 let iface_name = "zl-bt6-test0".to_string();
1622
1623 let mut transport = OverlayTransport::new(config, iface_name);
1624 let result = transport.create_interface().await;
1625
1626 match result {
1627 Ok(()) => {
1628 #[cfg(target_os = "macos")]
1629 assert!(
1630 transport.interface_name().starts_with("utun"),
1631 "macOS interface should be utunN, got: {}",
1632 transport.interface_name()
1633 );
1634 transport.shutdown();
1635 }
1636 Err(e) => {
1637 let msg = e.to_string();
1638 assert!(
1639 !msg.contains("Attribute failed policy validation"),
1640 "create_interface should not produce kernel WireGuard errors. Got: {msg}",
1641 );
1642 assert!(
1643 msg.contains("boringtun")
1644 || msg.contains("CAP_NET_ADMIN")
1645 || msg.contains("sudo"),
1646 "Error should mention boringtun, CAP_NET_ADMIN, or sudo. Got: {msg}",
1647 );
1648 }
1649 }
1650 }
1651}