1use crate::error::AgentError;
2use ipnetwork::IpNetwork;
3use serde::{Deserialize, Serialize};
4use std::collections::hash_map::DefaultHasher;
5use std::collections::HashMap;
6use std::hash::{Hash, Hasher};
7use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
8#[cfg(target_os = "linux")]
9use std::os::fd::AsFd;
10use std::path::Path;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tokio::sync::RwLock;
13use zlayer_overlay::{OverlayConfig, OverlayTransport};
14
/// Maximum length, in bytes, of a Linux network interface name
/// (`IFNAMSIZ` is 16 including the trailing NUL byte).
const MAX_IFNAME_LEN: usize = 15;

/// Builds an overlay interface name guaranteed to fit in [`MAX_IFNAME_LEN`] bytes.
///
/// The preferred, human-readable form is `zl-<parts joined by '-'>` followed by
/// `-<suffix>` when `suffix` is non-empty. If that is too long, the name is
/// derived from a hash of every part plus the suffix:
///
/// * `zl-<hex>-<suffix>` where the hex digest is truncated to whatever room
///   the suffix leaves, or
/// * `zl-<up to 12 hex digits>` when the suffix is empty or so long that no
///   hex digit would fit next to it (the suffix is then dropped).
///
/// NOTE(review): the digest comes from `DefaultHasher`, whose algorithm the
/// standard library does not promise to keep stable across Rust releases, so
/// hashed (long-input) names may differ after a toolchain upgrade.
#[must_use]
pub fn make_interface_name(parts: &[&str], suffix: &str) -> String {
    let joined = parts.join("-");
    let readable = if suffix.is_empty() {
        format!("zl-{joined}")
    } else {
        format!("zl-{joined}-{suffix}")
    };
    if readable.len() <= MAX_IFNAME_LEN {
        return readable;
    }

    let digest = {
        let mut state = DefaultHasher::new();
        parts.iter().for_each(|part| part.hash(&mut state));
        suffix.hash(&mut state);
        format!("{:x}", state.finish())
    };

    // Hex digits that fit alongside "zl-", the '-' separator and the suffix.
    // Zero means there is no room for the suffix at all, so it gets dropped.
    let room_with_suffix = if suffix.is_empty() {
        0
    } else {
        MAX_IFNAME_LEN.saturating_sub("zl-".len() + 1 + suffix.len())
    };

    if room_with_suffix == 0 {
        let digits = &digest[..digest.len().min(MAX_IFNAME_LEN - "zl-".len())];
        format!("zl-{digits}")
    } else {
        let digits = &digest[..digest.len().min(room_with_suffix)];
        format!("zl-{digits}-{suffix}")
    }
}
61
/// Owns the overlay interfaces and address allocation for a single deployment
/// on this node: one optional deployment-wide ("global") overlay, one overlay
/// per service, and per-container attachments into them.
pub struct OverlayManager {
    /// Deployment name; the first component of every generated interface name.
    deployment: String,
    /// Actual name of the global overlay interface, once `setup_global_overlay` succeeds.
    global_interface: Option<String>,
    /// Transport backing the global overlay, once created.
    global_transport: Option<OverlayTransport>,
    /// Service name -> overlay interface name recorded for that service.
    service_interfaces: RwLock<HashMap<String, String>>,
    /// Service name -> transport for service overlays that were successfully created.
    service_transports: RwLock<HashMap<String, OverlayTransport>>,
    /// Sequential allocator that hands out node, service and container addresses.
    ip_allocator: IpAllocator,
    /// This node's address on the global overlay; `None` until `setup_global_overlay` runs.
    node_ip: Option<IpAddr>,
    /// Listen port used for the global overlay transport.
    overlay_port: u16,
    /// Cluster-wide overlay range.
    cluster_cidr: Option<IpNetwork>,
    /// Range `ip_allocator` draws from when built via `with_slice`; `None` for `new`.
    slice_cidr: Option<IpNetwork>,
    /// Windows only: HCN namespace GUID -> (service name, IP) for attachments made
    /// with `autoclean`, removed again by `detach_container_hcn`.
    #[cfg(target_os = "windows")]
    hcn_cleanup: std::sync::Arc<
        tokio::sync::Mutex<
            std::collections::HashMap<windows::core::GUID, (String, std::net::IpAddr)>,
        >,
    >,
    /// Optional DNS server address set via `set_dns_config` / `with_dns_config`.
    dns_server_addr: Option<SocketAddr>,
    /// Optional DNS domain stored alongside `dns_server_addr`.
    dns_domain: Option<String>,
}
111
112impl OverlayManager {
113 #[allow(clippy::unused_async)]
128 pub async fn new(deployment: String) -> Result<Self, AgentError> {
129 tracing::warn!(
130 deployment = %deployment,
131 "OverlayManager::new uses full /16 default; prefer with_slice for cluster deployments"
132 );
133 let default_cidr: IpNetwork = "10.200.0.0/16".parse().unwrap();
134 Ok(Self {
135 deployment,
136 global_interface: None,
137 global_transport: None,
138 service_interfaces: RwLock::new(HashMap::new()),
139 service_transports: RwLock::new(HashMap::new()),
140 ip_allocator: IpAllocator::new(default_cidr),
141 node_ip: None,
142 overlay_port: zlayer_core::DEFAULT_WG_PORT,
143 cluster_cidr: Some(default_cidr),
144 slice_cidr: None,
145 #[cfg(target_os = "windows")]
146 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
147 std::collections::HashMap::new(),
148 )),
149 dns_server_addr: None,
150 dns_domain: None,
151 })
152 }
153
154 #[must_use]
165 pub fn with_slice(
166 deployment: String,
167 cluster_cidr: IpNetwork,
168 slice_cidr: IpNetwork,
169 port: u16,
170 ) -> Self {
171 Self {
172 deployment,
173 global_interface: None,
174 global_transport: None,
175 service_interfaces: RwLock::new(HashMap::new()),
176 service_transports: RwLock::new(HashMap::new()),
177 ip_allocator: IpAllocator::new(slice_cidr),
178 node_ip: None,
179 overlay_port: port,
180 cluster_cidr: Some(cluster_cidr),
181 slice_cidr: Some(slice_cidr),
182 #[cfg(target_os = "windows")]
183 hcn_cleanup: std::sync::Arc::new(tokio::sync::Mutex::new(
184 std::collections::HashMap::new(),
185 )),
186 dns_server_addr: None,
187 dns_domain: None,
188 }
189 }
190
191 #[must_use]
193 pub fn with_overlay_port(mut self, port: u16) -> Self {
194 self.overlay_port = port;
195 self
196 }
197
198 pub fn set_dns_config(&mut self, addr: Option<SocketAddr>, domain: Option<String>) {
206 self.dns_server_addr = addr;
207 self.dns_domain = domain;
208 }
209
210 #[must_use]
212 pub fn with_dns_config(mut self, addr: Option<SocketAddr>, domain: Option<String>) -> Self {
213 self.dns_server_addr = addr;
214 self.dns_domain = domain;
215 self
216 }
217
218 #[must_use]
220 pub fn dns_server_addr(&self) -> Option<SocketAddr> {
221 self.dns_server_addr
222 }
223
224 #[must_use]
226 pub fn dns_domain(&self) -> Option<&str> {
227 self.dns_domain.as_deref()
228 }
229
230 pub async fn setup_global_overlay(&mut self) -> Result<(), AgentError> {
235 if self.global_transport.is_some() {
240 tracing::debug!(
241 deployment = %self.deployment,
242 "Global overlay already active, reusing existing transport"
243 );
244 return Ok(());
245 }
246
247 let interface_name = make_interface_name(&[&self.deployment], "g");
248
249 let (private_key, public_key) = OverlayTransport::generate_keys()
250 .await
251 .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;
252
253 let node_ip = self.ip_allocator.allocate()?;
254 let config = self.build_config(private_key, public_key, node_ip, 16, self.overlay_port);
255 let mut transport = OverlayTransport::new(config, interface_name.clone());
256
257 transport
258 .create_interface()
259 .await
260 .map_err(|e| AgentError::Network(format!("Failed to create global overlay: {e}")))?;
261 transport
262 .configure(&[])
263 .await
264 .map_err(|e| AgentError::Network(format!("Failed to configure global overlay: {e}")))?;
265
266 let actual_name = transport.interface_name().to_string();
268
269 self.node_ip = Some(node_ip);
270 self.global_interface = Some(actual_name);
271 self.global_transport = Some(transport);
272 Ok(())
273 }
274
275 pub async fn setup_service_overlay(&self, service_name: &str) -> Result<String, AgentError> {
280 {
286 let transports = self.service_transports.read().await;
287 if let Some(existing) = transports.get(service_name) {
288 let existing_name = existing.interface_name().to_string();
289 tracing::debug!(
290 service = %service_name,
291 interface = %existing_name,
292 "Service overlay already active, reusing existing transport"
293 );
294 return Ok(existing_name);
295 }
296 }
297
298 let interface_name = make_interface_name(&[&self.deployment, service_name], "s");
299
300 match self.try_create_overlay(&interface_name, service_name).await {
303 Ok(()) => {
304 tracing::info!(
305 service = %service_name,
306 interface = %interface_name,
307 "Service overlay created"
308 );
309 }
310 Err(e) => {
311 tracing::warn!(
312 service = %service_name,
313 error = %e,
314 "Overlay unavailable, using direct networking"
315 );
316 }
317 }
318
319 self.service_interfaces
322 .write()
323 .await
324 .insert(service_name.to_string(), interface_name.clone());
325 Ok(interface_name)
326 }
327
328 async fn try_create_overlay(
330 &self,
331 interface_name: &str,
332 service_name: &str,
333 ) -> Result<(), AgentError> {
334 let (private_key, public_key) = OverlayTransport::generate_keys()
335 .await
336 .map_err(|e| AgentError::Network(format!("Failed to generate keys: {e}")))?;
337
338 let service_ip = self.ip_allocator.allocate_for_service(service_name)?;
339 let config = self.build_config(private_key, public_key, service_ip, 24, 0);
340 let mut transport = OverlayTransport::new(config, interface_name.to_string());
341
342 transport
343 .create_interface()
344 .await
345 .map_err(|e| AgentError::Network(format!("Failed to create service overlay: {e}")))?;
346 transport.configure(&[]).await.map_err(|e| {
347 AgentError::Network(format!("Failed to configure service overlay: {e}"))
348 })?;
349
350 let actual_name = transport.interface_name().to_string();
352 self.service_interfaces
353 .write()
354 .await
355 .insert(service_name.to_string(), actual_name);
356
357 self.service_transports
358 .write()
359 .await
360 .insert(service_name.to_string(), transport);
361 Ok(())
362 }
363
364 #[cfg_attr(
377 not(target_os = "linux"),
378 allow(clippy::needless_return, clippy::unused_async)
379 )]
380 pub async fn attach_container(
381 &self,
382 container_pid: u32,
383 service_name: &str,
384 join_global: bool,
385 ) -> Result<IpAddr, AgentError> {
386 #[cfg(not(target_os = "linux"))]
389 {
390 let _ = (container_pid, join_global);
392 tracing::debug!(
393 service = %service_name,
394 "Skipping per-container overlay attachment (not supported on this platform). \
395 Containers will use the node's overlay IP via host networking."
396 );
397 return Ok(self.node_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST)));
398 }
399
400 #[cfg(target_os = "linux")]
401 {
402 let interfaces = self.service_interfaces.read().await;
403 let service_iface = interfaces.get(service_name).ok_or_else(|| {
404 AgentError::Network(format!("No overlay for service: {service_name}"))
405 })?;
406
407 let container_ip = self.ip_allocator.allocate()?;
408 self.attach_to_interface(
409 container_pid,
410 service_iface,
411 container_ip,
412 "s",
413 "eth0",
414 true,
415 )
416 .await?;
417
418 if join_global {
419 if let Some(global_iface) = &self.global_interface {
420 let global_ip = self.ip_allocator.allocate()?;
421 self.attach_to_interface(
422 container_pid,
423 global_iface,
424 global_ip,
425 "g",
426 "eth1",
427 false,
428 )
429 .await?;
430 }
431 }
432
433 Ok(container_ip)
434 }
435 }
436
437 #[cfg(target_os = "windows")]
438 pub async fn attach_container_hcn(
465 &self,
466 namespace_id: windows::core::GUID,
467 service_name: &str,
468 ip_override: Option<std::net::IpAddr>,
469 autoclean: bool,
470 dns_server: Option<std::net::IpAddr>,
471 dns_domain: Option<String>,
472 ) -> Result<std::net::IpAddr, AgentError> {
473 let ip = match ip_override {
474 Some(ip) => ip,
475 None => self.ip_allocator.allocate()?,
476 };
477 if autoclean {
478 let mut cleanup = self.hcn_cleanup.lock().await;
479 cleanup.insert(namespace_id, (service_name.to_string(), ip));
480 }
481 tracing::info!(
482 ns = ?namespace_id,
483 service = %service_name,
484 ip = %ip,
485 dns_server = ?dns_server,
486 dns_domain = ?dns_domain,
487 "Attached container to HCN overlay",
488 );
489 Ok(ip)
490 }
491
492 #[cfg(target_os = "windows")]
493 pub async fn detach_container_hcn(
504 &self,
505 namespace_id: windows::core::GUID,
506 ) -> Result<(), AgentError> {
507 let mut cleanup = self.hcn_cleanup.lock().await;
508 if let Some((service_name, ip)) = cleanup.remove(&namespace_id) {
509 tracing::info!(ns = ?namespace_id, service = %service_name, ip = %ip, "Released HCN overlay attachment");
510 }
511 Ok(())
512 }
513
514 #[cfg(target_os = "linux")]
515 #[allow(clippy::too_many_lines)]
516 async fn attach_to_interface(
517 &self,
518 container_pid: u32,
519 _interface: &str,
520 ip: IpAddr,
521 tag: &str,
522 container_iface: &str,
523 add_default_route: bool,
524 ) -> Result<(), AgentError> {
525 self.sweep_orphan_veths().await;
527
528 let is_v6 = ip.is_ipv6();
529 let prefix_len: u8 = if is_v6 { 64 } else { 24 };
530 let host_prefix: u8 = if is_v6 { 128 } else { 32 };
531
532 let veth_host = format!("veth-{container_pid}-{tag}");
533 let veth_pending = format!("vc-{container_pid}-{tag}");
534 let veth_container = container_iface.to_string();
535
536 let container_ns_fd = std::os::fd::OwnedFd::from(
539 std::fs::File::open(format!("/proc/{container_pid}/ns/net")).map_err(|e| {
540 AgentError::Network(format!("Failed to open /proc/{container_pid}/ns/net: {e}"))
541 })?,
542 );
543
544 crate::netlink::delete_link_by_name(&veth_host)
547 .await
548 .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_host}: {e}")))?;
549 crate::netlink::delete_link_by_name(&veth_pending)
550 .await
551 .map_err(|e| AgentError::Network(format!("pre-cleanup delete {veth_pending}: {e}")))?;
552
553 let result: Result<(), AgentError> = async {
555 crate::netlink::create_veth_pair(&veth_host, &veth_pending)
557 .await
558 .map_err(|e| AgentError::Network(format!("create veth pair: {e}")))?;
559
560 crate::netlink::move_link_into_netns_fd_and_rename(
563 &veth_pending,
564 AsFd::as_fd(&container_ns_fd),
565 &veth_container,
566 )
567 .map_err(|e| AgentError::Network(format!("move veth into netns: {e}")))?;
568
569 let vc = veth_container.clone();
573 tokio::task::spawn_blocking(move || {
574 crate::netlink::with_netns_fd_async(container_ns_fd, move || async move {
575 crate::netlink::add_address_to_link_by_name(&vc, ip, prefix_len).await?;
576 crate::netlink::set_link_up_by_name(&vc).await?;
577 crate::netlink::set_link_up_by_name("lo").await?;
578 if add_default_route {
579 crate::netlink::add_default_route_via_dev(&vc, is_v6).await?;
580 }
581 Ok(())
582 })
583 })
584 .await
585 .map_err(|e| AgentError::Network(format!("container netns task panicked: {e}")))?
586 .map_err(|e| AgentError::Network(format!("container netns ops: {e}")))?;
587
588 crate::netlink::set_link_up_by_name(&veth_host)
590 .await
591 .map_err(|e| AgentError::Network(format!("set {veth_host} up: {e}")))?;
592
593 crate::netlink::replace_route_via_dev(ip, host_prefix, &veth_host, self.node_ip)
595 .await
596 .map_err(|e| {
597 AgentError::Network(format!("host route for {ip}/{host_prefix}: {e}"))
598 })?;
599
600 let _ = crate::netlink::set_sysctl("net.ipv4.ip_forward", "1");
602 let _ = crate::netlink::set_sysctl("net.ipv6.conf.all.forwarding", "1");
603
604 Ok(())
605 }
606 .await;
607
608 if result.is_err() {
611 let _ = crate::netlink::delete_link_by_name(&veth_host).await;
612 let _ = crate::netlink::delete_link_by_name(&veth_pending).await;
613 }
614
615 result
616 }
617
618 #[cfg(target_os = "linux")]
625 async fn sweep_orphan_veths(&self) {
626 let links = match crate::netlink::list_all_links().await {
627 Ok(links) => links,
628 Err(e) => {
629 tracing::warn!(error = %e, "Failed to list links for orphan sweep");
630 return;
631 }
632 };
633
634 for (_index, name) in links {
635 let remainder = if let Some(r) = name.strip_prefix("veth-") {
637 r
638 } else if let Some(r) = name.strip_prefix("vc-") {
639 r
640 } else {
641 continue;
642 };
643
644 let Some(pid_str) = remainder.split('-').next() else {
646 continue;
647 };
648
649 let pid: u32 = match pid_str.parse() {
650 Ok(p) => p,
651 Err(_) => continue,
652 };
653
654 if std::path::Path::new(&format!("/proc/{pid}")).exists() {
656 continue;
657 }
658
659 tracing::info!(link = %name, pid = pid, "Deleting orphan veth");
660 if let Err(e) = crate::netlink::delete_link_by_name(&name).await {
661 tracing::warn!(link = %name, error = %e, "Failed to delete orphan veth");
662 }
663 }
664 }
665
666 pub async fn teardown_service_overlay(&self, service_name: &str) {
672 if let Some(mut transport) = self.service_transports.write().await.remove(service_name) {
674 tracing::info!(service = %service_name, "Shutting down service overlay transport");
675 transport.shutdown();
676 }
677
678 if let Some(iface) = self.service_interfaces.write().await.remove(service_name) {
680 tracing::info!(
681 service = %service_name,
682 interface = %iface,
683 "Removed service overlay interface"
684 );
685 }
686 }
687
688 pub async fn cleanup(&mut self) -> Result<(), AgentError> {
693 let mut transports = self.service_transports.write().await;
695 for (name, mut transport) in transports.drain() {
696 tracing::info!(service = %name, "Shutting down service overlay");
697 transport.shutdown();
698 }
699 drop(transports);
700
701 if let Some(mut transport) = self.global_transport.take() {
703 tracing::info!("Shutting down global overlay");
704 transport.shutdown();
705 }
706
707 self.service_interfaces.write().await.clear();
709 self.global_interface = None;
710
711 Ok(())
712 }
713
714 pub fn node_ip(&self) -> Option<IpAddr> {
718 self.node_ip
719 }
720
721 pub fn deployment(&self) -> &str {
723 &self.deployment
724 }
725
726 pub fn global_interface(&self) -> Option<&str> {
728 self.global_interface.as_deref()
729 }
730
731 pub fn overlay_port(&self) -> u16 {
733 self.overlay_port
734 }
735
736 pub fn has_global_transport(&self) -> bool {
738 self.global_transport.is_some()
739 }
740
741 pub async fn service_transport_count(&self) -> usize {
743 self.service_transports.read().await.len()
744 }
745
746 pub fn overlay_cidr(&self) -> String {
748 match self.ip_allocator.base {
749 IpAddr::V4(_) => format!("{}/16", self.ip_allocator.base),
750 IpAddr::V6(_) => format!("{}/48", self.ip_allocator.base),
751 }
752 }
753
754 pub fn slice_cidr(&self) -> Option<IpNetwork> {
757 self.slice_cidr
758 }
759
760 pub fn cluster_cidr(&self) -> Option<IpNetwork> {
764 self.cluster_cidr
765 }
766
767 pub async fn persist_ipam_state(&self, path: &Path) -> Result<(), AgentError> {
776 self.ip_allocator.save(path).await
777 }
778
779 pub async fn restore_ipam_state(&mut self, path: &Path) -> Result<(), AgentError> {
789 self.ip_allocator.restore(path).await
790 }
791
792 pub fn ip_alloc_stats(&self) -> (u64, IpAddr) {
794 let offset = self
795 .ip_allocator
796 .next_offset
797 .load(std::sync::atomic::Ordering::SeqCst);
798 (offset.saturating_sub(1), self.ip_allocator.base)
799 }
800
801 #[allow(clippy::unused_self)]
802 fn build_config(
803 &self,
804 private_key: String,
805 public_key: String,
806 ip: IpAddr,
807 mask: u8,
808 listen_port: u16,
809 ) -> OverlayConfig {
810 let local_addr = match ip {
812 IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
813 IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
814 };
815 OverlayConfig {
816 local_endpoint: SocketAddr::new(local_addr, listen_port),
817 private_key,
818 public_key,
819 overlay_cidr: format!("{ip}/{mask}"),
820 ..OverlayConfig::default()
821 }
822 }
823}
824
/// Monotonic address allocator over one CIDR range.
///
/// Addresses are handed out as `base + offset` with an ever-increasing offset;
/// there is no way to return an address to the pool.
struct IpAllocator {
    /// Network address of `cidr` (offset 0).
    base: IpAddr,
    /// Range every allocated address must fall inside.
    cidr: IpNetwork,
    /// Next offset to hand out; `IpAllocator::new` starts it at 1 so the
    /// network address itself is skipped.
    next_offset: AtomicU64,
}
846
/// On-disk JSON form of an allocator's counter, written by `IpAllocator::save`
/// and read back by `IpAllocator::restore`.
#[derive(Debug, Serialize, Deserialize)]
struct IpAllocatorState {
    /// CIDR the counter belongs to; `restore` ignores the counter on mismatch.
    cidr: String,
    /// Value of `IpAllocator::next_offset` at save time.
    next_offset: u64,
}
857
858impl IpAllocator {
859 fn new(cidr: IpNetwork) -> Self {
860 Self {
861 base: cidr.network(),
862 cidr,
863 next_offset: AtomicU64::new(1),
864 }
865 }
866
867 #[allow(clippy::cast_possible_truncation)]
868 fn compute_addr(&self, offset: u64) -> IpAddr {
869 match self.base {
870 IpAddr::V4(base_v4) => {
871 let base_u32 = u32::from_be_bytes(base_v4.octets());
872 let addr = base_u32.wrapping_add(offset as u32);
873 IpAddr::V4(Ipv4Addr::from(addr.to_be_bytes()))
874 }
875 IpAddr::V6(base_v6) => {
876 let base_u128 = u128::from(base_v6);
877 let addr = base_u128.wrapping_add(u128::from(offset));
878 IpAddr::V6(Ipv6Addr::from(addr))
879 }
880 }
881 }
882
883 fn allocate(&self) -> Result<IpAddr, AgentError> {
889 let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
893 let addr = self.compute_addr(offset);
894
895 let in_cidr = self.cidr.contains(addr);
898 let is_v4_broadcast = matches!(
899 (&self.cidr, &addr),
900 (IpNetwork::V4(v4), IpAddr::V4(a)) if *a == v4.broadcast()
901 );
902 if !in_cidr || is_v4_broadcast {
903 return Err(AgentError::Network(format!(
904 "IP allocator exhausted: next address {addr} is outside slice {}",
905 self.cidr
906 )));
907 }
908 Ok(addr)
909 }
910
911 fn allocate_for_service(&self, _service: &str) -> Result<IpAddr, AgentError> {
912 self.allocate()
913 }
914
915 async fn save(&self, path: &Path) -> Result<(), AgentError> {
917 let state = IpAllocatorState {
918 cidr: self.cidr.to_string(),
919 next_offset: self.next_offset.load(Ordering::SeqCst),
920 };
921 let json = serde_json::to_vec_pretty(&state)
922 .map_err(|e| AgentError::Network(format!("serialize ipam state: {e}")))?;
923 if let Some(parent) = path.parent() {
924 if !parent.as_os_str().is_empty() {
925 tokio::fs::create_dir_all(parent).await.map_err(|e| {
926 AgentError::Network(format!("create ipam state dir {}: {e}", parent.display()))
927 })?;
928 }
929 }
930 tokio::fs::write(path, json).await.map_err(|e| {
931 AgentError::Network(format!("write ipam state {}: {e}", path.display()))
932 })?;
933 Ok(())
934 }
935
936 async fn restore(&mut self, path: &Path) -> Result<(), AgentError> {
943 let raw = match tokio::fs::read_to_string(path).await {
944 Ok(s) => s,
945 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
946 Err(e) => {
947 return Err(AgentError::Network(format!(
948 "read ipam state {}: {e}",
949 path.display()
950 )));
951 }
952 };
953 let state: IpAllocatorState = serde_json::from_str(&raw).map_err(|e| {
954 AgentError::Network(format!("parse ipam state {}: {e}", path.display()))
955 })?;
956
957 if state.cidr != self.cidr.to_string() {
958 tracing::warn!(
959 persisted_cidr = %state.cidr,
960 current_cidr = %self.cidr,
961 path = %path.display(),
962 "IPAM state CIDR mismatch; ignoring persisted counter"
963 );
964 return Ok(());
965 }
966
967 self.next_offset.store(state.next_offset, Ordering::SeqCst);
968 Ok(())
969 }
970
971 #[allow(dead_code)]
977 async fn load(path: &Path, cidr: IpNetwork) -> Result<Self, AgentError> {
978 let mut alloc = Self::new(cidr);
979 alloc.restore(path).await?;
980 Ok(alloc)
981 }
982}
983
#[cfg(test)]
mod tests {
    use super::*;

    // ---- make_interface_name ------------------------------------------------

    /// No combination of parts and suffix may exceed the kernel name limit.
    #[test]
    fn interface_name_never_exceeds_limit() {
        let cases: Vec<(&[&str], &str)> = vec![
            (&["a"], "g"),
            (&["zlayer-manager"], "g"),
            (&["my-very-long-deployment-name-that-goes-on-and-on"], "g"),
            (&["zlayer", "manager"], "s"),
            (&["zlayer-manager", "frontend-service"], "s"),
            (&["a", "b"], "s"),
            (
                &["abcdefghijklmnopqrstuvwxyz", "abcdefghijklmnopqrstuvwxyz"],
                "s",
            ),
            (&["x"], ""),
            (&["deployment"], ""),
            (&["a-really-long-name-exceeding-everything"], "suffix"),
        ];

        for &(parts, suffix) in &cases {
            let name = make_interface_name(parts, suffix);
            assert!(
                name.len() <= MAX_IFNAME_LEN,
                "Name '{}' is {} chars (parts={:?}, suffix='{}')",
                name,
                name.len(),
                parts,
                suffix,
            );
        }
    }

    /// Very long inputs still collapse to a name within the limit.
    #[test]
    fn interface_name_with_extreme_lengths() {
        let long = "a".repeat(200);
        let long = long.as_str();

        let inputs = [
            (vec![long], "g"),
            (vec![long, long, long], "s"),
            (vec![long], ""),
        ];
        for (parts, suffix) in &inputs {
            let name = make_interface_name(parts, suffix);
            assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
        }
    }

    /// Empty parts, empty suffixes and no parts at all are all handled.
    #[test]
    fn interface_name_with_empty_inputs() {
        let names = [
            make_interface_name(&[""], ""),
            make_interface_name(&["", ""], "s"),
            make_interface_name(&[], "g"),
        ];
        for name in &names {
            assert!(name.len() <= MAX_IFNAME_LEN);
            assert!(name.starts_with("zl-"));
        }
    }

    /// Identical inputs always produce identical names.
    #[test]
    fn interface_name_is_deterministic() {
        assert_eq!(
            make_interface_name(&["zlayer-manager"], "g"),
            make_interface_name(&["zlayer-manager"], "g"),
        );
        assert_eq!(
            make_interface_name(&["deploy", "frontend"], "s"),
            make_interface_name(&["deploy", "frontend"], "s"),
        );
    }

    /// Any differing component yields a differing name.
    #[test]
    fn interface_name_uniqueness() {
        assert_ne!(
            make_interface_name(&["deploy-a"], "g"),
            make_interface_name(&["deploy-b"], "g"),
            "Different deployments should yield different names"
        );
        assert_ne!(
            make_interface_name(&["deploy", "svc-a"], "s"),
            make_interface_name(&["deploy", "svc-b"], "s"),
            "Different services should yield different names"
        );
        assert_ne!(
            make_interface_name(&["deploy"], "g"),
            make_interface_name(&["deploy"], "s"),
            "Different suffixes should yield different names"
        );
    }

    /// Short inputs keep the readable `zl-...` form verbatim.
    #[test]
    fn interface_name_short_inputs_are_readable() {
        assert_eq!(make_interface_name(&["app"], "g"), "zl-app-g");
        assert_eq!(make_interface_name(&["my", "web"], "s"), "zl-my-web-s");
    }

    /// Realistic global-overlay names fit and carry the `zl-` prefix.
    #[test]
    fn global_overlay_realistic_names() {
        let deployments = [
            "zlayer-manager",
            "my-very-long-deployment-name",
            "a",
            "production",
            "zlayer",
        ];

        for &deployment in &deployments {
            let name = make_interface_name(&[deployment], "g");
            assert!(
                name.len() <= MAX_IFNAME_LEN,
                "Global overlay '{name}' for deployment '{deployment}' exceeds limit",
            );
            assert!(name.starts_with("zl-"));
        }
    }

    /// Realistic service-overlay names fit and carry the `zl-` prefix.
    #[test]
    fn service_overlay_realistic_names() {
        let cases = [
            ("zlayer-manager", "frontend"),
            ("zlayer-manager", "backend-api"),
            ("zlayer", "manager"),
            ("a", "b"),
            ("production", "auth-service-primary"),
            ("my-long-deploy", "my-long-service"),
        ];

        for &(deployment, service) in &cases {
            let name = make_interface_name(&[deployment, service], "s");
            assert!(
                name.len() <= MAX_IFNAME_LEN,
                "Service overlay '{name}' for ({deployment}, {service}) exceeds limit",
            );
            assert!(name.starts_with("zl-"));
        }
    }

    /// Multi-byte characters are measured in bytes, not chars.
    #[test]
    fn interface_name_with_unicode() {
        let names = [
            make_interface_name(&["\u{1F600}\u{1F600}\u{1F600}"], "g"),
            make_interface_name(&["\u{00E9}\u{00E9}\u{00E9}", "\u{00FC}\u{00FC}"], "s"),
        ];
        for name in &names {
            assert!(name.len() <= MAX_IFNAME_LEN, "Name '{name}' too long");
        }
    }

    // ---- OverlayManager basics ----------------------------------------------

    #[tokio::test]
    async fn test_node_ip_before_and_after_init() {
        let manager = OverlayManager::new("test-deploy".to_string())
            .await
            .unwrap();

        assert!(
            manager.node_ip().is_none(),
            "node_ip should be None before setup_global_overlay"
        );
    }

    // ---- IpAllocator --------------------------------------------------------

    #[test]
    fn ip_allocator_v4_sequential() {
        let alloc = IpAllocator::new("10.200.0.0/16".parse().unwrap());
        for last_octet in 1..=3u8 {
            assert_eq!(
                alloc.allocate().unwrap(),
                IpAddr::V4(Ipv4Addr::new(10, 200, 0, last_octet))
            );
        }
    }

    #[test]
    fn ip_allocator_v6_sequential() {
        let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
        for host in 1..=3u32 {
            let expected: IpAddr = format!("fd00:200::{host}").parse().unwrap();
            assert_eq!(alloc.allocate().unwrap(), expected);
        }
    }

    #[test]
    fn ip_allocator_service_delegates() {
        let alloc = IpAllocator::new("fd00:200::0/48".parse().unwrap());
        let for_service = alloc.allocate_for_service("web").unwrap();
        let plain = alloc.allocate().unwrap();
        assert_eq!(for_service, "fd00:200::1".parse::<IpAddr>().unwrap());
        assert_eq!(plain, "fd00:200::2".parse::<IpAddr>().unwrap());
    }

    #[test]
    fn test_allocator_bounded_to_slice_v4() {
        let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
        let alloc = IpAllocator::new(slice);

        let allocated: Vec<IpAddr> = (0..14)
            .map(|_| {
                alloc
                    .allocate()
                    .expect("first 14 allocations should succeed")
            })
            .collect();

        for ip in &allocated {
            assert!(
                slice.contains(*ip),
                "Allocated IP {ip} outside slice {slice}"
            );
        }

        let exhausted = alloc.allocate();
        assert!(
            exhausted.is_err(),
            "allocation past /28 exhaustion should fail, got {exhausted:?}"
        );
    }

    #[test]
    fn test_allocator_rejects_oob() {
        let slice: IpNetwork = "10.200.42.16/28".parse().unwrap();
        let alloc = IpAllocator::new(slice);

        for _ in 0..14 {
            let ip = alloc.allocate().expect("host allocation should succeed");
            assert!(slice.contains(ip), "Allocation {ip} escaped slice {slice}");
            let (IpAddr::V4(addr), IpNetwork::V4(v4)) = (ip, slice) else {
                continue;
            };
            assert_ne!(addr, v4.broadcast(), "handed out broadcast address");
            assert_ne!(addr, v4.network(), "handed out network address");
        }

        assert!(alloc.allocate().is_err());
    }

    #[test]
    fn test_overlay_manager_with_slice_stores_slice_cidr() {
        let cluster: IpNetwork = "10.200.0.0/16".parse().unwrap();
        let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();

        let manager =
            OverlayManager::with_slice("test-deploy".to_string(), cluster, slice, 51820);

        assert_eq!(manager.slice_cidr(), Some(slice));
        assert_eq!(manager.cluster_cidr(), Some(cluster));
        assert_eq!(manager.overlay_port(), 51820);
        assert_eq!(manager.deployment(), "test-deploy");
    }

    #[tokio::test]
    async fn test_allocator_persistence_roundtrip() {
        let slice: IpNetwork = "10.200.42.0/28".parse().unwrap();
        let alloc = IpAllocator::new(slice);

        for last_octet in 1..=3u8 {
            assert_eq!(
                alloc.allocate().unwrap(),
                IpAddr::V4(Ipv4Addr::new(10, 200, 42, last_octet))
            );
        }

        let dir = tempfile::tempdir().expect("tempdir");
        let state_path = dir.path().join("agent_ipam.json");
        alloc.save(&state_path).await.expect("save");

        let restored = IpAllocator::load(&state_path, slice).await.expect("load");
        assert_eq!(
            restored.allocate().unwrap(),
            IpAddr::V4(Ipv4Addr::new(10, 200, 42, 4)),
            "restored allocator should continue from the persisted counter"
        );

        // A missing state file leaves a fresh allocator untouched.
        let missing_path = dir.path().join("does-not-exist.json");
        let mut fresh = IpAllocator::new(slice);
        fresh.restore(&missing_path).await.expect("restore missing");
        assert_eq!(
            fresh.allocate().unwrap(),
            IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1))
        );
    }

    // ---- Windows HCN bookkeeping --------------------------------------------

    #[cfg(target_os = "windows")]
    #[tokio::test]
    async fn test_attach_detach_container_hcn_tracks_cleanup_map() {
        let om = OverlayManager::with_slice(
            "test-deploy".to_string(),
            "10.200.0.0/16".parse().unwrap(),
            "10.200.42.0/28".parse().unwrap(),
            51820,
        );

        let ns = windows::core::GUID::zeroed();
        let fixed_ip: std::net::IpAddr = "10.200.42.5".parse().unwrap();

        let ip = om
            .attach_container_hcn(ns, "svc-a", Some(fixed_ip), true, None, None)
            .await
            .expect("attach_container_hcn");
        assert_eq!(ip, fixed_ip);
        {
            let map = om.hcn_cleanup.lock().await;
            assert_eq!(map.len(), 1);
            let (service, tracked_ip) = map.get(&ns).expect("entry for zeroed GUID");
            assert_eq!(service, "svc-a");
            assert_eq!(*tracked_ip, fixed_ip);
        }

        om.detach_container_hcn(ns).await.expect("detach");
        assert!(
            om.hcn_cleanup.lock().await.is_empty(),
            "detach should leave the cleanup map empty"
        );

        om.detach_container_hcn(ns)
            .await
            .expect("unknown GUID is no-op");

        let _ip = om
            .attach_container_hcn(ns, "svc-b", Some(fixed_ip), false, None, None)
            .await
            .expect("attach without autoclean");
        assert!(
            om.hcn_cleanup.lock().await.is_empty(),
            "autoclean=false should not populate map"
        );
    }

    // ---- DNS configuration --------------------------------------------------

    #[tokio::test]
    async fn dns_config_defaults_to_none() {
        let manager = OverlayManager::new("dns-default".to_string())
            .await
            .expect("OverlayManager::new");
        assert!(manager.dns_server_addr().is_none());
        assert!(manager.dns_domain().is_none());
    }

    #[tokio::test]
    async fn dns_config_set_and_round_trip() {
        let mut manager = OverlayManager::new("dns-roundtrip".to_string())
            .await
            .expect("OverlayManager::new");
        let addr: SocketAddr = "10.200.42.1:15353".parse().unwrap();

        manager.set_dns_config(Some(addr), Some("overlay.local".to_string()));
        assert_eq!(manager.dns_server_addr(), Some(addr));
        assert_eq!(manager.dns_domain(), Some("overlay.local"));

        manager.set_dns_config(None, None);
        assert!(manager.dns_server_addr().is_none());
        assert!(manager.dns_domain().is_none());
    }

    #[test]
    fn with_dns_config_preserves_values() {
        let addr: SocketAddr = "10.200.42.1:15353".parse().unwrap();
        let manager = OverlayManager::with_slice(
            "dns-builder".to_string(),
            "10.200.0.0/16".parse().unwrap(),
            "10.200.42.0/28".parse().unwrap(),
            51820,
        )
        .with_dns_config(Some(addr), Some("overlay.local".to_string()));
        assert_eq!(manager.dns_server_addr(), Some(addr));
        assert_eq!(manager.dns_domain(), Some("overlay.local"));
    }
}