1use crate::error::AgentError;
2use ipnetwork::IpNetwork;
3use serde::{Deserialize, Serialize};
4use std::collections::hash_map::DefaultHasher;
5use std::collections::HashMap;
6use std::hash::{Hash, Hasher};
7use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
8#[cfg(target_os = "linux")]
9use std::os::fd::AsFd;
10use std::path::Path;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tokio::sync::RwLock;
13use zlayer_overlay::{
14 NatConfig, NatPeerSnapshot, NatStatusSnapshot, NatTraversal, OverlayConfig, OverlayTransport,
15};
16
/// Maximum usable length, in bytes, of a Linux network interface name
/// (`IFNAMSIZ` is 16 including the trailing NUL byte).
const MAX_IFNAME_LEN: usize = 15;

/// Builds a deterministic interface name of the form
/// `zl-<part>-<part>...[-<suffix>]`.
///
/// When that readable form fits in [`MAX_IFNAME_LEN`] bytes it is returned
/// unchanged. Otherwise all `parts` and the `suffix` are hashed and the name
/// becomes `zl-<hex>-<suffix>`. The hex digest is truncated so the whole name
/// fits. If there is no suffix, or the suffix is so long that no hash digits
/// would fit next to it, the result is `zl-<hex>` instead.
///
/// NOTE(review): the digest comes from `DefaultHasher`, whose algorithm the
/// standard library does not promise to keep stable across Rust releases, so
/// hashed names may change after a toolchain upgrade.
#[must_use]
pub fn make_interface_name(parts: &[&str], suffix: &str) -> String {
    const PREFIX: &str = "zl-";

    let readable = match suffix {
        "" => format!("{PREFIX}{}", parts.join("-")),
        s => format!("{PREFIX}{}-{s}", parts.join("-")),
    };
    if readable.len() <= MAX_IFNAME_LEN {
        return readable;
    }

    // Hash every part, then the suffix, so distinct inputs map to distinct digests.
    let digest = {
        let mut hasher = DefaultHasher::new();
        parts.iter().for_each(|p| p.hash(&mut hasher));
        suffix.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    };

    // Digits available when only the "zl-" prefix has to fit.
    let full_budget = MAX_IFNAME_LEN - PREFIX.len();
    // Digits available when "-<suffix>" must also fit; 0 means "drop the suffix".
    let suffixed_budget = if suffix.is_empty() {
        0
    } else {
        MAX_IFNAME_LEN.saturating_sub(PREFIX.len() + 1 + suffix.len())
    };

    if suffixed_budget == 0 {
        let take = full_budget.min(digest.len());
        format!("{PREFIX}{}", &digest[..take])
    } else {
        let take = suffixed_budget.min(digest.len());
        format!("{PREFIX}{}-{suffix}", &digest[..take])
    }
}
63
/// Owns the overlay networking state for one deployment on this node.
///
/// This covers a single global overlay interface, one overlay interface per
/// service, overlay IP allocation, optional NAT traversal, and (on Windows)
/// bookkeeping for HCN attachments.
pub struct OverlayManager {
    /// Deployment name; used to derive overlay interface names.
    deployment: String,
    /// Actual name of the global overlay interface once `setup_global_overlay` has run.
    global_interface: Option<String>,
    /// Transport backing the global overlay, once created.
    global_transport: Option<OverlayTransport>,
    /// Service name -> interface name recorded by `setup_service_overlay`.
    /// Also recorded when overlay creation failed and direct networking is used.
    service_interfaces: RwLock<HashMap<String, String>>,
    /// Service name -> live transport, for service overlays that were created successfully.
    service_transports: RwLock<HashMap<String, OverlayTransport>>,
    /// Hands out overlay IPs for the node, services, and containers.
    ip_allocator: IpAllocator,
    /// This node's IP on the global overlay; `None` until `setup_global_overlay` succeeds.
    node_ip: Option<IpAddr>,
    /// Listen port used for the global overlay transport and for NAT traversal.
    overlay_port: u16,
    /// Cluster-wide overlay CIDR.
    cluster_cidr: Option<IpNetwork>,
    /// Range this node allocates from when built via `with_slice`; `None` for `new`.
    slice_cidr: Option<IpNetwork>,
    /// Windows only: HCN namespace GUID -> (service name, IP) for attachments made
    /// with `autoclean`; entries are removed by `detach_container_hcn`.
    #[cfg(target_os = "windows")]
    hcn_cleanup: std::sync::Arc<
        tokio::sync::Mutex<
            std::collections::HashMap<windows::core::GUID, (String, std::net::IpAddr)>,
        >,
    >,
    /// DNS server address set via `set_dns_config` / `with_dns_config`.
    dns_server_addr: Option<SocketAddr>,
    /// DNS domain set alongside `dns_server_addr`.
    dns_domain: Option<String>,
    /// NAT traversal settings from `with_nat_config`.
    /// Copied into every transport config by `build_config`.
    nat_config: Option<NatConfig>,
    /// Override for the transports' UAPI socket directory, applied in `build_config`.
    uapi_sock_dir: Option<std::path::PathBuf>,
    /// Active NAT traversal state, populated by `start_nat_traversal`.
    nat_traversal: tokio::sync::RwLock<Option<NatTraversal>>,
    /// Unix time (seconds) of the last successful NAT gather/refresh; 0 if never.
    nat_last_refresh: AtomicU64,
}
135
136impl OverlayManager {
137 #[allow(clippy::unused_async)]
152 pub async fn new(deployment: String) -> Result<Self, AgentError> {
153 tracing::debug!(
154 deployment = %deployment,
155 "OverlayManager::new uses full /16 default; cluster deployments should use with_slice"
156 );
157 let default_cidr: IpNetwork = "10.200.0.0/16".parse().unwrap();
158 Ok(Self {
159 deployment,
160 global_interface: None,
161 global_transport: None,
162 service_interfaces: RwLock::new(HashMap::new()),
163 service_transports: RwLock::new(HashMap::new()),
164 ip_allocator: IpAllocator::new(default_cidr),
165 node_ip: None,
166 overlay_port: zlayer_core::DEFAULT_WG_PORT,
167 cluster_cidr: Some(default_cidr),
168 slice_cidr: None,
169 #[cfg(target_os = "windows")]
170 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
171 std::collections::HashMap::new(),
172 )),
173 dns_server_addr: None,
174 dns_domain: None,
175 nat_config: None,
176 uapi_sock_dir: None,
177 nat_traversal: tokio::sync::RwLock::new(None),
178 nat_last_refresh: AtomicU64::new(0),
179 })
180 }
181
182 #[must_use]
193 pub fn with_slice(
194 deployment: String,
195 cluster_cidr: IpNetwork,
196 slice_cidr: IpNetwork,
197 port: u16,
198 ) -> Self {
199 Self {
200 deployment,
201 global_interface: None,
202 global_transport: None,
203 service_interfaces: RwLock::new(HashMap::new()),
204 service_transports: RwLock::new(HashMap::new()),
205 ip_allocator: IpAllocator::new(slice_cidr),
206 node_ip: None,
207 overlay_port: port,
208 cluster_cidr: Some(cluster_cidr),
209 slice_cidr: Some(slice_cidr),
210 #[cfg(target_os = "windows")]
211 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
212 std::collections::HashMap::new(),
213 )),
214 dns_server_addr: None,
215 dns_domain: None,
216 nat_config: None,
217 uapi_sock_dir: None,
218 nat_traversal: tokio::sync::RwLock::new(None),
219 nat_last_refresh: AtomicU64::new(0),
220 }
221 }
222
223 #[must_use]
225 pub fn with_overlay_port(mut self, port: u16) -> Self {
226 self.overlay_port = port;
227 self
228 }
229
230 #[must_use]
237 pub fn with_nat_config(mut self, nat: NatConfig) -> Self {
238 self.nat_config = Some(nat);
239 self
240 }
241
242 #[must_use]
255 pub fn with_uapi_sock_dir(mut self, dir: impl Into<std::path::PathBuf>) -> Self {
256 self.uapi_sock_dir = Some(dir.into());
257 self
258 }
259
260 pub async fn service_count(&self) -> usize {
268 self.service_interfaces.read().await.len()
269 }
270
271 #[must_use]
277 pub fn nat_enabled(&self) -> bool {
278 self.nat_config
279 .as_ref()
280 .map_or_else(|| NatConfig::default().enabled, |c| c.enabled)
281 }
282
283 #[must_use]
288 pub fn nat_config(&self) -> Option<NatConfig> {
289 self.nat_config.clone()
290 }
291
292 pub async fn start_nat_traversal(&self) -> Result<bool, AgentError> {
313 let config = self.nat_config.clone().unwrap_or_default();
314 if !config.enabled {
315 tracing::debug!("NAT traversal disabled in config; skipping start");
316 return Ok(false);
317 }
318
319 let mut nat = NatTraversal::new(config, self.overlay_port);
320 match nat.gather_candidates().await {
321 Ok(candidates) => {
322 tracing::info!(
323 count = candidates.len(),
324 "Gathered NAT candidates for overlay manager",
325 );
326 let now = std::time::SystemTime::now()
327 .duration_since(std::time::UNIX_EPOCH)
328 .unwrap_or_default()
329 .as_secs();
330 self.nat_last_refresh.store(now, Ordering::SeqCst);
331 *self.nat_traversal.write().await = Some(nat);
332 Ok(true)
333 }
334 Err(e) => {
335 tracing::warn!(error = %e, "NAT candidate gathering failed");
336 Ok(false)
337 }
338 }
339 }
340
341 pub async fn nat_maintenance_tick(&self) -> Result<(), AgentError> {
354 let mut guard = self.nat_traversal.write().await;
355 let Some(nat) = guard.as_mut() else {
356 return Ok(());
357 };
358
359 match nat.refresh().await {
360 Ok(changed) => {
361 if changed {
362 tracing::info!("NAT reflexive address changed during refresh");
363 }
364 let now = std::time::SystemTime::now()
365 .duration_since(std::time::UNIX_EPOCH)
366 .unwrap_or_default()
367 .as_secs();
368 self.nat_last_refresh.store(now, Ordering::SeqCst);
369 Ok(())
370 }
371 Err(e) => Err(AgentError::Network(format!(
372 "NAT maintenance tick failed: {e}"
373 ))),
374 }
375 }
376
377 pub async fn nat_status_snapshot(&self) -> NatStatusSnapshot {
384 let guard = self.nat_traversal.read().await;
385 let Some(nat) = guard.as_ref() else {
386 return NatStatusSnapshot::empty();
387 };
388 let candidates = nat.local_candidates().to_vec();
389 let last_refresh = self.nat_last_refresh.load(Ordering::SeqCst);
390 let peers: Vec<NatPeerSnapshot> = Vec::new();
395 NatStatusSnapshot {
396 candidates,
397 peers,
398 last_refresh,
399 }
400 }
401
402 pub fn set_dns_config(&mut self, addr: Option<SocketAddr>, domain: Option<String>) {
410 self.dns_server_addr = addr;
411 self.dns_domain = domain;
412 }
413
414 #[must_use]
416 pub fn with_dns_config(mut self, addr: Option<SocketAddr>, domain: Option<String>) -> Self {
417 self.dns_server_addr = addr;
418 self.dns_domain = domain;
419 self
420 }
421
422 #[must_use]
424 pub fn dns_server_addr(&self) -> Option<SocketAddr> {
425 self.dns_server_addr
426 }
427
428 #[must_use]
430 pub fn dns_domain(&self) -> Option<&str> {
431 self.dns_domain.as_deref()
432 }
433
434 pub async fn setup_global_overlay(&mut self) -> Result<(), AgentError> {
439 if self.global_transport.is_some() {
443 tracing::debug!(
444 deployment = %self.deployment,
445 "Global overlay already active, reusing existing transport"
446 );
447 return Ok(());
448 }
449
450 let interface_name = make_interface_name(&[&self.deployment], "g");
451
452 let (private_key, public_key) = OverlayTransport::generate_keys()
453 .await
454 .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;
455
456 let node_ip = self.ip_allocator.allocate()?;
457 let config = self.build_config(private_key, public_key, node_ip, 16, self.overlay_port);
458 let mut transport = OverlayTransport::new(config, interface_name.clone());
459
460 transport
461 .create_interface()
462 .await
463 .map_err(|e| AgentError::Network(format!("Failed to create global overlay: {e}")))?;
464 transport
465 .configure(&[])
466 .await
467 .map_err(|e| AgentError::Network(format!("Failed to configure global overlay: {e}")))?;
468
469 let actual_name = transport.interface_name().to_string();
471
472 self.node_ip = Some(node_ip);
473 self.global_interface = Some(actual_name);
474 self.global_transport = Some(transport);
475 Ok(())
476 }
477
478 pub async fn setup_service_overlay(&self, service_name: &str) -> Result<String, AgentError> {
483 let mut transports = self.service_transports.write().await;
489
490 if let Some(existing) = transports.get(service_name) {
491 let existing_name = existing.interface_name().to_string();
492 tracing::debug!(
493 service = %service_name,
494 interface = %existing_name,
495 "Service overlay already active, reusing existing transport"
496 );
497 drop(transports);
498 return Ok(existing_name);
499 }
500
501 let interface_name = make_interface_name(&[&self.deployment, service_name], "s");
502
503 match self
506 .build_service_transport(&interface_name, service_name)
507 .await
508 {
509 Ok(transport) => {
510 let actual_name = transport.interface_name().to_string();
511 transports.insert(service_name.to_string(), transport);
512 drop(transports);
513 tracing::info!(
514 service = %service_name,
515 interface = %actual_name,
516 "Service overlay created"
517 );
518 self.service_interfaces
520 .write()
521 .await
522 .insert(service_name.to_string(), actual_name.clone());
523 Ok(actual_name)
524 }
525 Err(e) => {
526 drop(transports);
527 tracing::warn!(
528 service = %service_name,
529 error = %e,
530 "Overlay unavailable, using direct networking"
531 );
532 self.service_interfaces
535 .write()
536 .await
537 .insert(service_name.to_string(), interface_name.clone());
538 Ok(interface_name)
539 }
540 }
541 }
542
543 async fn build_service_transport(
547 &self,
548 interface_name: &str,
549 service_name: &str,
550 ) -> Result<OverlayTransport, AgentError> {
551 let (private_key, public_key) = OverlayTransport::generate_keys()
552 .await
553 .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;
554
555 let service_ip = self.ip_allocator.allocate_for_service(service_name)?;
556 let config = self.build_config(private_key, public_key, service_ip, 24, 0);
557 let mut transport = OverlayTransport::new(config, interface_name.to_string());
558
559 transport
560 .create_interface()
561 .await
562 .map_err(|e| AgentError::Network(format!("Failed to create service overlay: {e}")))?;
563 transport.configure(&[]).await.map_err(|e| {
564 AgentError::Network(format!("Failed to configure service overlay: {e}"))
565 })?;
566
567 Ok(transport)
568 }
569
570 #[cfg_attr(
583 not(target_os = "linux"),
584 allow(clippy::needless_return, clippy::unused_async)
585 )]
586 pub async fn attach_container(
587 &self,
588 container_pid: u32,
589 service_name: &str,
590 join_global: bool,
591 ) -> Result<IpAddr, AgentError> {
592 #[cfg(not(target_os = "linux"))]
595 {
596 let _ = (container_pid, join_global);
598 tracing::debug!(
599 service = %service_name,
600 "Skipping per-container overlay attachment (not supported on this platform). \
601 Containers will use the node's overlay IP via host networking."
602 );
603 return Ok(self.node_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST)));
604 }
605
606 #[cfg(target_os = "linux")]
607 {
608 let interfaces = self.service_interfaces.read().await;
609 let service_iface = interfaces.get(service_name).ok_or_else(|| {
610 AgentError::Network(format!("No overlay for service: {service_name}"))
611 })?;
612
613 let container_ip = self.ip_allocator.allocate()?;
614 self.attach_to_interface(
615 container_pid,
616 service_iface,
617 container_ip,
618 "s",
619 "eth0",
620 true,
621 )
622 .await?;
623
624 if join_global {
625 if let Some(global_iface) = &self.global_interface {
626 let global_ip = self.ip_allocator.allocate()?;
627 self.attach_to_interface(
628 container_pid,
629 global_iface,
630 global_ip,
631 "g",
632 "eth1",
633 false,
634 )
635 .await?;
636 }
637 }
638
639 Ok(container_ip)
640 }
641 }
642
643 #[cfg(target_os = "windows")]
644 pub async fn attach_container_hcn(
671 &self,
672 namespace_id: windows::core::GUID,
673 service_name: &str,
674 ip_override: Option<std::net::IpAddr>,
675 autoclean: bool,
676 dns_server: Option<std::net::IpAddr>,
677 dns_domain: Option<String>,
678 ) -> Result<std::net::IpAddr, AgentError> {
679 let ip = match ip_override {
680 Some(ip) => ip,
681 None => self.ip_allocator.allocate()?,
682 };
683 if autoclean {
684 let mut cleanup = self.hcn_cleanup.lock().await;
685 cleanup.insert(namespace_id, (service_name.to_string(), ip));
686 }
687 tracing::info!(
688 ns = ?namespace_id,
689 service = %service_name,
690 ip = %ip,
691 dns_server = ?dns_server,
692 dns_domain = ?dns_domain,
693 "Attached container to HCN overlay",
694 );
695 Ok(ip)
696 }
697
698 #[cfg(target_os = "windows")]
699 pub async fn detach_container_hcn(
710 &self,
711 namespace_id: windows::core::GUID,
712 ) -> Result<(), AgentError> {
713 let mut cleanup = self.hcn_cleanup.lock().await;
714 if let Some((service_name, ip)) = cleanup.remove(&namespace_id) {
715 tracing::info!(ns = ?namespace_id, service = %service_name, ip = %ip, "Released HCN overlay attachment");
716 }
717 Ok(())
718 }
719
    /// Linux: wires a container into the overlay with a veth pair.
    ///
    /// One end of the pair is moved into the container's network namespace
    /// and the container's `ip` is routed to the host end.
    ///
    /// Steps:
    /// 1. Sweep veths left behind by containers whose PID no longer exists.
    /// 2. Delete any stale `veth-<pid>-<tag>` / `vc-<pid>-<tag>` links.
    /// 3. Create the pair, move `vc-<pid>-<tag>` into the container netns, and
    ///    rename it to `container_iface`.
    /// 4. Inside the netns: add `ip` (/24 for IPv4, /64 for IPv6), bring the
    ///    interface and `lo` up, and optionally add a default route via it.
    /// 5. On the host: bring `veth-<pid>-<tag>` up and install a /32 (or /128)
    ///    route for `ip` via it.
    /// 6. Best-effort enable IPv4/IPv6 forwarding.
    ///
    /// If any step from 3 onward fails, both veth names are deleted
    /// (best-effort) before the error is returned.
    ///
    /// `_interface` (the overlay interface name) is currently unused.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::Network` if the netns file cannot be opened or any
    /// netlink operation fails.
    #[cfg(target_os = "linux")]
    #[allow(clippy::too_many_lines)]
    async fn attach_to_interface(
        &self,
        container_pid: u32,
        _interface: &str,
        ip: IpAddr,
        tag: &str,
        container_iface: &str,
        add_default_route: bool,
    ) -> Result<(), AgentError> {
        // Opportunistic cleanup of veths from dead containers before adding new ones.
        self.sweep_orphan_veths().await;

        let is_v6 = ip.is_ipv6();
        // Prefix for the address inside the container vs. the host-side route.
        let prefix_len: u8 = if is_v6 { 64 } else { 24 };
        let host_prefix: u8 = if is_v6 { 128 } else { 32 };

        // Host end keeps its name; the peer is created as `vc-...` in the host
        // namespace and renamed to `container_iface` once inside the container.
        let veth_host = format!("veth-{container_pid}-{tag}");
        let veth_pending = format!("vc-{container_pid}-{tag}");
        let veth_container = container_iface.to_string();

        // Handle to the container's network namespace; `OwnedFd` closes it on drop.
        let container_ns_fd = std::os::fd::OwnedFd::from(
            std::fs::File::open(format!("/proc/{container_pid}/ns/net")).map_err(|e| {
                AgentError::Network(format!("Failed to open /proc/{container_pid}/ns/net: {e}"))
            })?,
        );

        // Remove leftovers from a previous attempt with the same PID/tag.
        crate::netlink::delete_link_by_name(&veth_host)
            .await
            .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_host}: {e}")))?;
        crate::netlink::delete_link_by_name(&veth_pending)
            .await
            .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_pending}: {e}")))?;

        // All fallible setup runs inside this block so a single error path can
        // roll back the links below.
        let result: Result<(), AgentError> = async {
            crate::netlink::create_veth_pair(&veth_host, &veth_pending)
                .await
                .map_err(|e| AgentError::Network(format!("create veth pair: {e}")))?;

            crate::netlink::move_link_into_netns_fd_and_rename(
                &veth_pending,
                AsFd::as_fd(&container_ns_fd),
                &veth_container,
            )
            .map_err(|e| AgentError::Network(format!("move veth into netns: {e}")))?;

            // In-namespace configuration runs on a blocking thread. Presumably
            // this is because entering a netns affects the calling OS thread;
            // confirm in `crate::netlink::with_netns_fd_async`. The fd is moved
            // into the task.
            let vc = veth_container.clone();
            tokio::task::spawn_blocking(move || {
                crate::netlink::with_netns_fd_async(container_ns_fd, move || async move {
                    crate::netlink::add_address_to_link_by_name(&vc, ip, prefix_len).await?;
                    crate::netlink::set_link_up_by_name(&vc).await?;
                    crate::netlink::set_link_up_by_name("lo").await?;
                    if add_default_route {
                        crate::netlink::add_default_route_via_dev(&vc, is_v6).await?;
                    }
                    Ok(())
                })
            })
            .await
            // Outer error: the blocking task failed to join (e.g. panicked).
            .map_err(|e| AgentError::Network(format!("container netns task panicked: {e}")))?
            // Inner error: a netlink operation inside the namespace failed.
            .map_err(|e| AgentError::Network(format!("container netns ops: {e}")))?;

            crate::netlink::set_link_up_by_name(&veth_host)
                .await
                .map_err(|e| AgentError::Network(format!("set {veth_host} up: {e}")))?;

            // Host route for the container IP via the host veth end. `self.node_ip`
            // is passed through (presumably as the preferred source address;
            // confirm in `crate::netlink::replace_route_via_dev`).
            crate::netlink::replace_route_via_dev(ip, host_prefix, &veth_host, self.node_ip)
                .await
                .map_err(|e| {
                    AgentError::Network(format!("host route for {ip}/{host_prefix}: {e}"))
                })?;

            // Best-effort: failures to enable forwarding are deliberately ignored.
            let _ = crate::netlink::set_sysctl("net.ipv4.ip_forward", "1");
            let _ = crate::netlink::set_sysctl("net.ipv6.conf.all.forwarding", "1");

            Ok(())
        }
        .await;

        // Roll back on failure; deletion errors are ignored because the original
        // error is what the caller needs.
        if result.is_err() {
            let _ = crate::netlink::delete_link_by_name(&veth_host).await;
            let _ = crate::netlink::delete_link_by_name(&veth_pending).await;
        }

        result
    }
823
824 #[cfg(target_os = "linux")]
831 async fn sweep_orphan_veths(&self) {
832 let links = match crate::netlink::list_all_links().await {
833 Ok(links) => links,
834 Err(e) => {
835 tracing::warn!(error = %e, "Failed to list links for orphan sweep");
836 return;
837 }
838 };
839
840 for (_index, name) in links {
841 let remainder = if let Some(r) = name.strip_prefix("veth-") {
843 r
844 } else if let Some(r) = name.strip_prefix("vc-") {
845 r
846 } else {
847 continue;
848 };
849
850 let Some(pid_str) = remainder.split('-').next() else {
852 continue;
853 };
854
855 let pid: u32 = match pid_str.parse() {
856 Ok(p) => p,
857 Err(_) => continue,
858 };
859
860 if std::path::Path::new(&format!("/proc/{pid}")).exists() {
862 continue;
863 }
864
865 tracing::info!(link = %name, pid = pid, "Deleting orphan veth");
866 if let Err(e) = crate::netlink::delete_link_by_name(&name).await {
867 tracing::warn!(link = %name, error = %e, "Failed to delete orphan veth");
868 }
869 }
870 }
871
872 pub async fn teardown_service_overlay(&self, service_name: &str) {
878 if let Some(mut transport) = self.service_transports.write().await.remove(service_name) {
880 tracing::info!(service = %service_name, "Shutting down service overlay transport");
881 transport.shutdown();
882 }
883
884 if let Some(iface) = self.service_interfaces.write().await.remove(service_name) {
886 tracing::info!(
887 service = %service_name,
888 interface = %iface,
889 "Removed service overlay interface"
890 );
891 }
892 }
893
894 pub async fn cleanup(&mut self) -> Result<(), AgentError> {
899 let mut transports = self.service_transports.write().await;
901 for (name, mut transport) in transports.drain() {
902 tracing::info!(service = %name, "Shutting down service overlay");
903 transport.shutdown();
904 }
905 drop(transports);
906
907 if let Some(mut transport) = self.global_transport.take() {
909 tracing::info!("Shutting down global overlay");
910 transport.shutdown();
911 }
912
913 self.service_interfaces.write().await.clear();
915 self.global_interface = None;
916
917 Ok(())
918 }
919
920 pub fn node_ip(&self) -> Option<IpAddr> {
924 self.node_ip
925 }
926
927 pub fn deployment(&self) -> &str {
929 &self.deployment
930 }
931
932 pub fn global_interface(&self) -> Option<&str> {
934 self.global_interface.as_deref()
935 }
936
937 pub fn overlay_port(&self) -> u16 {
939 self.overlay_port
940 }
941
942 pub fn has_global_transport(&self) -> bool {
944 self.global_transport.is_some()
945 }
946
947 pub async fn service_transport_count(&self) -> usize {
949 self.service_transports.read().await.len()
950 }
951
952 pub fn overlay_cidr(&self) -> String {
954 match self.ip_allocator.base {
955 IpAddr::V4(_) => format!("{}/16", self.ip_allocator.base),
956 IpAddr::V6(_) => format!("{}/48", self.ip_allocator.base),
957 }
958 }
959
960 pub fn slice_cidr(&self) -> Option<IpNetwork> {
963 self.slice_cidr
964 }
965
966 pub fn cluster_cidr(&self) -> Option<IpNetwork> {
970 self.cluster_cidr
971 }
972
973 pub async fn persist_ipam_state(&self, path: &Path) -> Result<(), AgentError> {
982 self.ip_allocator.save(path).await
983 }
984
985 pub async fn restore_ipam_state(&mut self, path: &Path) -> Result<(), AgentError> {
995 self.ip_allocator.restore(path).await
996 }
997
998 pub fn ip_alloc_stats(&self) -> (u64, IpAddr) {
1000 let offset = self
1001 .ip_allocator
1002 .next_offset
1003 .load(std::sync::atomic::Ordering::SeqCst);
1004 (offset.saturating_sub(1), self.ip_allocator.base)
1005 }
1006
1007 fn build_config(
1008 &self,
1009 private_key: String,
1010 public_key: String,
1011 ip: IpAddr,
1012 mask: u8,
1013 listen_port: u16,
1014 ) -> OverlayConfig {
1015 let local_addr = match ip {
1017 IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
1018 IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
1019 };
1020 let mut config = OverlayConfig {
1021 local_endpoint: SocketAddr::new(local_addr, listen_port),
1022 private_key,
1023 public_key,
1024 overlay_cidr: format!("{ip}/{mask}"),
1025 ..OverlayConfig::default()
1026 };
1027 if let Some(nat) = self.nat_config.clone() {
1028 config.nat = nat;
1029 }
1030 if let Some(dir) = self.uapi_sock_dir.clone() {
1031 config.uapi_sock_dir = dir;
1032 }
1033 config
1034 }
1035}
1036
/// Sequential overlay IP allocator over a single CIDR.
///
/// Addresses are handed out as `base + offset`. The offset starts at 1 (set
/// in `IpAllocator::new`) and only ever increases. There is no release
/// operation, so addresses are never returned to the pool.
struct IpAllocator {
    /// Network address of `cidr`; offsets are added to this.
    base: IpAddr,
    /// Range every allocation must stay within.
    cidr: IpNetwork,
    /// Offset of the next address to hand out.
    next_offset: AtomicU64,
}

/// On-disk (JSON) form of an `IpAllocator`'s state.
#[derive(Debug, Serialize, Deserialize)]
struct IpAllocatorState {
    /// CIDR the counter belongs to, as produced by `IpNetwork`'s `to_string`.
    cidr: String,
    /// Persisted value of the allocator's `next_offset`.
    next_offset: u64,
}
1069
1070impl IpAllocator {
1071 fn new(cidr: IpNetwork) -> Self {
1072 Self {
1073 base: cidr.network(),
1074 cidr,
1075 next_offset: AtomicU64::new(1),
1076 }
1077 }
1078
1079 #[allow(clippy::cast_possible_truncation)]
1080 fn compute_addr(&self, offset: u64) -> IpAddr {
1081 match self.base {
1082 IpAddr::V4(base_v4) => {
1083 let base_u32 = u32::from_be_bytes(base_v4.octets());
1084 let addr = base_u32.wrapping_add(offset as u32);
1085 IpAddr::V4(Ipv4Addr::from(addr.to_be_bytes()))
1086 }
1087 IpAddr::V6(base_v6) => {
1088 let base_u128 = u128::from(base_v6);
1089 let addr = base_u128.wrapping_add(u128::from(offset));
1090 IpAddr::V6(Ipv6Addr::from(addr))
1091 }
1092 }
1093 }
1094
1095 fn allocate(&self) -> Result<IpAddr, AgentError> {
1101 let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
1105 let addr = self.compute_addr(offset);
1106
1107 let in_cidr = self.cidr.contains(addr);
1110 let is_v4_broadcast = matches!(
1111 (&self.cidr, &addr),
1112 (IpNetwork::V4(v4), IpAddr::V4(a)) if *a == v4.broadcast()
1113 );
1114 if !in_cidr || is_v4_broadcast {
1115 return Err(AgentError::Network(format!(
1116 "IP allocator exhausted: next address {addr} is outside slice {}",
1117 self.cidr
1118 )));
1119 }
1120 Ok(addr)
1121 }
1122
1123 fn allocate_for_service(&self, _service: &str) -> Result<IpAddr, AgentError> {
1124 self.allocate()
1125 }
1126
1127 async fn save(&self, path: &Path) -> Result<(), AgentError> {
1129 let state = IpAllocatorState {
1130 cidr: self.cidr.to_string(),
1131 next_offset: self.next_offset.load(Ordering::SeqCst),
1132 };
1133 let json = serde_json::to_vec_pretty(&state)
1134 .map_err(|e| AgentError::Network(format!("serialize ipam state: {e}")))?;
1135 if let Some(parent) = path.parent() {
1136 if !parent.as_os_str().is_empty() {
1137 tokio::fs::create_dir_all(parent).await.map_err(|e| {
1138 AgentError::Network(format!("create ipam state dir {}: {e}", parent.display()))
1139 })?;
1140 }
1141 }
1142 tokio::fs::write(path, json).await.map_err(|e| {
1143 AgentError::Network(format!("write ipam state {}: {e}", path.display()))
1144 })?;
1145 Ok(())
1146 }
1147
1148 async fn restore(&mut self, path: &Path) -> Result<(), AgentError> {
1155 let raw = match tokio::fs::read_to_string(path).await {
1156 Ok(s) => s,
1157 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1158 Err(e) => {
1159 return Err(AgentError::Network(format!(
1160 "read ipam state {}: {e}",
1161 path.display()
1162 )));
1163 }
1164 };
1165 let state: IpAllocatorState = serde_json::from_str(&raw).map_err(|e| {
1166 AgentError::Network(format!("parse ipam state {}: {e}", path.display()))
1167 })?;
1168
1169 if state.cidr != self.cidr.to_string() {
1170 tracing::warn!(
1171 persisted_cidr = %state.cidr,
1172 current_cidr = %self.cidr,
1173 path = %path.display(),
1174 "IPAM state CIDR mismatch; ignoring persisted counter"
1175 );
1176 return Ok(());
1177 }
1178
1179 self.next_offset.store(state.next_offset, Ordering::SeqCst);
1180 Ok(())
1181 }
1182
1183 #[allow(dead_code)]
1189 async fn load(path: &Path, cidr: IpNetwork) -> Result<Self, AgentError> {
1190 let mut alloc = Self::new(cidr);
1191 alloc.restore(path).await?;
1192 Ok(alloc)
1193 }
1194}
1195
1196#[cfg(test)]
1197mod tests {
1198 use super::*;
1199
1200 #[test]
1202 fn interface_name_never_exceeds_limit() {
1203 let cases: Vec<(&[&str], &str)> = vec![
1204 (&["a"], "g"),
1205 (&["zlayer-manager"], "g"),
1206 (&["my-very-long-deployment-name-that-goes-on-and-on"], "g"),
1207 (&["zlayer", "manager"], "s"),
1208 (&["zlayer-manager", "frontend-service"], "s"),
1209 (&["a", "b"], "s"),
1210 (
1211 &["abcdefghijklmnopqrstuvwxyz", "abcdefghijklmnopqrstuvwxyz"],
1212 "s",
1213 ),
1214 (&["x"], ""),
1215 (&["deployment"], ""),
1216 (&["a-really-long-name-exceeding-everything"], "suffix"),
1217 ];
1218
1219 for (parts, suffix) in &cases {
1220 let name = make_interface_name(parts, suffix);
1221 assert!(
1222 name.len() <= MAX_IFNAME_LEN,
1223 "Name '{}' is {} chars (parts={:?}, suffix='{}')",
1224 name,
1225 name.len(),
1226 parts,
1227 suffix,
1228 );
1229 }
1230 }
1231
1232 #[test]
1234 fn interface_name_with_extreme_lengths() {
1235 let long = "a".repeat(200);
1236 let long_ref = long.as_str();
1237
1238 let name = make_interface_name(&[long_ref], "g");
1239 assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
1240
1241 let name = make_interface_name(&[long_ref, long_ref, long_ref], "s");
1242 assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
1243
1244 let name = make_interface_name(&[long_ref], "");
1245 assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
1246 }
1247
1248 #[test]
1250 fn interface_name_with_empty_inputs() {
1251 let name = make_interface_name(&[""], "");
1252 assert!(name.len() <= MAX_IFNAME_LEN);
1253 assert!(name.starts_with("zl-"));
1254
1255 let name = make_interface_name(&["", ""], "s");
1256 assert!(name.len() <= MAX_IFNAME_LEN);
1257 assert!(name.starts_with("zl-"));
1258
1259 let name = make_interface_name(&[], "g");
1260 assert!(name.len() <= MAX_IFNAME_LEN);
1261 assert!(name.starts_with("zl-"));
1262 }
1263
1264 #[test]
1266 fn interface_name_is_deterministic() {
1267 let a = make_interface_name(&["zlayer-manager"], "g");
1268 let b = make_interface_name(&["zlayer-manager"], "g");
1269 assert_eq!(a, b);
1270
1271 let a = make_interface_name(&["deploy", "frontend"], "s");
1272 let b = make_interface_name(&["deploy", "frontend"], "s");
1273 assert_eq!(a, b);
1274 }
1275
1276 #[test]
1278 fn interface_name_uniqueness() {
1279 let a = make_interface_name(&["deploy-a"], "g");
1280 let b = make_interface_name(&["deploy-b"], "g");
1281 assert_ne!(a, b, "Different deployments should yield different names");
1282
1283 let a = make_interface_name(&["deploy", "svc-a"], "s");
1284 let b = make_interface_name(&["deploy", "svc-b"], "s");
1285 assert_ne!(a, b, "Different services should yield different names");
1286
1287 let a = make_interface_name(&["deploy"], "g");
1288 let b = make_interface_name(&["deploy"], "s");
1289 assert_ne!(a, b, "Different suffixes should yield different names");
1290 }
1291
1292 #[test]
1294 fn interface_name_short_inputs_are_readable() {
1295 let name = make_interface_name(&["app"], "g");
1297 assert_eq!(name, "zl-app-g");
1298
1299 let name = make_interface_name(&["my", "web"], "s");
1301 assert_eq!(name, "zl-my-web-s");
1302 }
1303
1304 #[test]
1306 fn global_overlay_realistic_names() {
1307 let deployments = [
1308 "zlayer-manager",
1309 "my-very-long-deployment-name",
1310 "a",
1311 "production",
1312 "zlayer",
1313 ];
1314
1315 for deployment in &deployments {
1316 let name = make_interface_name(&[deployment], "g");
1317 assert!(
1318 name.len() <= MAX_IFNAME_LEN,
1319 "Global overlay '{name}' for deployment '{deployment}' exceeds limit",
1320 );
1321 assert!(name.starts_with("zl-"));
1322 }
1323 }
1324
1325 #[test]
1327 fn service_overlay_realistic_names() {
1328 let cases = [
1329 ("zlayer-manager", "frontend"),
1330 ("zlayer-manager", "backend-api"),
1331 ("zlayer", "manager"),
1332 ("a", "b"),
1333 ("production", "auth-service-primary"),
1334 ("my-long-deploy", "my-long-service"),
1335 ];
1336
1337 for (deployment, service) in &cases {
1338 let name = make_interface_name(&[deployment, service], "s");
1339 assert!(
1340 name.len() <= MAX_IFNAME_LEN,
1341 "Service overlay '{name}' for ({deployment}, {service}) exceeds limit",
1342 );
1343 assert!(name.starts_with("zl-"));
1344 }
1345 }
1346
1347 #[test]
1349 fn interface_name_with_unicode() {
1350 let name = make_interface_name(&["\u{1F600}\u{1F600}\u{1F600}"], "g");
1351 assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
1352
1353 let name = make_interface_name(&["\u{00E9}\u{00E9}\u{00E9}", "\u{00FC}\u{00FC}"], "s");
1354 assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
1355 }
1356
1357 #[tokio::test]
1359 async fn test_node_ip_before_and_after_init() {
1360 let om = OverlayManager::new("test-deploy".to_string())
1361 .await
1362 .unwrap();
1363
1364 assert!(
1366 om.node_ip().is_none(),
1367 "node_ip should be None before setup_global_overlay"
1368 );
1369 }
1370
1371 #[test]
1373 fn ip_allocator_v4_sequential() {
1374 let alloc = IpAllocator::new("10.200.0.0/16".parse().unwrap());
1375 let ip1 = alloc.allocate().unwrap();
1376 let ip2 = alloc.allocate().unwrap();
1377 let ip3 = alloc.allocate().unwrap();
1378 assert_eq!(ip1, IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1)));
1379 assert_eq!(ip2, IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2)));
1380 assert_eq!(ip3, IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3)));
1381 }
1382
1383 #[test]
1385 fn ip_allocator_v6_sequential() {
1386 let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
1387 let ip1 = alloc.allocate().unwrap();
1388 let ip2 = alloc.allocate().unwrap();
1389 let ip3 = alloc.allocate().unwrap();
1390 assert_eq!(ip1, "fd00:200::1".parse::<IpAddr>().unwrap());
1391 assert_eq!(ip2, "fd00:200::2".parse::<IpAddr>().unwrap());
1392 assert_eq!(ip3, "fd00:200::3".parse::<IpAddr>().unwrap());
1393 }
1394
1395 #[test]
1397 fn ip_allocator_service_delegates() {
1398 let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
1399 let ip1 = alloc.allocate_for_service("web").unwrap();
1400 let ip2 = alloc.allocate().unwrap();
1401 assert_eq!(ip1, "fd00:200::1".parse::<IpAddr>().unwrap());
1402 assert_eq!(ip2, "fd00:200::2".parse::<IpAddr>().unwrap());
1403 }
1404
1405 #[test]
1408 fn test_allocator_bounded_to_slice_v4() {
1409 let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
1410 let alloc = IpAllocator::new(slice);
1411
1412 let mut allocated = Vec::new();
1413 for _ in 0..14 {
1414 let ip = alloc
1415 .allocate()
1416 .expect("first 14 allocations should succeed");
1417 allocated.push(ip);
1418 }
1419
1420 for ip in &allocated {
1422 assert!(
1423 slice.contains(*ip),
1424 "Allocated IP {ip} outside slice {slice}"
1425 );
1426 }
1427
1428 let exhausted = alloc.allocate();
1430 assert!(
1431 exhausted.is_err(),
1432 "allocation past /28 exhaustion should fail, got {exhausted:?}"
1433 );
1434 }
1435
1436 #[test]
1439 fn test_allocator_rejects_oob() {
1440 let slice: IpNetwork = "10.200.42.16/28".parse().unwrap();
1441 let alloc = IpAllocator::new(slice);
1442
1443 for _ in 0..14 {
1446 let ip = alloc.allocate().expect("host allocation should succeed");
1447 assert!(slice.contains(ip), "Allocation {ip} escaped slice {slice}");
1448 if let (IpAddr::V4(a), IpNetwork::V4(v4)) = (ip, slice) {
1450 assert_ne!(a, v4.broadcast(), "handed out broadcast address");
1451 assert_ne!(a, v4.network(), "handed out network address");
1452 }
1453 }
1454
1455 assert!(alloc.allocate().is_err());
1457 }
1458
#[test]
fn test_overlay_manager_with_slice_stores_slice_cidr() {
    // Every argument passed to `with_slice` should be readable back
    // through the matching getter.
    let cluster: IpNetwork = "10.200.0.0/16".parse().unwrap();
    let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
    let port = 51820;

    let manager = OverlayManager::with_slice("test-deploy".to_string(), cluster, slice, port);

    assert_eq!(manager.deployment(), "test-deploy");
    assert_eq!(manager.overlay_port(), port);
    assert_eq!(manager.cluster_cidr(), Some(cluster));
    assert_eq!(manager.slice_cidr(), Some(slice));
}
1472
1473 #[tokio::test]
1476 async fn test_allocator_persistence_roundtrip() {
1477 let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
1478 let alloc = IpAllocator::new(slice);
1479
1480 let a1 = alloc.allocate().unwrap();
1481 let a2 = alloc.allocate().unwrap();
1482 let a3 = alloc.allocate().unwrap();
1483 assert_eq!(a1, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1)));
1484 assert_eq!(a2, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 2)));
1485 assert_eq!(a3, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 3)));
1486
1487 let dir = tempfile::tempdir().expect("tempdir");
1488 let state_path = dir.path().join("agent_ipam.json");
1489 alloc.save(&state_path).await.expect("save");
1490
1491 let restored = IpAllocator::load(&state_path, slice).await.expect("load");
1492 let a4 = restored.allocate().unwrap();
1493 assert_eq!(
1494 a4,
1495 IpAddr::V4(Ipv4Addr::new(10, 200, 42, 4)),
1496 "restored allocator should continue from the persisted counter"
1497 );
1498
1499 let missing_path = dir.path().join("does-not-exist.json");
1501 let mut fresh = IpAllocator::new(slice);
1502 fresh.restore(&missing_path).await.expect("restore missing");
1503 let first = fresh.allocate().unwrap();
1504 assert_eq!(first, IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1)));
1505 }
1506
#[cfg(target_os = "windows")]
/// Exercises the `hcn_cleanup` bookkeeping of attach/detach.
///
/// The sequence is:
/// 1. An attach with autoclean enabled records a `(service, ip)` entry keyed by
///    the namespace GUID.
/// 2. Detach removes that entry.
/// 3. A second detach of the same GUID is a no-op.
/// 4. An attach with autoclean disabled records nothing.
#[tokio::test]
async fn test_attach_detach_container_hcn_tracks_cleanup_map() {
    let cluster: IpNetwork = "10.200.0.0/16".parse().unwrap();
    let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
    let om = OverlayManager::with_slice("test-deploy".to_string(), cluster, slice, 51820);

    // All-zero GUID used as the namespace id for the whole test.
    let ns = windows::core::GUID::zeroed();
    // Address inside the slice passed explicitly, so the returned IP is known.
    let fixed_ip: std::net::IpAddr = "10.200.42.5".parse().unwrap();

    // Attach with autoclean = true: expect exactly one cleanup entry.
    let ip = om
        .attach_container_hcn(ns, "svc-a", Some(fixed_ip), true, None, None)
        .await
        .expect("attach_container_hcn");
    assert_eq!(ip, fixed_ip);
    // Inspect the map in its own scope so the lock guard is dropped before
    // the next call into `om`.
    {
        let map = om.hcn_cleanup.lock().await;
        assert_eq!(map.len(), 1);
        let entry = map.get(&ns).expect("entry for zeroed GUID");
        assert_eq!(entry.0, "svc-a");
        assert_eq!(entry.1, fixed_ip);
    }

    // Detaching the tracked GUID must remove its entry.
    om.detach_container_hcn(ns).await.expect("detach");
    {
        let map = om.hcn_cleanup.lock().await;
        assert!(map.is_empty(), "detach should leave the cleanup map empty");
    }

    // Second detach: the GUID is no longer tracked, and the call must still succeed.
    om.detach_container_hcn(ns)
        .await
        .expect("unknown GUID is no-op");

    // Attach with autoclean = false: the cleanup map must stay empty.
    // NOTE(review): this attachment is never detached. Because it leaves no
    // map entry, `detach_container_hcn(ns)` presumably could not clean it up
    // either. Confirm the test host does not accumulate leftover HCN state.
    let _ip = om
        .attach_container_hcn(ns, "svc-b", Some(fixed_ip), false, None, None)
        .await
        .expect("attach without autoclean");
    {
        let map = om.hcn_cleanup.lock().await;
        assert!(map.is_empty(), "autoclean=false should not populate map");
    }
}
1556
1557 #[tokio::test]
1560 async fn dns_config_defaults_to_none() {
1561 let om = OverlayManager::new("dns-default".to_string())
1562 .await
1563 .expect("OverlayManager::new");
1564 assert!(om.dns_server_addr().is_none());
1565 assert!(om.dns_domain().is_none());
1566 }
1567
1568 #[tokio::test]
1571 async fn dns_config_set_and_round_trip() {
1572 let mut om = OverlayManager::new("dns-roundtrip".to_string())
1573 .await
1574 .expect("OverlayManager::new");
1575 let addr: SocketAddr = "10.200.42.1:15353".parse().unwrap();
1576 om.set_dns_config(Some(addr), Some("overlay.local".to_string()));
1577 assert_eq!(om.dns_server_addr(), Some(addr));
1578 assert_eq!(om.dns_domain(), Some("overlay.local"));
1579
1580 om.set_dns_config(None, None);
1582 assert!(om.dns_server_addr().is_none());
1583 assert!(om.dns_domain().is_none());
1584 }
1585
#[test]
fn with_dns_config_preserves_values() {
    // The builder-style `with_dns_config` exposes the same values as the
    // getters used after `set_dns_config`.
    let cluster_net: IpNetwork = "10.200.0.0/16".parse().unwrap();
    let slice_net: IpNetwork = "10.200.42.0/28".parse().unwrap();
    let server: SocketAddr = "10.200.42.1:15353".parse().unwrap();

    let manager =
        OverlayManager::with_slice("dns-builder".to_string(), cluster_net, slice_net, 51820)
            .with_dns_config(Some(server), Some("overlay.local".to_string()));

    assert_eq!(
        (manager.dns_server_addr(), manager.dns_domain()),
        (Some(server), Some("overlay.local"))
    );
}
1598}