1use crate::error::AgentError;
2use ipnetwork::IpNetwork;
3use serde::{Deserialize, Serialize};
4use std::collections::hash_map::DefaultHasher;
5use std::collections::HashMap;
6use std::hash::{Hash, Hasher};
7use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
8#[cfg(target_os = "linux")]
9use std::os::fd::AsFd;
10use std::path::Path;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tokio::sync::RwLock;
13use zlayer_overlay::{
14 NatConfig, NatPeerSnapshot, NatStatusSnapshot, NatTraversal, OverlayConfig, OverlayTransport,
15};
16
/// Maximum length, in bytes, of a generated interface name
/// (15 equals Linux's `IFNAMSIZ` of 16 minus the trailing NUL).
const MAX_IFNAME_LEN: usize = 15;

/// Builds an interface name of the form `zl-<parts joined by '-'>[-<suffix>]`
/// that never exceeds [`MAX_IFNAME_LEN`] bytes.
///
/// If the readable form fits it is returned unchanged. Otherwise the parts and
/// the suffix are hashed and the name becomes `zl-<hex digest prefix>-<suffix>`.
/// When the suffix is empty, or so long that no room is left for even one
/// digest character, the suffix is dropped and the name is
/// `zl-<hex digest prefix>`.
///
/// The same inputs always yield the same name within one build of the program.
#[must_use]
pub fn make_interface_name(parts: &[&str], suffix: &str) -> String {
    const PREFIX: &str = "zl-";

    // Human-readable candidate first; most deployments fit without hashing.
    let mut readable = format!("{PREFIX}{}", parts.join("-"));
    if !suffix.is_empty() {
        readable.push('-');
        readable.push_str(suffix);
    }
    if readable.len() <= MAX_IFNAME_LEN {
        return readable;
    }

    // Too long: derive a digest from every part plus the suffix.
    let mut state = DefaultHasher::new();
    parts.iter().for_each(|segment| segment.hash(&mut state));
    suffix.hash(&mut state);
    let digest = format!("{:x}", state.finish());

    // Bytes left for the digest once "zl-" and "-<suffix>" are accounted for.
    // Zero means "emit the digest without a suffix".
    let digest_room = match suffix.len() {
        0 => 0,
        n => MAX_IFNAME_LEN.saturating_sub(PREFIX.len() + 1 + n),
    };

    if digest_room == 0 {
        let take = (MAX_IFNAME_LEN - PREFIX.len()).min(digest.len());
        format!("{PREFIX}{}", &digest[..take])
    } else {
        let take = digest_room.min(digest.len());
        format!("{PREFIX}{}-{suffix}", &digest[..take])
    }
}
63
/// Owns the overlay network state for one deployment on this node: the global
/// overlay transport, the per-service transports, the IP allocator and the
/// optional NAT-traversal state.
pub struct OverlayManager {
    /// Deployment name; used as the first component of generated interface names.
    deployment: String,
    /// Interface name reported by the global transport once `setup_global_overlay` succeeds.
    global_interface: Option<String>,
    /// The global overlay transport; `None` until `setup_global_overlay` succeeds.
    global_transport: Option<OverlayTransport>,
    /// Service name -> interface name recorded by `setup_service_overlay`.
    service_interfaces: RwLock<HashMap<String, String>>,
    /// Service name -> live transport for services whose overlay was created.
    service_transports: RwLock<HashMap<String, OverlayTransport>>,
    /// Hands out node, service and container addresses.
    ip_allocator: IpAllocator,
    /// This node's overlay address, set by `setup_global_overlay`.
    node_ip: Option<IpAddr>,
    /// Listen port for the global overlay; also passed to `NatTraversal::new`.
    overlay_port: u16,
    /// Cluster-wide CIDR supplied at construction.
    cluster_cidr: Option<IpNetwork>,
    /// This node's slice of the cluster CIDR; `None` when built via `new`.
    slice_cidr: Option<IpNetwork>,
    /// HCN namespace GUID -> (service name, IP) recorded by `attach_container_hcn`
    /// when `autoclean` is set; entries are removed by `detach_container_hcn`.
    #[cfg(target_os = "windows")]
    hcn_cleanup: std::sync::Arc<
        tokio::sync::Mutex<
            std::collections::HashMap<windows::core::GUID, (String, std::net::IpAddr)>,
        >,
    >,
    /// DNS server address stored via `set_dns_config` / `with_dns_config`.
    dns_server_addr: Option<SocketAddr>,
    /// DNS domain stored via `set_dns_config` / `with_dns_config`.
    dns_domain: Option<String>,
    /// Explicit NAT configuration; when `None`, `NatConfig::default()` is used
    /// wherever a configuration is needed.
    nat_config: Option<NatConfig>,
    /// Live NAT traversal state; populated by a successful `start_nat_traversal`.
    nat_traversal: tokio::sync::RwLock<Option<NatTraversal>>,
    /// Unix timestamp (seconds) of the last successful candidate gather or refresh; 0 if none yet.
    nat_last_refresh: AtomicU64,
}
127
128impl OverlayManager {
129 #[allow(clippy::unused_async)]
144 pub async fn new(deployment: String) -> Result<Self, AgentError> {
145 tracing::debug!(
146 deployment = %deployment,
147 "OverlayManager::new uses full /16 default; cluster deployments should use with_slice"
148 );
149 let default_cidr: IpNetwork = "10.200.0.0/16".parse().unwrap();
150 Ok(Self {
151 deployment,
152 global_interface: None,
153 global_transport: None,
154 service_interfaces: RwLock::new(HashMap::new()),
155 service_transports: RwLock::new(HashMap::new()),
156 ip_allocator: IpAllocator::new(default_cidr),
157 node_ip: None,
158 overlay_port: zlayer_core::DEFAULT_WG_PORT,
159 cluster_cidr: Some(default_cidr),
160 slice_cidr: None,
161 #[cfg(target_os = "windows")]
162 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
163 std::collections::HashMap::new(),
164 )),
165 dns_server_addr: None,
166 dns_domain: None,
167 nat_config: None,
168 nat_traversal: tokio::sync::RwLock::new(None),
169 nat_last_refresh: AtomicU64::new(0),
170 })
171 }
172
173 #[must_use]
184 pub fn with_slice(
185 deployment: String,
186 cluster_cidr: IpNetwork,
187 slice_cidr: IpNetwork,
188 port: u16,
189 ) -> Self {
190 Self {
191 deployment,
192 global_interface: None,
193 global_transport: None,
194 service_interfaces: RwLock::new(HashMap::new()),
195 service_transports: RwLock::new(HashMap::new()),
196 ip_allocator: IpAllocator::new(slice_cidr),
197 node_ip: None,
198 overlay_port: port,
199 cluster_cidr: Some(cluster_cidr),
200 slice_cidr: Some(slice_cidr),
201 #[cfg(target_os = "windows")]
202 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
203 std::collections::HashMap::new(),
204 )),
205 dns_server_addr: None,
206 dns_domain: None,
207 nat_config: None,
208 nat_traversal: tokio::sync::RwLock::new(None),
209 nat_last_refresh: AtomicU64::new(0),
210 }
211 }
212
/// Builder-style setter: returns the manager with its overlay listen port replaced by `port`.
#[must_use]
pub fn with_overlay_port(mut self, port: u16) -> Self {
    self.overlay_port = port;
    self
}
219
/// Builder-style setter: returns the manager with an explicit NAT configuration.
#[must_use]
pub fn with_nat_config(mut self, nat: NatConfig) -> Self {
    self.nat_config = Some(nat);
    self
}
231
/// Number of services with a recorded interface name (takes the read lock briefly).
pub async fn service_count(&self) -> usize {
    self.service_interfaces.read().await.len()
}
242
243 #[must_use]
249 pub fn nat_enabled(&self) -> bool {
250 self.nat_config
251 .as_ref()
252 .map_or_else(|| NatConfig::default().enabled, |c| c.enabled)
253 }
254
/// Returns a clone of the explicit NAT configuration, or `None` if none was set.
#[must_use]
pub fn nat_config(&self) -> Option<NatConfig> {
    self.nat_config.clone()
}
263
264 pub async fn start_nat_traversal(&self) -> Result<bool, AgentError> {
285 let config = self.nat_config.clone().unwrap_or_default();
286 if !config.enabled {
287 tracing::debug!("NAT traversal disabled in config; skipping start");
288 return Ok(false);
289 }
290
291 let mut nat = NatTraversal::new(config, self.overlay_port);
292 match nat.gather_candidates().await {
293 Ok(candidates) => {
294 tracing::info!(
295 count = candidates.len(),
296 "Gathered NAT candidates for overlay manager",
297 );
298 let now = std::time::SystemTime::now()
299 .duration_since(std::time::UNIX_EPOCH)
300 .unwrap_or_default()
301 .as_secs();
302 self.nat_last_refresh.store(now, Ordering::SeqCst);
303 *self.nat_traversal.write().await = Some(nat);
304 Ok(true)
305 }
306 Err(e) => {
307 tracing::warn!(error = %e, "NAT candidate gathering failed");
308 Ok(false)
309 }
310 }
311 }
312
313 pub async fn nat_maintenance_tick(&self) -> Result<(), AgentError> {
326 let mut guard = self.nat_traversal.write().await;
327 let Some(nat) = guard.as_mut() else {
328 return Ok(());
329 };
330
331 match nat.refresh().await {
332 Ok(changed) => {
333 if changed {
334 tracing::info!("NAT reflexive address changed during refresh");
335 }
336 let now = std::time::SystemTime::now()
337 .duration_since(std::time::UNIX_EPOCH)
338 .unwrap_or_default()
339 .as_secs();
340 self.nat_last_refresh.store(now, Ordering::SeqCst);
341 Ok(())
342 }
343 Err(e) => Err(AgentError::Network(format!(
344 "NAT maintenance tick failed: {e}"
345 ))),
346 }
347 }
348
349 pub async fn nat_status_snapshot(&self) -> NatStatusSnapshot {
356 let guard = self.nat_traversal.read().await;
357 let Some(nat) = guard.as_ref() else {
358 return NatStatusSnapshot::empty();
359 };
360 let candidates = nat.local_candidates().to_vec();
361 let last_refresh = self.nat_last_refresh.load(Ordering::SeqCst);
362 let peers: Vec<NatPeerSnapshot> = Vec::new();
367 NatStatusSnapshot {
368 candidates,
369 peers,
370 last_refresh,
371 }
372 }
373
/// Replaces both the stored DNS server address and DNS domain (either may be `None`).
pub fn set_dns_config(&mut self, addr: Option<SocketAddr>, domain: Option<String>) {
    self.dns_server_addr = addr;
    self.dns_domain = domain;
}
385
386 #[must_use]
388 pub fn with_dns_config(mut self, addr: Option<SocketAddr>, domain: Option<String>) -> Self {
389 self.dns_server_addr = addr;
390 self.dns_domain = domain;
391 self
392 }
393
/// The stored DNS server address, if any.
#[must_use]
pub fn dns_server_addr(&self) -> Option<SocketAddr> {
    self.dns_server_addr
}
399
/// The stored DNS domain, if any, borrowed from the manager.
#[must_use]
pub fn dns_domain(&self) -> Option<&str> {
    self.dns_domain.as_deref()
}
405
/// Creates and configures the deployment-wide ("global") overlay interface.
///
/// Idempotent: if a global transport already exists, this returns `Ok(())`
/// without touching it. On success the node IP, the interface name reported by
/// the transport, and the transport itself are stored on the manager.
///
/// # Errors
/// Returns `AgentError::Network` if key generation, interface creation or
/// configuration fails, and propagates allocator exhaustion. An address taken
/// from the allocator before a later failure is not returned to it.
pub async fn setup_global_overlay(&mut self) -> Result<(), AgentError> {
    if self.global_transport.is_some() {
        tracing::debug!(
            deployment = %self.deployment,
            "Global overlay already active, reusing existing transport"
        );
        return Ok(());
    }

    // Requested name; the transport may report a different one (read back below).
    let interface_name = make_interface_name(&[&self.deployment], "g");

    let (private_key, public_key) = OverlayTransport::generate_keys()
        .await
        .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;

    let node_ip = self.ip_allocator.allocate()?;
    // The global overlay is configured with a fixed mask of 16 and the manager's listen port.
    let config = self.build_config(private_key, public_key, node_ip, 16, self.overlay_port);
    let mut transport = OverlayTransport::new(config, interface_name.clone());

    transport
        .create_interface()
        .await
        .map_err(|e| AgentError::Network(format!("Failed to create global overlay: {e}")))?;
    // No peers are passed at creation time.
    transport
        .configure(&[])
        .await
        .map_err(|e| AgentError::Network(format!("Failed to configure global overlay: {e}")))?;

    let actual_name = transport.interface_name().to_string();

    // Only publish state once every step has succeeded.
    self.node_ip = Some(node_ip);
    self.global_interface = Some(actual_name);
    self.global_transport = Some(transport);
    Ok(())
}
449
/// Ensures an overlay exists for `service_name` and returns its interface name.
///
/// If a transport for the service already exists, its interface name is
/// returned. Otherwise a new transport is built. If building fails, the error
/// is logged as a warning, the *requested* interface name is still recorded in
/// `service_interfaces`, and that name is returned: the call does not fail.
///
/// # Errors
/// Currently never returns `Err`; transport failures are downgraded as described.
pub async fn setup_service_overlay(&self, service_name: &str) -> Result<String, AgentError> {
    // Held across the build so concurrent callers for the same service serialize
    // on this write lock.
    let mut transports = self.service_transports.write().await;

    if let Some(existing) = transports.get(service_name) {
        let existing_name = existing.interface_name().to_string();
        tracing::debug!(
            service = %service_name,
            interface = %existing_name,
            "Service overlay already active, reusing existing transport"
        );
        drop(transports);
        return Ok(existing_name);
    }

    let interface_name = make_interface_name(&[&self.deployment, service_name], "s");

    match self
        .build_service_transport(&interface_name, service_name)
        .await
    {
        Ok(transport) => {
            let actual_name = transport.interface_name().to_string();
            transports.insert(service_name.to_string(), transport);
            // Release the transports lock before taking the interfaces lock.
            drop(transports);
            tracing::info!(
                service = %service_name,
                interface = %actual_name,
                "Service overlay created"
            );
            self.service_interfaces
                .write()
                .await
                .insert(service_name.to_string(), actual_name.clone());
            Ok(actual_name)
        }
        Err(e) => {
            drop(transports);
            tracing::warn!(
                service = %service_name,
                error = %e,
                "Overlay unavailable, using direct networking"
            );
            // Record the requested name even though no transport backs it.
            self.service_interfaces
                .write()
                .await
                .insert(service_name.to_string(), interface_name.clone());
            Ok(interface_name)
        }
    }
}
514
/// Generates keys, allocates an address, and creates and configures a transport
/// for one service overlay.
///
/// # Errors
/// Returns `AgentError::Network` if key generation, interface creation or
/// configuration fails, and propagates allocator exhaustion.
async fn build_service_transport(
    &self,
    interface_name: &str,
    service_name: &str,
) -> Result<OverlayTransport, AgentError> {
    let (private_key, public_key) = OverlayTransport::generate_keys()
        .await
        .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;

    let service_ip = self.ip_allocator.allocate_for_service(service_name)?;
    // Service overlays use a fixed mask of 24 and listen port 0.
    let config = self.build_config(private_key, public_key, service_ip, 24, 0);
    let mut transport = OverlayTransport::new(config, interface_name.to_string());

    transport
        .create_interface()
        .await
        .map_err(|e| AgentError::Network(format!("Failed to create service overlay: {e}")))?;
    // No peers are passed at creation time.
    transport.configure(&[]).await.map_err(|e| {
        AgentError::Network(format!("Failed to configure service overlay: {e}"))
    })?;

    Ok(transport)
}
541
542 #[cfg_attr(
555 not(target_os = "linux"),
556 allow(clippy::needless_return, clippy::unused_async)
557 )]
558 pub async fn attach_container(
559 &self,
560 container_pid: u32,
561 service_name: &str,
562 join_global: bool,
563 ) -> Result<IpAddr, AgentError> {
564 #[cfg(not(target_os = "linux"))]
567 {
568 let _ = (container_pid, join_global);
570 tracing::debug!(
571 service = %service_name,
572 "Skipping per-container overlay attachment (not supported on this platform). \
573 Containers will use the node's overlay IP via host networking."
574 );
575 return Ok(self.node_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST)));
576 }
577
578 #[cfg(target_os = "linux")]
579 {
580 let interfaces = self.service_interfaces.read().await;
581 let service_iface = interfaces.get(service_name).ok_or_else(|| {
582 AgentError::Network(format!("No overlay for service: {service_name}"))
583 })?;
584
585 let container_ip = self.ip_allocator.allocate()?;
586 self.attach_to_interface(
587 container_pid,
588 service_iface,
589 container_ip,
590 "s",
591 "eth0",
592 true,
593 )
594 .await?;
595
596 if join_global {
597 if let Some(global_iface) = &self.global_interface {
598 let global_ip = self.ip_allocator.allocate()?;
599 self.attach_to_interface(
600 container_pid,
601 global_iface,
602 global_ip,
603 "g",
604 "eth1",
605 false,
606 )
607 .await?;
608 }
609 }
610
611 Ok(container_ip)
612 }
613 }
614
/// Records an HCN (Windows) overlay attachment for `namespace_id` and returns
/// the IP assigned to it.
///
/// Uses `ip_override` when given, otherwise allocates the next address. When
/// `autoclean` is set, the `(service_name, ip)` pair is remembered under the
/// namespace GUID so `detach_container_hcn` can find it. `dns_server` and
/// `dns_domain` are only logged here.
///
/// # Errors
/// Propagates allocator exhaustion when no override is supplied.
#[cfg(target_os = "windows")]
pub async fn attach_container_hcn(
    &self,
    namespace_id: windows::core::GUID,
    service_name: &str,
    ip_override: Option<std::net::IpAddr>,
    autoclean: bool,
    dns_server: Option<std::net::IpAddr>,
    dns_domain: Option<String>,
) -> Result<std::net::IpAddr, AgentError> {
    let ip = if let Some(fixed) = ip_override {
        fixed
    } else {
        self.ip_allocator.allocate()?
    };

    if autoclean {
        self.hcn_cleanup
            .lock()
            .await
            .insert(namespace_id, (service_name.to_string(), ip));
    }

    tracing::info!(
        ns = ?namespace_id,
        service = %service_name,
        ip = %ip,
        dns_server = ?dns_server,
        dns_domain = ?dns_domain,
        "Attached container to HCN overlay",
    );
    Ok(ip)
}
669
/// Forgets the HCN attachment recorded for `namespace_id`, if any, and logs it.
///
/// Unknown namespaces are ignored. The address is not handed back to the
/// allocator.
///
/// # Errors
/// Currently never fails.
#[cfg(target_os = "windows")]
pub async fn detach_container_hcn(
    &self,
    namespace_id: windows::core::GUID,
) -> Result<(), AgentError> {
    let mut cleanup = self.hcn_cleanup.lock().await;
    if let Some((service_name, ip)) = cleanup.remove(&namespace_id) {
        tracing::info!(ns = ?namespace_id, service = %service_name, ip = %ip, "Released HCN overlay attachment");
    }
    Ok(())
}
691
/// Wires one veth pair between the host and the network namespace of
/// `container_pid`.
///
/// Steps, in order:
/// 1. sweep veths whose owning PID no longer exists;
/// 2. open the container's netns and delete leftovers with the names to be used;
/// 3. create the pair `veth-<pid>-<tag>` / `vc-<pid>-<tag>`;
/// 4. move the `vc-` end into the netns, renaming it to `container_iface`;
/// 5. inside the netns: assign `ip`, bring the link and `lo` up, and optionally
///    add a default route via the new link;
/// 6. on the host: bring the `veth-` end up and install a host route for `ip`;
/// 7. best-effort enable IPv4/IPv6 forwarding.
///
/// If any of steps 3–7 fails, both veth names are deleted (best-effort) before
/// the error is returned. `_interface` is currently unused.
///
/// # Errors
/// Returns `AgentError::Network` describing the failing step.
#[cfg(target_os = "linux")]
#[allow(clippy::too_many_lines)]
async fn attach_to_interface(
    &self,
    container_pid: u32,
    _interface: &str,
    ip: IpAddr,
    tag: &str,
    container_iface: &str,
    add_default_route: bool,
) -> Result<(), AgentError> {
    self.sweep_orphan_veths().await;

    let is_v6 = ip.is_ipv6();
    // Prefix assigned inside the container vs. the single-host prefix for the host route.
    let prefix_len: u8 = if is_v6 { 64 } else { 24 };
    let host_prefix: u8 = if is_v6 { 128 } else { 32 };

    let veth_host = format!("veth-{container_pid}-{tag}");
    let veth_pending = format!("vc-{container_pid}-{tag}");
    let veth_container = container_iface.to_string();

    // Open the namespace up front so later steps work from this descriptor.
    let container_ns_fd = std::os::fd::OwnedFd::from(
        std::fs::File::open(format!("/proc/{container_pid}/ns/net")).map_err(|e| {
            AgentError::Network(format!("Failed to open /proc/{container_pid}/ns/net: {e}"))
        })?,
    );

    // Remove stale links left by a previous attempt with the same PID/tag.
    crate::netlink::delete_link_by_name(&veth_host)
        .await
        .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_host}: {e}")))?;
    crate::netlink::delete_link_by_name(&veth_pending)
        .await
        .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_pending}: {e}")))?;

    // Everything that needs rollback on failure lives in this block.
    let result: Result<(), AgentError> = async {
        crate::netlink::create_veth_pair(&veth_host, &veth_pending)
            .await
            .map_err(|e| AgentError::Network(format!("create veth pair: {e}")))?;

        crate::netlink::move_link_into_netns_fd_and_rename(
            &veth_pending,
            AsFd::as_fd(&container_ns_fd),
            &veth_container,
        )
        .map_err(|e| AgentError::Network(format!("move veth into netns: {e}")))?;

        let vc = veth_container.clone();
        // The in-namespace configuration runs on a blocking-pool thread and takes
        // ownership of the namespace descriptor.
        tokio::task::spawn_blocking(move || {
            crate::netlink::with_netns_fd_async(container_ns_fd, move || async move {
                crate::netlink::add_address_to_link_by_name(&vc, ip, prefix_len).await?;
                crate::netlink::set_link_up_by_name(&vc).await?;
                crate::netlink::set_link_up_by_name("lo").await?;
                if add_default_route {
                    crate::netlink::add_default_route_via_dev(&vc, is_v6).await?;
                }
                Ok(())
            })
        })
        .await
        .map_err(|e| AgentError::Network(format!("container netns task panicked: {e}")))?
        .map_err(|e| AgentError::Network(format!("container netns ops: {e}")))?;

        crate::netlink::set_link_up_by_name(&veth_host)
            .await
            .map_err(|e| AgentError::Network(format!("set {veth_host} up: {e}")))?;

        crate::netlink::replace_route_via_dev(ip, host_prefix, &veth_host, self.node_ip)
            .await
            .map_err(|e| {
                AgentError::Network(format!("host route for {ip}/{host_prefix}: {e}"))
            })?;

        // Best-effort: failure to set these sysctls is deliberately ignored.
        let _ = crate::netlink::set_sysctl("net.ipv4.ip_forward", "1");
        let _ = crate::netlink::set_sysctl("net.ipv6.conf.all.forwarding", "1");

        Ok(())
    }
    .await;

    if result.is_err() {
        // Roll back: remove whichever end is still visible under these names.
        let _ = crate::netlink::delete_link_by_name(&veth_host).await;
        let _ = crate::netlink::delete_link_by_name(&veth_pending).await;
    }

    result
}
795
796 #[cfg(target_os = "linux")]
803 async fn sweep_orphan_veths(&self) {
804 let links = match crate::netlink::list_all_links().await {
805 Ok(links) => links,
806 Err(e) => {
807 tracing::warn!(error = %e, "Failed to list links for orphan sweep");
808 return;
809 }
810 };
811
812 for (_index, name) in links {
813 let remainder = if let Some(r) = name.strip_prefix("veth-") {
815 r
816 } else if let Some(r) = name.strip_prefix("vc-") {
817 r
818 } else {
819 continue;
820 };
821
822 let Some(pid_str) = remainder.split('-').next() else {
824 continue;
825 };
826
827 let pid: u32 = match pid_str.parse() {
828 Ok(p) => p,
829 Err(_) => continue,
830 };
831
832 if std::path::Path::new(&format!("/proc/{pid}")).exists() {
834 continue;
835 }
836
837 tracing::info!(link = %name, pid = pid, "Deleting orphan veth");
838 if let Err(e) = crate::netlink::delete_link_by_name(&name).await {
839 tracing::warn!(link = %name, error = %e, "Failed to delete orphan veth");
840 }
841 }
842 }
843
/// Removes a service's overlay: shuts down its transport (if one exists) and
/// forgets its recorded interface name (if any). Unknown services are a no-op.
pub async fn teardown_service_overlay(&self, service_name: &str) {
    if let Some(mut transport) = self.service_transports.write().await.remove(service_name) {
        tracing::info!(service = %service_name, "Shutting down service overlay transport");
        transport.shutdown();
    }

    // A name may be recorded even without a transport (see the fallback path of
    // `setup_service_overlay`), so this is handled independently.
    if let Some(iface) = self.service_interfaces.write().await.remove(service_name) {
        tracing::info!(
            service = %service_name,
            interface = %iface,
            "Removed service overlay interface"
        );
    }
}
865
866 pub async fn cleanup(&mut self) -> Result<(), AgentError> {
871 let mut transports = self.service_transports.write().await;
873 for (name, mut transport) in transports.drain() {
874 tracing::info!(service = %name, "Shutting down service overlay");
875 transport.shutdown();
876 }
877 drop(transports);
878
879 if let Some(mut transport) = self.global_transport.take() {
881 tracing::info!("Shutting down global overlay");
882 transport.shutdown();
883 }
884
885 self.service_interfaces.write().await.clear();
887 self.global_interface = None;
888
889 Ok(())
890 }
891
/// This node's overlay IP; `None` until `setup_global_overlay` has succeeded.
pub fn node_ip(&self) -> Option<IpAddr> {
    self.node_ip
}
898
/// The deployment name this manager was created for.
pub fn deployment(&self) -> &str {
    &self.deployment
}
903
/// Name of the global overlay interface, if one has been set up.
pub fn global_interface(&self) -> Option<&str> {
    self.global_interface.as_deref()
}
908
/// The configured overlay listen port.
pub fn overlay_port(&self) -> u16 {
    self.overlay_port
}
913
/// Whether a global overlay transport is currently held.
pub fn has_global_transport(&self) -> bool {
    self.global_transport.is_some()
}
918
/// Number of services that currently have a live transport. This can be lower
/// than `service_count`, because a name may be recorded without a transport.
pub async fn service_transport_count(&self) -> usize {
    self.service_transports.read().await.len()
}
923
924 pub fn overlay_cidr(&self) -> String {
926 match self.ip_allocator.base {
927 IpAddr::V4(_) => format!("{}/16", self.ip_allocator.base),
928 IpAddr::V6(_) => format!("{}/48", self.ip_allocator.base),
929 }
930 }
931
/// This node's slice of the cluster CIDR; `None` for managers built with `new`.
pub fn slice_cidr(&self) -> Option<IpNetwork> {
    self.slice_cidr
}
937
/// The cluster-wide CIDR recorded at construction.
pub fn cluster_cidr(&self) -> Option<IpNetwork> {
    self.cluster_cidr
}
944
/// Writes the allocator's CIDR and counter to `path` as JSON.
///
/// # Errors
/// Returns `AgentError::Network` on serialization or filesystem failure.
pub async fn persist_ipam_state(&self, path: &Path) -> Result<(), AgentError> {
    self.ip_allocator.save(path).await
}
956
/// Loads the allocator counter previously written by `persist_ipam_state`.
///
/// A missing file, or a file recorded for a different CIDR, leaves the
/// allocator unchanged.
///
/// # Errors
/// Returns `AgentError::Network` when the file cannot be read or parsed.
pub async fn restore_ipam_state(&mut self, path: &Path) -> Result<(), AgentError> {
    self.ip_allocator.restore(path).await
}
969
970 pub fn ip_alloc_stats(&self) -> (u64, IpAddr) {
972 let offset = self
973 .ip_allocator
974 .next_offset
975 .load(std::sync::atomic::Ordering::SeqCst);
976 (offset.saturating_sub(1), self.ip_allocator.base)
977 }
978
979 fn build_config(
980 &self,
981 private_key: String,
982 public_key: String,
983 ip: IpAddr,
984 mask: u8,
985 listen_port: u16,
986 ) -> OverlayConfig {
987 let local_addr = match ip {
989 IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
990 IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
991 };
992 let mut config = OverlayConfig {
993 local_endpoint: SocketAddr::new(local_addr, listen_port),
994 private_key,
995 public_key,
996 overlay_cidr: format!("{ip}/{mask}"),
997 ..OverlayConfig::default()
998 };
999 if let Some(nat) = self.nat_config.clone() {
1000 config.nat = nat;
1001 }
1002 config
1003 }
1004}
1005
/// Sequential address allocator over a single CIDR. Addresses are handed out in
/// increasing order and are never returned to the pool.
struct IpAllocator {
    /// Network address of `cidr`; allocations are `base + offset`.
    base: IpAddr,
    /// The range allocations must stay inside.
    cidr: IpNetwork,
    /// Offset the next allocation will use; starts at 1 so the network address is skipped.
    next_offset: AtomicU64,
}
1027
/// On-disk (JSON) form of an [`IpAllocator`]'s progress.
#[derive(Debug, Serialize, Deserialize)]
struct IpAllocatorState {
    /// Textual CIDR the counter belongs to; checked against the live allocator on restore.
    cidr: String,
    /// Value of the allocator's `next_offset` at save time.
    next_offset: u64,
}
1038
1039impl IpAllocator {
/// Creates an allocator over `cidr` whose first allocation is the network address plus one.
fn new(cidr: IpNetwork) -> Self {
    Self {
        base: cidr.network(),
        cidr,
        next_offset: AtomicU64::new(1),
    }
}
1047
1048 #[allow(clippy::cast_possible_truncation)]
1049 fn compute_addr(&self, offset: u64) -> IpAddr {
1050 match self.base {
1051 IpAddr::V4(base_v4) => {
1052 let base_u32 = u32::from_be_bytes(base_v4.octets());
1053 let addr = base_u32.wrapping_add(offset as u32);
1054 IpAddr::V4(Ipv4Addr::from(addr.to_be_bytes()))
1055 }
1056 IpAddr::V6(base_v6) => {
1057 let base_u128 = u128::from(base_v6);
1058 let addr = base_u128.wrapping_add(u128::from(offset));
1059 IpAddr::V6(Ipv6Addr::from(addr))
1060 }
1061 }
1062 }
1063
1064 fn allocate(&self) -> Result<IpAddr, AgentError> {
1070 let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
1074 let addr = self.compute_addr(offset);
1075
1076 let in_cidr = self.cidr.contains(addr);
1079 let is_v4_broadcast = matches!(
1080 (&self.cidr, &addr),
1081 (IpNetwork::V4(v4), IpAddr::V4(a)) if *a == v4.broadcast()
1082 );
1083 if !in_cidr || is_v4_broadcast {
1084 return Err(AgentError::Network(format!(
1085 "IP allocator exhausted: next address {addr} is outside slice {}",
1086 self.cidr
1087 )));
1088 }
1089 Ok(addr)
1090 }
1091
/// Allocates an address for a service overlay. The service name is currently
/// ignored; this is the same as [`IpAllocator::allocate`].
fn allocate_for_service(&self, _service: &str) -> Result<IpAddr, AgentError> {
    self.allocate()
}
1095
1096 async fn save(&self, path: &Path) -> Result<(), AgentError> {
1098 let state = IpAllocatorState {
1099 cidr: self.cidr.to_string(),
1100 next_offset: self.next_offset.load(Ordering::SeqCst),
1101 };
1102 let json = serde_json::to_vec_pretty(&state)
1103 .map_err(|e| AgentError::Network(format!("serialize ipam state: {e}")))?;
1104 if let Some(parent) = path.parent() {
1105 if !parent.as_os_str().is_empty() {
1106 tokio::fs::create_dir_all(parent).await.map_err(|e| {
1107 AgentError::Network(format!("create ipam state dir {}: {e}", parent.display()))
1108 })?;
1109 }
1110 }
1111 tokio::fs::write(path, json).await.map_err(|e| {
1112 AgentError::Network(format!("write ipam state {}: {e}", path.display()))
1113 })?;
1114 Ok(())
1115 }
1116
1117 async fn restore(&mut self, path: &Path) -> Result<(), AgentError> {
1124 let raw = match tokio::fs::read_to_string(path).await {
1125 Ok(s) => s,
1126 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1127 Err(e) => {
1128 return Err(AgentError::Network(format!(
1129 "read ipam state {}: {e}",
1130 path.display()
1131 )));
1132 }
1133 };
1134 let state: IpAllocatorState = serde_json::from_str(&raw).map_err(|e| {
1135 AgentError::Network(format!("parse ipam state {}: {e}", path.display()))
1136 })?;
1137
1138 if state.cidr != self.cidr.to_string() {
1139 tracing::warn!(
1140 persisted_cidr = %state.cidr,
1141 current_cidr = %self.cidr,
1142 path = %path.display(),
1143 "IPAM state CIDR mismatch; ignoring persisted counter"
1144 );
1145 return Ok(());
1146 }
1147
1148 self.next_offset.store(state.next_offset, Ordering::SeqCst);
1149 Ok(())
1150 }
1151
/// Convenience constructor: builds a fresh allocator for `cidr` and applies any
/// state persisted at `path` (see [`IpAllocator::restore`]).
///
/// # Errors
/// Propagates any error from `restore`.
#[allow(dead_code)]
async fn load(path: &Path, cidr: IpNetwork) -> Result<Self, AgentError> {
    let mut alloc = Self::new(cidr);
    alloc.restore(path).await?;
    Ok(alloc)
}
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167 use super::*;
1168
/// Table-driven check that a range of short, long and suffix-less inputs all
/// produce names within `MAX_IFNAME_LEN`.
#[test]
fn interface_name_never_exceeds_limit() {
    let cases: Vec<(&[&str], &str)> = vec![
        (&["a"], "g"),
        (&["zlayer-manager"], "g"),
        (&["my-very-long-deployment-name-that-goes-on-and-on"], "g"),
        (&["zlayer", "manager"], "s"),
        (&["zlayer-manager", "frontend-service"], "s"),
        (&["a", "b"], "s"),
        (
            &["abcdefghijklmnopqrstuvwxyz", "abcdefghijklmnopqrstuvwxyz"],
            "s",
        ),
        (&["x"], ""),
        (&["deployment"], ""),
        // Suffix longer than one character.
        (&["a-really-long-name-exceeding-everything"], "suffix"),
    ];

    for (parts, suffix) in &cases {
        let name = make_interface_name(parts, suffix);
        assert!(
            name.len() <= MAX_IFNAME_LEN,
            "Name '{}' is {} chars (parts={:?}, suffix='{}')",
            name,
            name.len(),
            parts,
            suffix,
        );
    }
}
1200
/// Very long parts (200 bytes each) must still yield names within the limit,
/// with and without a suffix.
#[test]
fn interface_name_with_extreme_lengths() {
    let long = "a".repeat(200);
    let long_ref = long.as_str();

    let single = [long_ref];
    let triple = [long_ref, long_ref, long_ref];
    let scenarios: [(&[&str], &str); 3] = [(&single, "g"), (&triple, "s"), (&single, "")];

    for (parts, suffix) in &scenarios {
        let name = make_interface_name(parts, suffix);
        assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
    }
}
1216
/// Empty parts, an empty suffix and an empty parts slice must all still give a
/// bounded name that starts with the `zl-` prefix.
#[test]
fn interface_name_with_empty_inputs() {
    let name = make_interface_name(&[""], "");
    assert!(name.len() <= MAX_IFNAME_LEN);
    assert!(name.starts_with("zl-"));

    let name = make_interface_name(&["", ""], "s");
    assert!(name.len() <= MAX_IFNAME_LEN);
    assert!(name.starts_with("zl-"));

    let name = make_interface_name(&[], "g");
    assert!(name.len() <= MAX_IFNAME_LEN);
    assert!(name.starts_with("zl-"));
}
1232
/// Calling the generator twice with the same inputs must give the same name,
/// both for a hashed name and for a readable one.
#[test]
fn interface_name_is_deterministic() {
    let cases: [(&[&str], &str); 2] = [
        (&["zlayer-manager"], "g"),
        (&["deploy", "frontend"], "s"),
    ];

    for (parts, suffix) in &cases {
        let first = make_interface_name(parts, suffix);
        let second = make_interface_name(parts, suffix);
        assert_eq!(first, second);
    }
}
1244
/// Changing the deployment, the service or the suffix must change the name.
#[test]
fn interface_name_uniqueness() {
    let a = make_interface_name(&["deploy-a"], "g");
    let b = make_interface_name(&["deploy-b"], "g");
    assert_ne!(a, b, "Different deployments should yield different names");

    let a = make_interface_name(&["deploy", "svc-a"], "s");
    let b = make_interface_name(&["deploy", "svc-b"], "s");
    assert_ne!(a, b, "Different services should yield different names");

    let a = make_interface_name(&["deploy"], "g");
    let b = make_interface_name(&["deploy"], "s");
    assert_ne!(a, b, "Different suffixes should yield different names");
}
1260
/// Inputs that fit within the limit are emitted verbatim, without hashing.
#[test]
fn interface_name_short_inputs_are_readable() {
    let name = make_interface_name(&["app"], "g");
    assert_eq!(name, "zl-app-g");

    let name = make_interface_name(&["my", "web"], "s");
    assert_eq!(name, "zl-my-web-s");
}
1272
/// Global-overlay names (`"g"` suffix) for plausible deployment names stay
/// bounded and keep the `zl-` prefix.
#[test]
fn global_overlay_realistic_names() {
    let deployments = [
        "zlayer-manager",
        "my-very-long-deployment-name",
        "a",
        "production",
        "zlayer",
    ];

    for deployment in &deployments {
        let name = make_interface_name(&[deployment], "g");
        assert!(
            name.len() <= MAX_IFNAME_LEN,
            "Global overlay '{name}' for deployment '{deployment}' exceeds limit",
        );
        assert!(name.starts_with("zl-"));
    }
}
1293
/// Service-overlay names (`"s"` suffix) for plausible deployment/service pairs
/// stay bounded and keep the `zl-` prefix.
#[test]
fn service_overlay_realistic_names() {
    let cases = [
        ("zlayer-manager", "frontend"),
        ("zlayer-manager", "backend-api"),
        ("zlayer", "manager"),
        ("a", "b"),
        ("production", "auth-service-primary"),
        ("my-long-deploy", "my-long-service"),
    ];

    for (deployment, service) in &cases {
        let name = make_interface_name(&[deployment, service], "s");
        assert!(
            name.len() <= MAX_IFNAME_LEN,
            "Service overlay '{name}' for ({deployment}, {service}) exceeds limit",
        );
        assert!(name.starts_with("zl-"));
    }
}
1315
/// Multi-byte UTF-8 parts must not push the name past the byte limit.
#[test]
fn interface_name_with_unicode() {
    // Three 4-byte code points.
    let name = make_interface_name(&["\u{1F600}\u{1F600}\u{1F600}"], "g");
    assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");

    // 2-byte code points spread over two parts.
    let name = make_interface_name(&["\u{00E9}\u{00E9}\u{00E9}", "\u{00FC}\u{00FC}"], "s");
    assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
}
1325
/// A freshly constructed manager has no node IP.
///
/// NOTE(review): despite the name, only the "before `setup_global_overlay`"
/// state is asserted here; nothing in this test sets the overlay up.
#[tokio::test]
async fn test_node_ip_before_and_after_init() {
    let om = OverlayManager::new("test-deploy".to_string())
        .await
        .unwrap();

    assert!(
        om.node_ip().is_none(),
        "node_ip should be None before setup_global_overlay"
    );
}
1339
/// IPv4 allocations start at network+1 and increase by one each time.
#[test]
fn ip_allocator_v4_sequential() {
    let alloc = IpAllocator::new("10.200.0.0/16".parse().unwrap());
    for last_octet in 1..=3u8 {
        let got = alloc.allocate().unwrap();
        assert_eq!(got, IpAddr::V4(Ipv4Addr::new(10, 200, 0, last_octet)));
    }
}
1351
/// IPv6 allocations start at network+1 and increase by one each time.
#[test]
fn ip_allocator_v6_sequential() {
    let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
    for host in 1..=3 {
        let expected: IpAddr = format!("fd00:200::{host}").parse().unwrap();
        assert_eq!(alloc.allocate().unwrap(), expected);
    }
}
1363
/// `allocate_for_service` draws from the same counter as `allocate`.
#[test]
fn ip_allocator_service_delegates() {
    let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
    let ip1 = alloc.allocate_for_service("web").unwrap();
    let ip2 = alloc.allocate().unwrap();
    assert_eq!(ip1, "fd00:200::1".parse::<IpAddr>().unwrap());
    assert_eq!(ip2, "fd00:200::2".parse::<IpAddr>().unwrap());
}
1373
/// A /28 slice yields exactly 14 addresses (16 minus network and broadcast),
/// all inside the slice; the 15th allocation must fail.
#[test]
fn test_allocator_bounded_to_slice_v4() {
    let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
    let alloc = IpAllocator::new(slice);

    let mut allocated = Vec::new();
    for _ in 0..14 {
        let ip = alloc
            .allocate()
            .expect("first 14 allocations should succeed");
        allocated.push(ip);
    }

    for ip in &allocated {
        assert!(
            slice.contains(*ip),
            "Allocated IP {ip} outside slice {slice}"
        );
    }

    // The next candidate is the broadcast address, which is refused.
    let exhausted = alloc.allocate();
    assert!(
        exhausted.is_err(),
        "allocation past /28 exhaustion should fail, got {exhausted:?}"
    );
}
1404
/// For a slice that does not start on a /24 boundary, every allocation stays
/// inside the slice and is neither the network nor the broadcast address; the
/// allocation after the 14th fails.
#[test]
fn test_allocator_rejects_oob() {
    let slice: IpNetwork = "10.200.42.16/28".parse().unwrap();
    let alloc = IpAllocator::new(slice);

    for _ in 0..14 {
        let ip = alloc.allocate().expect("host allocation should succeed");
        assert!(slice.contains(ip), "Allocation {ip} escaped slice {slice}");
        if let (IpAddr::V4(a), IpNetwork::V4(v4)) = (ip, slice) {
            assert_ne!(a, v4.broadcast(), "handed out broadcast address");
            assert_ne!(a, v4.network(), "handed out network address");
        }
    }

    assert!(alloc.allocate().is_err());
}
1427
/// `with_slice` records the cluster CIDR, slice CIDR, port and deployment name
/// exactly as given.
#[test]
fn test_overlay_manager_with_slice_stores_slice_cidr() {
    let cluster: IpNetwork = "10.200.0.0/16".parse().unwrap();
    let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();

    let om = OverlayManager::with_slice("test-deploy".to_string(), cluster, slice, 51820);

    assert_eq!(om.slice_cidr(), Some(slice));
    assert_eq!(om.cluster_cidr(), Some(cluster));
    assert_eq!(om.overlay_port(), 51820);
    assert_eq!(om.deployment(), "test-deploy");
}
1441
1442 #[tokio::test]
1445 async fn test_allocator_persistence_roundtrip() {
1446 let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
1447 let alloc = IpAllocator::new(slice);
1448
1449 let a1 = alloc.allocate().unwrap();
1450 let a2 = alloc.allocate().unwrap();
1451 let a3 = alloc.allocate().unwrap();
1452 assert_eq!(a1, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1)));
1453 assert_eq!(a2, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 2)));
1454 assert_eq!(a3, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 3)));
1455
1456 let dir = tempfile::tempdir().expect("tempdir");
1457 let state_path = dir.path().join("agent_ipam.json");
1458 alloc.save(&state_path).await.expect("save");
1459
1460 let restored = IpAllocator::load(&state_path, slice).await.expect("load");
1461 let a4 = restored.allocate().unwrap();
1462 assert_eq!(
1463 a4,
1464 IpAddr::V4(Ipv4Addr::new(10, 200, 42, 4)),
1465 "restored allocator should continue from the persisted counter"
1466 );
1467
1468 let missing_path = dir.path().join("does-not-exist.json");
1470 let mut fresh = IpAllocator::new(slice);
1471 fresh.restore(&missing_path).await.expect("restore missing");
1472 let first = fresh.allocate().unwrap();
1473 assert_eq!(first, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1)));
1474 }
1475
#[cfg(target_os = "windows")]
#[tokio::test]
/// Walks the HCN attach/detach cycle and inspects `hcn_cleanup` after each
/// step: an attach with autoclean enabled records one entry, detach removes
/// it, a second detach of the same GUID still succeeds, and an attach with
/// autoclean disabled records nothing.
async fn test_attach_detach_container_hcn_tracks_cleanup_map() {
    let cluster = "10.200.0.0/16".parse::<IpNetwork>().unwrap();
    let slice = "10.200.42.0/28".parse::<IpNetwork>().unwrap();
    let om = OverlayManager::with_slice("test-deploy".to_string(), cluster, slice, 51820);

    let ns = windows::core::GUID::zeroed();
    let fixed_ip: std::net::IpAddr = "10.200.42.5".parse().unwrap();

    // Attach with autoclean enabled: the requested IP comes back and the
    // cleanup map gains exactly one entry keyed by the namespace GUID.
    let attached = om
        .attach_container_hcn(ns, "svc-a", Some(fixed_ip), true, None, None)
        .await
        .expect("attach_container_hcn");
    assert_eq!(attached, fixed_ip);
    {
        // Scope the guard so it is released before the next call on `om`.
        let cleanup = om.hcn_cleanup.lock().await;
        assert_eq!(cleanup.len(), 1);
        let entry = cleanup.get(&ns).expect("entry for zeroed GUID");
        assert_eq!(entry.0, "svc-a");
        assert_eq!(entry.1, fixed_ip);
    }

    om.detach_container_hcn(ns).await.expect("detach");
    assert!(
        om.hcn_cleanup.lock().await.is_empty(),
        "detach should leave the cleanup map empty"
    );

    // Detaching a GUID that is no longer tracked must still succeed.
    om.detach_container_hcn(ns)
        .await
        .expect("unknown GUID is no-op");

    // Attach with autoclean disabled: nothing should be recorded.
    let _unused_ip = om
        .attach_container_hcn(ns, "svc-b", Some(fixed_ip), false, None, None)
        .await
        .expect("attach without autoclean");
    assert!(
        om.hcn_cleanup.lock().await.is_empty(),
        "autoclean=false should not populate map"
    );
}
1525
1526 #[tokio::test]
1529 async fn dns_config_defaults_to_none() {
1530 let om = OverlayManager::new("dns-default".to_string())
1531 .await
1532 .expect("OverlayManager::new");
1533 assert!(om.dns_server_addr().is_none());
1534 assert!(om.dns_domain().is_none());
1535 }
1536
1537 #[tokio::test]
1540 async fn dns_config_set_and_round_trip() {
1541 let mut om = OverlayManager::new("dns-roundtrip".to_string())
1542 .await
1543 .expect("OverlayManager::new");
1544 let addr: SocketAddr = "10.200.42.1:15353".parse().unwrap();
1545 om.set_dns_config(Some(addr), Some("overlay.local".to_string()));
1546 assert_eq!(om.dns_server_addr(), Some(addr));
1547 assert_eq!(om.dns_domain(), Some("overlay.local"));
1548
1549 om.set_dns_config(None, None);
1551 assert!(om.dns_server_addr().is_none());
1552 assert!(om.dns_domain().is_none());
1553 }
1554
#[test]
/// The `with_dns_config` builder applied to a `with_slice` manager keeps the
/// supplied DNS server address and domain readable through the getters.
fn with_dns_config_preserves_values() {
    let cluster = "10.200.0.0/16".parse::<IpNetwork>().unwrap();
    let slice = "10.200.42.0/28".parse::<IpNetwork>().unwrap();
    let addr = "10.200.42.1:15353".parse::<SocketAddr>().unwrap();
    let domain = String::from("overlay.local");

    let base = OverlayManager::with_slice(String::from("dns-builder"), cluster, slice, 51820);
    let configured = base.with_dns_config(Some(addr), Some(domain.clone()));

    assert_eq!(configured.dns_server_addr(), Some(addr));
    assert_eq!(configured.dns_domain(), Some(domain.as_str()));
}
1567}