Skip to main content

zlayer_overlay/
allocator.rs

1//! IP address allocation for overlay networks
2//!
3//! Manages allocation and tracking of overlay IP addresses within a CIDR range.
4//! Supports both IPv4 and IPv6 (dual-stack) networks.
5
6use crate::error::{OverlayError, Result};
7use ipnet::{IpNet, Ipv4Net, Ipv6Net};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
11use std::path::Path;
12
13/// IP allocator for overlay network addresses
14///
15/// Tracks allocated IP addresses and provides next-available allocation
16/// from a configured CIDR range. Supports both IPv4 and IPv6 networks.
17#[derive(Debug, Clone)]
18pub struct IpAllocator {
19    /// Network CIDR range (IPv4 or IPv6)
20    network: IpNet,
21    /// Set of allocated IP addresses
22    allocated: HashSet<IpAddr>,
23}
24
25/// Persistent state for IP allocator
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct IpAllocatorState {
28    /// CIDR string
29    pub cidr: String,
30    /// List of allocated IPs (serializes as strings, backward-compatible)
31    pub allocated: Vec<IpAddr>,
32}
33
/// Offset an IPv6 address by `offset`, treating the address as a 128-bit integer.
///
/// Returns `None` when `base + offset` does not fit in 128 bits.
fn ipv6_add(base: Ipv6Addr, offset: u128) -> Option<Ipv6Addr> {
    let sum = u128::from(base).checked_add(offset)?;
    Some(Ipv6Addr::from(sum))
}
41
/// Compute the number of usable host addresses for an address family and prefix length.
///
/// * IPv4: `2^(32 - prefix) - 2` (network and broadcast excluded); `/31` and
///   `/32` are reported as having no usable hosts.
/// * IPv6: `2^(128 - prefix) - 1` (network address excluded); `/128` has no
///   usable hosts, and `/0` yields `u128::MAX`, which is exactly `2^128 - 1`.
///
/// The result is always a plain `u128`; nothing is signalled through `Option`.
/// A `prefix_len` larger than the family maximum is treated like the maximum
/// prefix (zero hosts) instead of underflowing the subtraction.
fn host_count(is_ipv6: bool, prefix_len: u8) -> u128 {
    let family_bits: u32 = if is_ipv6 { 128 } else { 32 };
    // Saturate so an out-of-range prefix cannot underflow.
    let host_bits = family_bits.saturating_sub(u32::from(prefix_len));

    if is_ipv6 {
        match host_bits {
            // /128: the only address is the network address itself.
            0 => 0,
            // /0: `1 << 128` would overflow, but 2^128 - 1 is u128::MAX.
            128 => u128::MAX,
            // Skip the network address.
            bits => (1u128 << bits) - 1,
        }
    } else {
        match host_bits {
            // /31 or /32: no usable hosts in classical networking.
            0 | 1 => 0,
            // Skip network and broadcast.
            bits => (1u128 << bits) - 2,
        }
    }
}
72
73impl IpAllocator {
74    /// Create a new IP allocator for the given CIDR range
75    ///
76    /// Supports both IPv4 (e.g., "10.200.0.0/16") and IPv6 (e.g., `fd00::/48`).
77    ///
78    /// # Arguments
79    /// * `cidr` - Network CIDR notation
80    ///
81    /// # Errors
82    ///
83    /// Returns `OverlayError::InvalidCidr` if the CIDR string cannot be parsed.
84    ///
85    /// # Example
86    /// ```
87    /// use zlayer_overlay::allocator::IpAllocator;
88    ///
89    /// let v4 = IpAllocator::new("10.200.0.0/16").unwrap();
90    /// let v6 = IpAllocator::new("fd00::/48").unwrap();
91    /// ```
92    pub fn new(cidr: &str) -> Result<Self> {
93        let network: IpNet = cidr
94            .parse()
95            .map_err(|e| OverlayError::InvalidCidr(format!("{cidr}: {e}")))?;
96
97        Ok(Self {
98            network,
99            allocated: HashSet::new(),
100        })
101    }
102
103    /// Create an allocator from persisted state
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the CIDR is invalid or any IP is out of range.
108    pub fn from_state(state: IpAllocatorState) -> Result<Self> {
109        let mut allocator = Self::new(&state.cidr)?;
110        for ip in state.allocated {
111            allocator.mark_allocated(ip)?;
112        }
113        Ok(allocator)
114    }
115
116    /// Get the current state for persistence
117    #[must_use]
118    pub fn to_state(&self) -> IpAllocatorState {
119        IpAllocatorState {
120            cidr: self.network.to_string(),
121            allocated: self.allocated.iter().copied().collect(),
122        }
123    }
124
125    /// Load allocator state from a file
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the file cannot be read or the state is invalid.
130    pub async fn load(path: &Path) -> Result<Self> {
131        let contents = tokio::fs::read_to_string(path).await?;
132        let state: IpAllocatorState = serde_json::from_str(&contents)?;
133        Self::from_state(state)
134    }
135
136    /// Save allocator state to a file
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the file cannot be written or serialization fails.
141    pub async fn save(&self, path: &Path) -> Result<()> {
142        let state = self.to_state();
143        let contents = serde_json::to_string_pretty(&state)?;
144        tokio::fs::write(path, contents).await?;
145        Ok(())
146    }
147
148    /// Allocate the next available IP address
149    ///
150    /// For IPv4, skips the network and broadcast addresses.
151    /// For IPv6, skips the network address.
152    ///
153    /// Returns `None` if all addresses in the CIDR range are allocated.
154    ///
155    /// # Example
156    /// ```
157    /// use zlayer_overlay::allocator::IpAllocator;
158    ///
159    /// let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();
160    /// let ip = allocator.allocate().unwrap();
161    /// assert_eq!(ip.to_string(), "10.200.0.1");
162    /// ```
163    pub fn allocate(&mut self) -> Option<IpAddr> {
164        match self.network {
165            IpNet::V4(v4net) => {
166                // IPv4: iterate hosts() which skips network and broadcast
167                for ip in v4net.hosts() {
168                    let addr = IpAddr::V4(ip);
169                    if !self.allocated.contains(&addr) {
170                        self.allocated.insert(addr);
171                        return Some(addr);
172                    }
173                }
174                None
175            }
176            IpNet::V6(v6net) => {
177                // IPv6: counter-based allocation starting from base+1
178                // We skip the network address itself (offset 0) and allocate from offset 1.
179                let base = v6net.network();
180                let total = host_count(true, v6net.prefix_len());
181
182                for offset in 1..=total {
183                    if let Some(candidate) = ipv6_add(base, offset) {
184                        let addr = IpAddr::V6(candidate);
185                        if !self.allocated.contains(&addr) {
186                            self.allocated.insert(addr);
187                            return Some(addr);
188                        }
189                    } else {
190                        break;
191                    }
192                }
193                None
194            }
195        }
196    }
197
198    /// Allocate a specific IP address
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if the IP is already allocated or not in the CIDR range.
203    pub fn allocate_specific(&mut self, ip: IpAddr) -> Result<()> {
204        if !self.network.contains(&ip) {
205            return Err(OverlayError::IpNotInRange(ip, self.network.to_string()));
206        }
207
208        if self.allocated.contains(&ip) {
209            return Err(OverlayError::IpAlreadyAllocated(ip));
210        }
211
212        self.allocated.insert(ip);
213        Ok(())
214    }
215
216    /// Allocate the first usable IP in the range (typically for the leader)
217    ///
218    /// # Example
219    /// ```
220    /// use zlayer_overlay::allocator::IpAllocator;
221    ///
222    /// let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();
223    /// let ip = allocator.allocate_first().unwrap();
224    /// assert_eq!(ip.to_string(), "10.200.0.1");
225    /// ```
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if no IPs are available or the first IP is already allocated.
230    pub fn allocate_first(&mut self) -> Result<IpAddr> {
231        let first_ip = self.first_host().ok_or(OverlayError::NoAvailableIps)?;
232
233        if self.allocated.contains(&first_ip) {
234            return Err(OverlayError::IpAlreadyAllocated(first_ip));
235        }
236
237        self.allocated.insert(first_ip);
238        Ok(first_ip)
239    }
240
241    /// Get the first usable host address in the network.
242    ///
243    /// For IPv4: first host from `hosts()` (skips network address).
244    /// For IPv6: network address + 1 (skips the network address).
245    fn first_host(&self) -> Option<IpAddr> {
246        match self.network {
247            IpNet::V4(v4net) => v4net.hosts().next().map(IpAddr::V4),
248            IpNet::V6(v6net) => {
249                let base = v6net.network();
250                ipv6_add(base, 1).map(IpAddr::V6)
251            }
252        }
253    }
254
255    /// Mark an IP address as allocated (for restoring state)
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the IP is not in the CIDR range.
260    pub fn mark_allocated(&mut self, ip: IpAddr) -> Result<()> {
261        if !self.network.contains(&ip) {
262            return Err(OverlayError::IpNotInRange(ip, self.network.to_string()));
263        }
264        self.allocated.insert(ip);
265        Ok(())
266    }
267
268    /// Release an IP address back to the pool
269    ///
270    /// Returns `true` if the IP was released, `false` if it wasn't allocated.
271    pub fn release(&mut self, ip: IpAddr) -> bool {
272        self.allocated.remove(&ip)
273    }
274
275    /// Check if an IP address is allocated
276    #[must_use]
277    pub fn is_allocated(&self, ip: IpAddr) -> bool {
278        self.allocated.contains(&ip)
279    }
280
281    /// Check if an IP address is within the CIDR range
282    #[must_use]
283    pub fn contains(&self, ip: IpAddr) -> bool {
284        self.network.contains(&ip)
285    }
286
287    /// Get the number of allocated addresses
288    #[must_use]
289    pub fn allocated_count(&self) -> usize {
290        self.allocated.len()
291    }
292
293    /// Get the total number of usable addresses in the range
294    ///
295    /// For IPv6 networks with large host spaces, this saturates at `u32::MAX`.
296    #[must_use]
297    #[allow(clippy::cast_possible_truncation)]
298    pub fn total_hosts(&self) -> u32 {
299        let is_v6 = matches!(self.network, IpNet::V6(_));
300        let count = host_count(is_v6, self.network.prefix_len());
301        // Saturate to u32::MAX for enormous IPv6 subnets
302        if count > u128::from(u32::MAX) {
303            u32::MAX
304        } else {
305            count as u32
306        }
307    }
308
309    /// Get the number of available addresses
310    #[must_use]
311    #[allow(clippy::cast_possible_truncation)]
312    pub fn available_count(&self) -> u32 {
313        self.total_hosts()
314            .saturating_sub(self.allocated.len() as u32)
315    }
316
317    /// Get the CIDR string
318    #[must_use]
319    pub fn cidr(&self) -> String {
320        self.network.to_string()
321    }
322
323    /// Get the network address
324    #[must_use]
325    pub fn network_addr(&self) -> IpAddr {
326        self.network.network()
327    }
328
329    /// Get the broadcast address
330    ///
331    /// For IPv6, returns the last address in the range (all host bits set to 1).
332    #[must_use]
333    pub fn broadcast_addr(&self) -> IpAddr {
334        self.network.broadcast()
335    }
336
337    /// Get the prefix length
338    #[must_use]
339    pub fn prefix_len(&self) -> u8 {
340        self.network.prefix_len()
341    }
342
343    /// Get the host prefix length (32 for IPv4, 128 for IPv6)
344    #[must_use]
345    pub fn host_prefix_len(&self) -> u8 {
346        self.network.max_prefix_len()
347    }
348
349    /// Get all allocated IPs
350    #[must_use]
351    pub fn allocated_ips(&self) -> Vec<IpAddr> {
352        self.allocated.iter().copied().collect()
353    }
354}
355
356/// Leader-side allocator that carves per-node slices out of a cluster CIDR.
357///
358/// Used to fix the latent IP-collision bug where every agent independently
359/// allocated container IPs from the full cluster `/16`. With a `NodeSliceAllocator`
360/// the leader hands each joining node its own non-overlapping slice, and the
361/// agent-local `IpAllocator` is bounded to that slice.
362///
363/// Slice assignment is deterministic within a leader process: the node ID hashes
364/// to a candidate slice index; collisions are resolved by linear probing forward
365/// until a free slot is found. Existing assignments are preserved across leader
366/// restart via `snapshot()` / `restore()`.
367#[derive(Debug, Clone)]
368pub struct NodeSliceAllocator {
369    cluster_cidr: IpNet,
370    slice_prefix: u8,
371    assigned: HashMap<String, IpNet>,
372}
373
374/// Persistent snapshot of a `NodeSliceAllocator` for raft/disk persistence.
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct NodeSliceAllocatorSnapshot {
377    pub cluster_cidr: String,
378    pub slice_prefix: u8,
379    pub assigned: Vec<(String, String)>,
380}
381
/// FNV-1a 64-bit hash of a node ID's UTF-8 bytes.
///
/// Used instead of `DefaultHasher` because `DefaultHasher` is seeded
/// per-process, whereas slice assignments should be reproducible from a
/// snapshot.
fn hash_node_id(node_id: &str) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    // FNV-1a: xor the byte in first, then multiply by the prime.
    node_id.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
396
397impl NodeSliceAllocator {
398    /// Create a new slice allocator that carves `/slice_prefix`-sized slices
399    /// out of `cluster_cidr`.
400    ///
401    /// # Errors
402    ///
403    /// Returns `OverlayError::InvalidCidr` if `slice_prefix` is not strictly
404    /// more specific than `cluster_cidr.prefix_len()`, or if it exceeds the
405    /// address family's maximum prefix length.
406    pub fn new(cluster_cidr: IpNet, slice_prefix: u8) -> Result<Self> {
407        if slice_prefix <= cluster_cidr.prefix_len() {
408            return Err(OverlayError::InvalidCidr(format!(
409                "slice prefix /{} must be more specific than cluster prefix /{}",
410                slice_prefix,
411                cluster_cidr.prefix_len()
412            )));
413        }
414        if slice_prefix > cluster_cidr.max_prefix_len() {
415            return Err(OverlayError::InvalidCidr(format!(
416                "slice prefix /{} exceeds address family max /{}",
417                slice_prefix,
418                cluster_cidr.max_prefix_len()
419            )));
420        }
421        Ok(Self {
422            cluster_cidr,
423            slice_prefix,
424            assigned: HashMap::new(),
425        })
426    }
427
428    /// Assign (or return an existing) slice for `node_id`.
429    ///
430    /// Idempotent: calling `assign` with a node ID that already has a slice
431    /// returns the existing slice without re-assigning.
432    ///
433    /// # Errors
434    ///
435    /// Returns `OverlayError::NoAvailableIps` if every slice in the cluster
436    /// CIDR is already assigned.
437    pub fn assign(&mut self, node_id: &str) -> Result<IpNet> {
438        if let Some(existing) = self.assigned.get(node_id) {
439            return Ok(*existing);
440        }
441
442        let num_slices = self.num_slices();
443        if num_slices == 0 {
444            return Err(OverlayError::NoAvailableIps);
445        }
446
447        let taken: HashSet<IpNet> = self.assigned.values().copied().collect();
448        let start = hash_node_id(node_id) % num_slices;
449
450        for i in 0..num_slices {
451            let idx = (start + i) % num_slices;
452            let slice = self.slice_at_index(idx);
453            if !taken.contains(&slice) {
454                self.assigned.insert(node_id.to_string(), slice);
455                return Ok(slice);
456            }
457        }
458
459        Err(OverlayError::NoAvailableIps)
460    }
461
462    /// Release `node_id`'s slice back to the free pool.
463    ///
464    /// Returns `true` if a slice was released, `false` if the node was not assigned.
465    pub fn release(&mut self, node_id: &str) -> bool {
466        self.assigned.remove(node_id).is_some()
467    }
468
469    /// Look up a node's assigned slice without mutating state.
470    #[must_use]
471    pub fn slice_for(&self, node_id: &str) -> Option<IpNet> {
472        self.assigned.get(node_id).copied()
473    }
474
475    /// Number of currently-assigned slices.
476    #[must_use]
477    pub fn assigned_count(&self) -> usize {
478        self.assigned.len()
479    }
480
481    /// Total number of slices the cluster CIDR can hold at the configured slice prefix.
482    #[must_use]
483    pub fn capacity(&self) -> u64 {
484        self.num_slices()
485    }
486
487    /// Cluster CIDR the allocator operates over.
488    #[must_use]
489    pub fn cluster_cidr(&self) -> IpNet {
490        self.cluster_cidr
491    }
492
493    /// Slice prefix length (e.g. `28` for `/28` slices).
494    #[must_use]
495    pub fn slice_prefix(&self) -> u8 {
496        self.slice_prefix
497    }
498
499    /// Build a persistable snapshot for durable leader state.
500    #[must_use]
501    pub fn snapshot(&self) -> NodeSliceAllocatorSnapshot {
502        NodeSliceAllocatorSnapshot {
503            cluster_cidr: self.cluster_cidr.to_string(),
504            slice_prefix: self.slice_prefix,
505            assigned: self
506                .assigned
507                .iter()
508                .map(|(k, v)| (k.clone(), v.to_string()))
509                .collect(),
510        }
511    }
512
513    /// Rebuild an allocator from a snapshot.
514    ///
515    /// # Errors
516    ///
517    /// Returns `OverlayError::InvalidCidr` if the snapshot's CIDR or any
518    /// assigned slice fails to parse, or if the slice prefix is inconsistent.
519    pub fn restore(snapshot: NodeSliceAllocatorSnapshot) -> Result<Self> {
520        let cluster_cidr: IpNet = snapshot
521            .cluster_cidr
522            .parse()
523            .map_err(|e| OverlayError::InvalidCidr(format!("{}: {e}", snapshot.cluster_cidr)))?;
524        let mut allocator = Self::new(cluster_cidr, snapshot.slice_prefix)?;
525        for (node_id, slice_str) in snapshot.assigned {
526            let slice: IpNet = slice_str
527                .parse()
528                .map_err(|e| OverlayError::InvalidCidr(format!("{slice_str}: {e}")))?;
529            if slice.prefix_len() != snapshot.slice_prefix {
530                return Err(OverlayError::InvalidCidr(format!(
531                    "assigned slice {slice} does not match configured prefix /{}",
532                    snapshot.slice_prefix
533                )));
534            }
535            if !cluster_cidr.contains(&slice.network()) {
536                return Err(OverlayError::InvalidCidr(format!(
537                    "assigned slice {slice} is not contained in cluster CIDR {cluster_cidr}"
538                )));
539            }
540            allocator.assigned.insert(node_id, slice);
541        }
542        Ok(allocator)
543    }
544
545    fn num_slices(&self) -> u64 {
546        let bits = self.slice_prefix - self.cluster_cidr.prefix_len();
547        // bits is in 1..=32 for v4 or 1..=128 for v6. For a /16 cluster with /28
548        // slices, bits = 12 → 4096 slices, safely inside u64 range.
549        if bits >= 64 {
550            u64::MAX
551        } else {
552            1u64 << bits
553        }
554    }
555
556    fn slice_at_index(&self, idx: u64) -> IpNet {
557        let shift = u32::from(self.cluster_cidr.max_prefix_len() - self.slice_prefix);
558        match self.cluster_cidr {
559            IpNet::V4(v4) => {
560                let base = u32::from(v4.network());
561                // idx fits in 32 bits whenever slice_prefix − cluster_prefix ≤ 32.
562                #[allow(clippy::cast_possible_truncation)]
563                let offset = (idx as u32).wrapping_shl(shift);
564                let slice_addr = Ipv4Addr::from(base.wrapping_add(offset));
565                IpNet::V4(
566                    Ipv4Net::new(slice_addr, self.slice_prefix)
567                        .expect("slice_prefix validated in constructor"),
568                )
569            }
570            IpNet::V6(v6) => {
571                let base = u128::from(v6.network());
572                let offset = u128::from(idx).wrapping_shl(shift);
573                let slice_addr = Ipv6Addr::from(base.wrapping_add(offset));
574                IpNet::V6(
575                    Ipv6Net::new(slice_addr, self.slice_prefix)
576                        .expect("slice_prefix validated in constructor"),
577                )
578            }
579        }
580    }
581}
582
583/// Tracks per-service-per-node subnet assignments carved from the cluster
584/// CIDR. Each `(service_name, node_id)` pair gets its own slice of size
585/// `slice_prefix` (default `/26`). Assignments are deterministic — the same
586/// `(service, node)` pair always maps to the same starting slot via FNV
587/// hash, with linear probing on collision. Mirrors `NodeSliceAllocator`'s
588/// pattern; see that type for the rationale (in particular the choice of
589/// FNV over `DefaultHasher` for cross-process reproducibility).
590///
591/// Snapshot/restore is wired the same way `NodeSliceAllocator` does it, so
592/// the scheduler's Raft state can persist + replay assignments. The
593/// snapshot's `Vec<((String, String), IpNet)>` is the wire-stable shape:
594/// avoid `HashMap` here because non-deterministic map ordering would yield
595/// unstable serialized bytes under postcard/serde.
596///
597/// Node IDs are stored as `String` (matching `NodeSliceAllocator`); the
598/// scheduler converts its own `NodeId` to/from `String` at the boundary.
599#[derive(Debug, Clone)]
600pub struct ServiceSubnetRegistry {
601    cluster_cidr: IpNet,
602    slice_prefix: u8,
603    /// Map from `(service_name, node_id)` -> assigned slice.
604    assignments: HashMap<(String, String), IpNet>,
605    /// IPs that no assigned slice may contain — chiefly the node's own overlay
606    /// IP (where the overlay DNS server listens on `<node_ip>:53`). If a
607    /// per-service bridge subnet contained the node IP, that IP would look
608    /// on-link to the bridge's containers and they would ARP for it on the
609    /// bridge (where nothing answers), black-holing DNS. Not persisted in the
610    /// snapshot — re-applied from the live node IP by the daemon on startup.
611    reserved: Vec<IpAddr>,
612}
613
614/// Persistent snapshot of a `ServiceSubnetRegistry` for raft/disk persistence.
615///
616/// Uses a `Vec` of pairs (rather than a `HashMap`) so the serialized byte
617/// layout is deterministic when Raft replicates / snapshots this state.
618#[derive(Debug, Clone, Serialize, Deserialize)]
619pub struct ServiceSubnetRegistrySnapshot {
620    pub cluster_cidr: IpNet,
621    pub slice_prefix: u8,
622    pub assignments: Vec<((String, String), IpNet)>,
623}
624
/// FNV-1a 64-bit hash of a `(service, node)` pair.
///
/// Shares its FNV constants with `hash_node_id`, so both allocators have the
/// same reproducibility guarantees. The byte stream hashed is the service
/// bytes, one `0x1f` (ASCII unit-separator) delimiter, then the node bytes;
/// the delimiter keeps `("ab", "c")` from hashing identically to
/// `("a", "bc")`.
fn hash_service_node(service: &str, node: &str) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    service
        .bytes()
        .chain(std::iter::once(0x1f_u8))
        .chain(node.bytes())
        // FNV-1a: xor the byte in first, then multiply by the prime.
        .fold(FNV_OFFSET, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
}
648
649impl ServiceSubnetRegistry {
650    /// Create a new service subnet registry that carves `/slice_prefix`-sized
651    /// slices out of `cluster_cidr`.
652    ///
653    /// # Errors
654    ///
655    /// Returns `OverlayError::InvalidCidr` if `slice_prefix` is not strictly
656    /// more specific than `cluster_cidr.prefix_len()`, or if it exceeds the
657    /// address family's maximum prefix length.
658    pub fn new(cluster_cidr: IpNet, slice_prefix: u8) -> Result<Self> {
659        if slice_prefix <= cluster_cidr.prefix_len() {
660            return Err(OverlayError::InvalidCidr(format!(
661                "slice prefix /{} must be more specific than cluster prefix /{}",
662                slice_prefix,
663                cluster_cidr.prefix_len()
664            )));
665        }
666        if slice_prefix > cluster_cidr.max_prefix_len() {
667            return Err(OverlayError::InvalidCidr(format!(
668                "slice prefix /{} exceeds address family max /{}",
669                slice_prefix,
670                cluster_cidr.max_prefix_len()
671            )));
672        }
673        Ok(Self {
674            cluster_cidr,
675            slice_prefix,
676            assignments: HashMap::new(),
677            reserved: Vec::new(),
678        })
679    }
680
681    /// Reserve an IP so no future `assign` hands out a slice that contains it.
682    /// Idempotent. Reserve the node overlay IP here so per-service bridges
683    /// never overlap the node's DNS listener address.
684    pub fn reserve_ip(&mut self, ip: IpAddr) {
685        if !self.reserved.contains(&ip) {
686            self.reserved.push(ip);
687        }
688    }
689
690    /// Assign (or return an existing) subnet for `(service, node)`.
691    ///
692    /// Idempotent: repeated calls with the same key return the same slice
693    /// without re-assigning.
694    ///
695    /// # Errors
696    ///
697    /// Returns `OverlayError::NoAvailableIps` if every slice in the cluster
698    /// CIDR is already assigned to some other `(service, node)` pair.
699    pub fn assign(&mut self, service: &str, node: &str) -> Result<IpNet> {
700        let key = (service.to_string(), node.to_string());
701        if let Some(existing) = self.assignments.get(&key) {
702            return Ok(*existing);
703        }
704
705        let num_slices = self.num_slices();
706        if num_slices == 0 {
707            return Err(OverlayError::NoAvailableIps);
708        }
709
710        let taken: HashSet<IpNet> = self.assignments.values().copied().collect();
711        let start = hash_service_node(service, node) % num_slices;
712
713        for i in 0..num_slices {
714            let idx = (start + i) % num_slices;
715            let slice = self.slice_at_index(idx);
716            if taken.contains(&slice) {
717                continue;
718            }
719            // Never hand out a slice that contains a reserved IP (the node
720            // overlay/DNS address). Such a slice would make the node IP appear
721            // on-link to this bridge's containers and black-hole their DNS.
722            if self.reserved.iter().any(|ip| slice.contains(ip)) {
723                continue;
724            }
725            self.assignments.insert(key, slice);
726            return Ok(slice);
727        }
728
729        Err(OverlayError::NoAvailableIps)
730    }
731
732    /// Release the subnet for `(service, node)`. Returns the freed slice if
733    /// one was assigned, `None` otherwise.
734    pub fn release(&mut self, service: &str, node: &str) -> Option<IpNet> {
735        let key = (service.to_string(), node.to_string());
736        self.assignments.remove(&key)
737    }
738
739    /// Look up the current assignment for `(service, node)`, if any.
740    #[must_use]
741    pub fn get(&self, service: &str, node: &str) -> Option<IpNet> {
742        let key = (service.to_string(), node.to_string());
743        self.assignments.get(&key).copied()
744    }
745
746    /// Number of currently-assigned `(service, node)` pairs.
747    #[must_use]
748    pub fn assigned_count(&self) -> usize {
749        self.assignments.len()
750    }
751
752    /// Total number of slices the cluster CIDR can hold at the configured
753    /// slice prefix.
754    #[must_use]
755    pub fn capacity(&self) -> u64 {
756        self.num_slices()
757    }
758
759    /// Cluster CIDR the registry operates over.
760    #[must_use]
761    pub fn cluster_cidr(&self) -> IpNet {
762        self.cluster_cidr
763    }
764
765    /// Slice prefix length (e.g. `28` for `/28` slices).
766    #[must_use]
767    pub fn slice_prefix(&self) -> u8 {
768        self.slice_prefix
769    }
770
771    /// Build a persistable snapshot for Raft / durable leader state.
772    ///
773    /// The returned snapshot has assignments sorted by `(service, node)` so
774    /// the serialized bytes are deterministic across processes — important
775    /// when Raft compares snapshots by hash.
776    #[must_use]
777    pub fn snapshot(&self) -> ServiceSubnetRegistrySnapshot {
778        let mut assignments: Vec<((String, String), IpNet)> = self
779            .assignments
780            .iter()
781            .map(|(k, v)| (k.clone(), *v))
782            .collect();
783        assignments.sort_by(|a, b| a.0.cmp(&b.0));
784        ServiceSubnetRegistrySnapshot {
785            cluster_cidr: self.cluster_cidr,
786            slice_prefix: self.slice_prefix,
787            assignments,
788        }
789    }
790
791    /// Rebuild a registry from a snapshot.
792    ///
793    /// # Errors
794    ///
795    /// Returns `OverlayError::InvalidCidr` if the snapshot's slice prefix is
796    /// inconsistent with its assignments, or if any assigned slice is not
797    /// contained in the cluster CIDR.
798    pub fn restore(snapshot: ServiceSubnetRegistrySnapshot) -> Result<Self> {
799        let mut registry = Self::new(snapshot.cluster_cidr, snapshot.slice_prefix)?;
800        for (key, slice) in snapshot.assignments {
801            if slice.prefix_len() != snapshot.slice_prefix {
802                return Err(OverlayError::InvalidCidr(format!(
803                    "assigned slice {slice} does not match configured prefix /{}",
804                    snapshot.slice_prefix
805                )));
806            }
807            if !snapshot.cluster_cidr.contains(&slice.network()) {
808                return Err(OverlayError::InvalidCidr(format!(
809                    "assigned slice {slice} is not contained in cluster CIDR {}",
810                    snapshot.cluster_cidr
811                )));
812            }
813            registry.assignments.insert(key, slice);
814        }
815        Ok(registry)
816    }
817
818    fn num_slices(&self) -> u64 {
819        let bits = self.slice_prefix - self.cluster_cidr.prefix_len();
820        if bits >= 64 {
821            u64::MAX
822        } else {
823            1u64 << bits
824        }
825    }
826
827    fn slice_at_index(&self, idx: u64) -> IpNet {
828        let shift = u32::from(self.cluster_cidr.max_prefix_len() - self.slice_prefix);
829        match self.cluster_cidr {
830            IpNet::V4(v4) => {
831                let base = u32::from(v4.network());
832                #[allow(clippy::cast_possible_truncation)]
833                let offset = (idx as u32).wrapping_shl(shift);
834                let slice_addr = Ipv4Addr::from(base.wrapping_add(offset));
835                IpNet::V4(
836                    Ipv4Net::new(slice_addr, self.slice_prefix)
837                        .expect("slice_prefix validated in constructor"),
838                )
839            }
840            IpNet::V6(v6) => {
841                let base = u128::from(v6.network());
842                let offset = u128::from(idx).wrapping_shl(shift);
843                let slice_addr = Ipv6Addr::from(base.wrapping_add(offset));
844                IpNet::V6(
845                    Ipv6Net::new(slice_addr, self.slice_prefix)
846                        .expect("slice_prefix validated in constructor"),
847                )
848            }
849        }
850    }
851}
852
853/// Helper function to get the first usable IP from a CIDR
854///
855/// Supports both IPv4 and IPv6 CIDR notation.
856///
857/// # Errors
858///
859/// Returns an error if the CIDR is invalid or has no usable hosts.
860pub fn first_ip_from_cidr(cidr: &str) -> Result<IpAddr> {
861    let network: IpNet = cidr
862        .parse()
863        .map_err(|e| OverlayError::InvalidCidr(format!("{cidr}: {e}")))?;
864
865    match network {
866        IpNet::V4(v4net) => v4net
867            .hosts()
868            .next()
869            .map(IpAddr::V4)
870            .ok_or(OverlayError::NoAvailableIps),
871        IpNet::V6(v6net) => {
872            let base = v6net.network();
873            ipv6_add(base, 1)
874                .map(IpAddr::V6)
875                .ok_or(OverlayError::NoAvailableIps)
876        }
877    }
878}
879
/// Unit tests for `IpAllocator`, the address helpers, `NodeSliceAllocator`
/// and `ServiceSubnetRegistry`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};

    /// Increment an IPv4 address by a u32 offset from a base address.
    ///
    /// Returns `None` if the result would overflow.
    fn ipv4_add(base: Ipv4Addr, offset: u32) -> Option<Ipv4Addr> {
        let base_u32 = u32::from(base);
        base_u32.checked_add(offset).map(Ipv4Addr::from)
    }

    // ========================
    // IPv4 Tests (existing, updated for IpAddr)
    // ========================

    #[test]
    fn test_allocator_new() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert_eq!(allocator.cidr(), "10.200.0.0/24");
        assert_eq!(allocator.allocated_count(), 0);
    }

    #[test]
    fn test_allocator_invalid_cidr() {
        let result = IpAllocator::new("invalid");
        assert!(result.is_err());
    }

    #[test]
    fn test_allocate_sequential() {
        let mut allocator = IpAllocator::new("10.200.0.0/30").unwrap();

        // /30 has 2 usable hosts (excluding network and broadcast)
        let ip1 = allocator.allocate().unwrap();
        let ip2 = allocator.allocate().unwrap();

        assert_eq!(ip1.to_string(), "10.200.0.1");
        assert_eq!(ip2.to_string(), "10.200.0.2");

        // Should be exhausted
        assert!(allocator.allocate().is_none());
    }

    #[test]
    fn test_allocate_first() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        let first = allocator.allocate_first().unwrap();
        assert_eq!(first.to_string(), "10.200.0.1");

        // Can't allocate first again
        assert!(allocator.allocate_first().is_err());
    }

    #[test]
    fn test_allocate_specific() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        let specific_ip: IpAddr = "10.200.0.50".parse().unwrap();
        allocator.allocate_specific(specific_ip).unwrap();

        assert!(allocator.is_allocated(specific_ip));

        // Can't allocate same IP again
        assert!(allocator.allocate_specific(specific_ip).is_err());
    }

    #[test]
    fn test_allocate_specific_out_of_range() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        let out_of_range: IpAddr = "192.168.1.1".parse().unwrap();
        assert!(allocator.allocate_specific(out_of_range).is_err());
    }

    #[test]
    fn test_release() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        let ip = allocator.allocate().unwrap();
        assert!(allocator.is_allocated(ip));

        assert!(allocator.release(ip));
        assert!(!allocator.is_allocated(ip));

        // Can allocate same IP again
        let ip2 = allocator.allocate().unwrap();
        assert_eq!(ip, ip2);
    }

    #[test]
    fn test_mark_allocated() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        let ip: IpAddr = "10.200.0.100".parse().unwrap();
        allocator.mark_allocated(ip).unwrap();

        assert!(allocator.is_allocated(ip));
    }

    #[test]
    fn test_contains() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();

        assert!(allocator.contains("10.200.0.50".parse().unwrap()));
        assert!(!allocator.contains("10.201.0.50".parse().unwrap()));
    }

    #[test]
    fn test_total_hosts() {
        // /24 has 254 usable hosts
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert_eq!(allocator.total_hosts(), 254);

        // /30 has 2 usable hosts
        let allocator = IpAllocator::new("10.200.0.0/30").unwrap();
        assert_eq!(allocator.total_hosts(), 2);
    }

    #[test]
    fn test_available_count() {
        let mut allocator = IpAllocator::new("10.200.0.0/30").unwrap();

        assert_eq!(allocator.available_count(), 2);

        // Unwrap so a failed allocation is reported where it happens rather
        // than as a confusing count mismatch on the next line.
        allocator.allocate().unwrap();
        assert_eq!(allocator.available_count(), 1);

        allocator.allocate().unwrap();
        assert_eq!(allocator.available_count(), 0);
    }

    #[test]
    fn test_state_roundtrip() {
        let mut allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        let ip1 = allocator.allocate().unwrap();
        let ip2 = allocator.allocate().unwrap();

        let state = allocator.to_state();
        let restored = IpAllocator::from_state(state).unwrap();

        assert_eq!(allocator.cidr(), restored.cidr());
        assert_eq!(allocator.allocated_count(), restored.allocated_count());
        // Equal counts alone would not catch a restore that allocated
        // different addresses, so check the exact addresses too.
        assert!(restored.is_allocated(ip1));
        assert!(restored.is_allocated(ip2));
    }

    #[test]
    fn test_first_ip_from_cidr() {
        let ip = first_ip_from_cidr("10.200.0.0/24").unwrap();
        assert_eq!(ip.to_string(), "10.200.0.1");
    }

    #[test]
    fn test_network_addr_v4() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert_eq!(
            allocator.network_addr(),
            IpAddr::V4("10.200.0.0".parse().unwrap())
        );
    }

    #[test]
    fn test_broadcast_addr_v4() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert_eq!(
            allocator.broadcast_addr(),
            IpAddr::V4("10.200.0.255".parse().unwrap())
        );
    }

    #[test]
    fn test_host_prefix_len_v4() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert_eq!(allocator.host_prefix_len(), 32);
    }

    // ========================
    // IPv6 Tests
    // ========================

    #[test]
    fn test_allocator_new_v6() {
        let allocator = IpAllocator::new("fd00::/48").unwrap();
        assert_eq!(allocator.cidr(), "fd00::/48");
        assert_eq!(allocator.allocated_count(), 0);
    }

    #[test]
    fn test_allocate_sequential_v6() {
        let mut allocator = IpAllocator::new("fd00::/126").unwrap();

        // /126 has 3 usable hosts (4 addresses total, minus the network address)
        let ip1 = allocator.allocate().unwrap();
        let ip2 = allocator.allocate().unwrap();
        let ip3 = allocator.allocate().unwrap();

        assert_eq!(ip1.to_string(), "fd00::1");
        assert_eq!(ip2.to_string(), "fd00::2");
        assert_eq!(ip3.to_string(), "fd00::3");

        // Should be exhausted
        assert!(allocator.allocate().is_none());
    }

    #[test]
    fn test_allocate_first_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();

        let first = allocator.allocate_first().unwrap();
        assert_eq!(first.to_string(), "fd00::1");

        // Can't allocate first again
        assert!(allocator.allocate_first().is_err());
    }

    #[test]
    fn test_allocate_specific_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();

        let specific_ip: IpAddr = "fd00::beef".parse().unwrap();
        allocator.allocate_specific(specific_ip).unwrap();

        assert!(allocator.is_allocated(specific_ip));

        // Can't allocate same IP again
        assert!(allocator.allocate_specific(specific_ip).is_err());
    }

    #[test]
    fn test_allocate_specific_out_of_range_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();

        let out_of_range: IpAddr = "fe80::1".parse().unwrap();
        assert!(allocator.allocate_specific(out_of_range).is_err());
    }

    #[test]
    fn test_release_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();

        let ip = allocator.allocate().unwrap();
        assert!(allocator.is_allocated(ip));

        assert!(allocator.release(ip));
        assert!(!allocator.is_allocated(ip));

        // Can allocate same IP again
        let ip2 = allocator.allocate().unwrap();
        assert_eq!(ip, ip2);
    }

    #[test]
    fn test_mark_allocated_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();

        let ip: IpAddr = "fd00::ff".parse().unwrap();
        allocator.mark_allocated(ip).unwrap();

        assert!(allocator.is_allocated(ip));
    }

    #[test]
    fn test_contains_v6() {
        let allocator = IpAllocator::new("fd00::/48").unwrap();

        assert!(allocator.contains("fd00::50".parse().unwrap()));
        assert!(!allocator.contains("fe80::1".parse().unwrap()));
    }

    #[test]
    fn test_total_hosts_v6_small() {
        // /126 has 3 usable hosts (skip network addr)
        let allocator = IpAllocator::new("fd00::/126").unwrap();
        assert_eq!(allocator.total_hosts(), 3);

        // /127 has 1 usable host
        let allocator = IpAllocator::new("fd00::/127").unwrap();
        assert_eq!(allocator.total_hosts(), 1);
    }

    #[test]
    fn test_total_hosts_v6_large() {
        // /48 has 2^80 - 1 usable hosts, which saturates to u32::MAX
        let allocator = IpAllocator::new("fd00::/48").unwrap();
        assert_eq!(allocator.total_hosts(), u32::MAX);
    }

    #[test]
    fn test_available_count_v6() {
        let mut allocator = IpAllocator::new("fd00::/126").unwrap();

        assert_eq!(allocator.available_count(), 3);

        // Unwrap so a failed allocation is reported where it happens rather
        // than as a confusing count mismatch on the next line.
        allocator.allocate().unwrap();
        assert_eq!(allocator.available_count(), 2);

        allocator.allocate().unwrap();
        assert_eq!(allocator.available_count(), 1);

        allocator.allocate().unwrap();
        assert_eq!(allocator.available_count(), 0);
    }

    #[test]
    fn test_state_roundtrip_v6() {
        let mut allocator = IpAllocator::new("fd00::/48").unwrap();
        let ip1 = allocator.allocate().unwrap();
        let ip2 = allocator.allocate().unwrap();

        let state = allocator.to_state();

        // Verify IpAddr serializes as strings (backward-compatible)
        let json = serde_json::to_string_pretty(&state).unwrap();
        assert!(json.contains("fd00::1"));
        assert!(json.contains("fd00::2"));

        let restored = IpAllocator::from_state(state).unwrap();

        assert_eq!(allocator.cidr(), restored.cidr());
        assert_eq!(allocator.allocated_count(), restored.allocated_count());
        // Equal counts alone would not catch a restore that allocated
        // different addresses, so check the exact addresses too.
        assert!(restored.is_allocated(ip1));
        assert!(restored.is_allocated(ip2));
    }

    #[test]
    fn test_first_ip_from_cidr_v6() {
        let ip = first_ip_from_cidr("fd00::/48").unwrap();
        assert_eq!(ip.to_string(), "fd00::1");
    }

    #[test]
    fn test_network_addr_v6() {
        let allocator = IpAllocator::new("fd00::/48").unwrap();
        assert_eq!(
            allocator.network_addr(),
            IpAddr::V6("fd00::".parse().unwrap())
        );
    }

    #[test]
    fn test_broadcast_addr_v6() {
        let allocator = IpAllocator::new("fd00::/126").unwrap();
        assert_eq!(
            allocator.broadcast_addr(),
            IpAddr::V6("fd00::3".parse().unwrap())
        );
    }

    #[test]
    fn test_host_prefix_len_v6() {
        let allocator = IpAllocator::new("fd00::/48").unwrap();
        assert_eq!(allocator.host_prefix_len(), 128);
    }

    // ========================
    // Cross-protocol tests
    // ========================

    #[test]
    fn test_v4_and_v6_allocators_independent() {
        let mut v4 = IpAllocator::new("10.200.0.0/30").unwrap();
        let mut v6 = IpAllocator::new("fd00::/126").unwrap();

        let v4_ip = v4.allocate().unwrap();
        let v6_ip = v6.allocate().unwrap();

        assert!(v4_ip.is_ipv4());
        assert!(v6_ip.is_ipv6());
        assert_eq!(v4_ip.to_string(), "10.200.0.1");
        assert_eq!(v6_ip.to_string(), "fd00::1");
    }

    #[test]
    fn test_ipv6_does_not_contain_ipv4() {
        let allocator = IpAllocator::new("fd00::/48").unwrap();
        assert!(!allocator.contains("10.200.0.1".parse().unwrap()));
    }

    #[test]
    fn test_ipv4_does_not_contain_ipv6() {
        let allocator = IpAllocator::new("10.200.0.0/24").unwrap();
        assert!(!allocator.contains("fd00::1".parse().unwrap()));
    }

    #[test]
    fn test_allocate_specific_wrong_family() {
        let mut v4_alloc = IpAllocator::new("10.200.0.0/24").unwrap();
        let v6_ip: IpAddr = "fd00::1".parse().unwrap();
        assert!(v4_alloc.allocate_specific(v6_ip).is_err());

        let mut v6_alloc = IpAllocator::new("fd00::/48").unwrap();
        let v4_ip: IpAddr = "10.200.0.1".parse().unwrap();
        assert!(v6_alloc.allocate_specific(v4_ip).is_err());
    }

    // ========================
    // Helper function tests
    // ========================

    #[test]
    fn test_ipv4_add() {
        let base: Ipv4Addr = "10.0.0.0".parse().unwrap();
        assert_eq!(ipv4_add(base, 1), Some("10.0.0.1".parse().unwrap()));
        assert_eq!(ipv4_add(base, 256), Some("10.0.1.0".parse().unwrap()));
    }

    #[test]
    fn test_ipv4_add_overflow() {
        let base: Ipv4Addr = "255.255.255.255".parse().unwrap();
        assert_eq!(ipv4_add(base, 1), None);
    }

    #[test]
    fn test_ipv6_add() {
        let base: Ipv6Addr = "fd00::".parse().unwrap();
        assert_eq!(ipv6_add(base, 1), Some("fd00::1".parse().unwrap()));
        assert_eq!(ipv6_add(base, 0xffff), Some("fd00::ffff".parse().unwrap()));
    }

    #[test]
    fn test_ipv6_add_overflow() {
        let base: Ipv6Addr = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff".parse().unwrap();
        assert_eq!(ipv6_add(base, 1), None);
    }

    #[test]
    fn test_host_count_v4() {
        assert_eq!(host_count(false, 24), 254); // 2^8 - 2
        assert_eq!(host_count(false, 30), 2); // 2^2 - 2
        assert_eq!(host_count(false, 16), 65534); // 2^16 - 2
        assert_eq!(host_count(false, 31), 0); // /31 — no classical hosts
        assert_eq!(host_count(false, 32), 0); // /32 — single address
    }

    #[test]
    fn test_host_count_v6() {
        assert_eq!(host_count(true, 126), 3); // 2^2 - 1
        assert_eq!(host_count(true, 127), 1); // 2^1 - 1
        assert_eq!(host_count(true, 128), 0); // /128 — single address (is network addr)
        assert_eq!(host_count(true, 64), (1u128 << 64) - 1); // 2^64 - 1
    }

    // ========================
    // NodeSliceAllocator tests
    // ========================

    /// Cluster CIDR shared by the slice-allocator and registry tests.
    fn cluster() -> IpNet {
        "10.200.0.0/16".parse().unwrap()
    }

    #[test]
    fn test_slice_new_rejects_equal_prefix() {
        let err = NodeSliceAllocator::new(cluster(), 16).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn test_slice_new_rejects_smaller_prefix() {
        let err = NodeSliceAllocator::new(cluster(), 8).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn test_slice_new_rejects_over_max() {
        let err = NodeSliceAllocator::new(cluster(), 33).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn test_slice_capacity_28_in_16() {
        let allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        // /16 → /28 ⇒ 2^12 = 4096 slices
        assert_eq!(allocator.capacity(), 4096);
    }

    #[test]
    fn test_slice_capacity_24_in_16() {
        let allocator = NodeSliceAllocator::new(cluster(), 24).unwrap();
        // /16 → /24 ⇒ 2^8 = 256 slices
        assert_eq!(allocator.capacity(), 256);
    }

    #[test]
    fn test_slice_assign_is_within_cluster() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let slice = allocator.assign("node-a").unwrap();
        assert_eq!(slice.prefix_len(), 28);
        assert!(cluster().contains(&slice.network()));
    }

    #[test]
    fn test_slice_assign_is_idempotent() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let first = allocator.assign("node-a").unwrap();
        let second = allocator.assign("node-a").unwrap();
        assert_eq!(first, second);
        assert_eq!(allocator.assigned_count(), 1);
    }

    #[test]
    fn test_slice_assign_different_nodes_get_different_slices() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let a = allocator.assign("node-a").unwrap();
        let b = allocator.assign("node-b").unwrap();
        let c = allocator.assign("node-c").unwrap();
        assert_ne!(a, b);
        assert_ne!(b, c);
        assert_ne!(a, c);
    }

    #[test]
    fn test_slice_release() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let slice = allocator.assign("node-a").unwrap();
        assert_eq!(allocator.slice_for("node-a"), Some(slice));

        assert!(allocator.release("node-a"));
        assert_eq!(allocator.slice_for("node-a"), None);

        // Release of unknown node returns false.
        assert!(!allocator.release("node-a"));
    }

    #[test]
    fn test_slice_collision_probes_forward() {
        // A /28 cluster with /30 slices has only four slots, so filling every
        // slot with arbitrary IDs makes it likely that at least two IDs hash
        // to the same starting index. Whatever the hash does, each ID must
        // still end up on its own slice.
        let small: IpNet = "10.200.0.0/28".parse().unwrap();
        let mut allocator = NodeSliceAllocator::new(small, 30).unwrap();
        // /28 → /30 ⇒ 2^2 = 4 slices
        assert_eq!(allocator.capacity(), 4);

        // Assign 4 nodes — all must succeed and all must land on distinct slices.
        let ids = ["a", "b", "c", "d"];
        let mut slices: Vec<IpNet> = Vec::new();
        for id in ids {
            let slice = allocator.assign(id).unwrap();
            assert!(
                !slices.contains(&slice),
                "slice {slice} re-assigned; all slices must be distinct"
            );
            slices.push(slice);
        }
        assert_eq!(allocator.assigned_count(), 4);
    }

    #[test]
    fn test_slice_exhaustion_4096() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        // Fill every one of the 4096 slices.
        for i in 0..4096u32 {
            let id = format!("node-{i}");
            allocator.assign(&id).unwrap();
        }
        assert_eq!(allocator.assigned_count(), 4096);

        // The next assignment must fail with NoAvailableIps.
        let err = allocator.assign("node-4096").unwrap_err();
        assert!(matches!(err, OverlayError::NoAvailableIps));
    }

    #[test]
    fn test_slice_snapshot_roundtrip() {
        let mut allocator = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let slice_a = allocator.assign("node-a").unwrap();
        let slice_b = allocator.assign("node-b").unwrap();
        let slice_c = allocator.assign("node-c").unwrap();

        let snapshot = allocator.snapshot();

        // Round-trip through JSON serialization too.
        let json = serde_json::to_string(&snapshot).unwrap();
        let snapshot_restored: NodeSliceAllocatorSnapshot = serde_json::from_str(&json).unwrap();

        let restored = NodeSliceAllocator::restore(snapshot_restored).unwrap();
        assert_eq!(restored.slice_for("node-a"), Some(slice_a));
        assert_eq!(restored.slice_for("node-b"), Some(slice_b));
        assert_eq!(restored.slice_for("node-c"), Some(slice_c));
        assert_eq!(restored.capacity(), 4096);
        assert_eq!(restored.slice_prefix(), 28);
        assert_eq!(restored.cluster_cidr(), cluster());
    }

    #[test]
    fn test_slice_restore_rejects_mismatched_prefix() {
        let snapshot = NodeSliceAllocatorSnapshot {
            cluster_cidr: "10.200.0.0/16".to_string(),
            slice_prefix: 28,
            assigned: vec![("node-a".to_string(), "10.200.0.0/24".to_string())],
        };
        let err = NodeSliceAllocator::restore(snapshot).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn test_slice_restore_rejects_out_of_cluster() {
        let snapshot = NodeSliceAllocatorSnapshot {
            cluster_cidr: "10.200.0.0/16".to_string(),
            slice_prefix: 28,
            assigned: vec![("node-a".to_string(), "10.201.0.0/28".to_string())],
        };
        let err = NodeSliceAllocator::restore(snapshot).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn test_slice_hash_is_deterministic() {
        // Two allocators built fresh should produce the same first-assignment
        // index for the same node ID — critical for consistency across leader
        // restart on a fresh cluster (before any snapshot exists).
        let mut a = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let mut b = NodeSliceAllocator::new(cluster(), 28).unwrap();
        let slice_a = a.assign("my-node-id").unwrap();
        let slice_b = b.assign("my-node-id").unwrap();
        assert_eq!(slice_a, slice_b);
    }

    #[test]
    fn test_slice_allocator_v6() {
        let cluster_v6: IpNet = "fd00:200::/48".parse().unwrap();
        let mut allocator = NodeSliceAllocator::new(cluster_v6, 64).unwrap();
        // /48 → /64 ⇒ 2^16 = 65536 slices
        assert_eq!(allocator.capacity(), 65536);

        let slice = allocator.assign("node-a").unwrap();
        assert_eq!(slice.prefix_len(), 64);
        assert!(cluster_v6.contains(&slice.network()));
    }

    // ========================
    // ServiceSubnetRegistry tests
    // ========================

    #[test]
    fn service_subnet_assign_is_idempotent() {
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let first = reg.assign("svc-a", "node-1").unwrap();
        let second = reg.assign("svc-a", "node-1").unwrap();
        assert_eq!(first, second);
        assert_eq!(reg.assigned_count(), 1);
        assert_eq!(reg.get("svc-a", "node-1"), Some(first));
    }

    #[test]
    fn service_subnet_skips_slices_containing_reserved_ip() {
        // The node overlay/DNS IP (10.200.0.5) lives inside the first /28 slice
        // (10.200.0.0/28). Reserving it must keep EVERY assigned bridge slice
        // clear of it, so a container never thinks the node DNS is on-link.
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let node_ip: IpAddr = "10.200.0.5".parse().unwrap();
        reg.reserve_ip(node_ip);
        reg.reserve_ip(node_ip); // idempotent

        for i in 0..64 {
            let slice = reg.assign(&format!("svc-{i}"), "node-1").unwrap();
            assert!(
                !slice.contains(&node_ip),
                "assigned slice {slice} must not contain reserved node IP {node_ip}"
            );
        }
        // The specific colliding /28 is never handed out.
        let colliding: IpNet = "10.200.0.0/28".parse().unwrap();
        assert!(reg.assignments.values().all(|s| *s != colliding));
    }

    #[test]
    fn service_subnet_two_services_disjoint() {
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let a = reg.assign("svc-a", "node-1").unwrap();
        let b = reg.assign("svc-b", "node-1").unwrap();
        assert_ne!(a, b);
        // Slices must be disjoint (neither contains the other's network address).
        assert!(!a.contains(&b.network()));
        assert!(!b.contains(&a.network()));
    }

    #[test]
    fn service_subnet_same_service_two_nodes_disjoint() {
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let a = reg.assign("svc-a", "node-1").unwrap();
        let b = reg.assign("svc-a", "node-2").unwrap();
        assert_ne!(a, b);
        assert!(!a.contains(&b.network()));
        assert!(!b.contains(&a.network()));
    }

    #[test]
    fn service_subnet_release_reclaims_slot() {
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let first = reg.assign("svc-a", "node-1").unwrap();
        let released = reg.release("svc-a", "node-1");
        assert_eq!(released, Some(first));
        assert_eq!(reg.get("svc-a", "node-1"), None);
        assert_eq!(reg.assigned_count(), 0);

        // Re-assign should land on the same slot because the hash is
        // deterministic and no other assignment is occupying it.
        let again = reg.assign("svc-a", "node-1").unwrap();
        assert_eq!(again, first);

        // Releasing an unknown key returns None.
        assert_eq!(reg.release("svc-z", "node-z"), None);
    }

    #[test]
    fn service_subnet_snapshot_restore_roundtrip() {
        let mut reg = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let a = reg.assign("svc-a", "node-1").unwrap();
        let b = reg.assign("svc-a", "node-2").unwrap();
        let c = reg.assign("svc-b", "node-1").unwrap();
        let d = reg.assign("svc-b", "node-2").unwrap();

        let snapshot = reg.snapshot();

        // Round-trip through JSON to mimic the Raft serialization boundary.
        let json = serde_json::to_string(&snapshot).unwrap();
        let snapshot_restored: ServiceSubnetRegistrySnapshot = serde_json::from_str(&json).unwrap();

        // Snapshot ordering must be deterministic — re-snapshotting the same
        // state must serialize to the same bytes (critical for Raft hashing).
        let json2 = serde_json::to_string(&reg.snapshot()).unwrap();
        assert_eq!(json, json2);

        let restored = ServiceSubnetRegistry::restore(snapshot_restored).unwrap();
        assert_eq!(restored.get("svc-a", "node-1"), Some(a));
        assert_eq!(restored.get("svc-a", "node-2"), Some(b));
        assert_eq!(restored.get("svc-b", "node-1"), Some(c));
        assert_eq!(restored.get("svc-b", "node-2"), Some(d));
        assert_eq!(restored.assigned_count(), 4);
        assert_eq!(restored.slice_prefix(), 28);
        assert_eq!(restored.cluster_cidr(), cluster());
        assert_eq!(restored.capacity(), 4096);
    }

    #[test]
    fn service_subnet_exhaustion_errors() {
        // /29 with /30 slices → 2 slots total.
        let small: IpNet = "10.200.0.0/29".parse().unwrap();
        let mut reg = ServiceSubnetRegistry::new(small, 30).unwrap();
        assert_eq!(reg.capacity(), 2);

        reg.assign("svc-a", "node-1").unwrap();
        reg.assign("svc-a", "node-2").unwrap();
        assert_eq!(reg.assigned_count(), 2);

        let err = reg.assign("svc-a", "node-3").unwrap_err();
        assert!(matches!(err, OverlayError::NoAvailableIps));

        // But re-assigning an existing pair still succeeds (idempotent).
        let existing = reg.get("svc-a", "node-1").unwrap();
        assert_eq!(reg.assign("svc-a", "node-1").unwrap(), existing);
    }

    #[test]
    fn service_subnet_rejects_bad_prefix() {
        // slice prefix equal to cluster prefix.
        let err = ServiceSubnetRegistry::new(cluster(), 16).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
        // slice prefix shorter than cluster prefix.
        let err = ServiceSubnetRegistry::new(cluster(), 8).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
        // slice prefix beyond max for family.
        let err = ServiceSubnetRegistry::new(cluster(), 33).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn service_subnet_hash_is_deterministic_across_instances() {
        // Two registries built fresh must assign the same (service, node)
        // pair to the same starting slot — same guarantee as
        // `NodeSliceAllocator::test_slice_hash_is_deterministic`.
        let mut a = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let mut b = ServiceSubnetRegistry::new(cluster(), 28).unwrap();
        let slice_a = a.assign("svc-x", "node-x").unwrap();
        let slice_b = b.assign("svc-x", "node-x").unwrap();
        assert_eq!(slice_a, slice_b);
    }

    #[test]
    fn service_subnet_restore_rejects_mismatched_prefix() {
        let snapshot = ServiceSubnetRegistrySnapshot {
            cluster_cidr: "10.200.0.0/16".parse().unwrap(),
            slice_prefix: 28,
            assignments: vec![(
                ("svc-a".to_string(), "node-1".to_string()),
                "10.200.0.0/24".parse().unwrap(),
            )],
        };
        let err = ServiceSubnetRegistry::restore(snapshot).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }

    #[test]
    fn service_subnet_restore_rejects_out_of_cluster() {
        let snapshot = ServiceSubnetRegistrySnapshot {
            cluster_cidr: "10.200.0.0/16".parse().unwrap(),
            slice_prefix: 28,
            assignments: vec![(
                ("svc-a".to_string(), "node-1".to_string()),
                "10.201.0.0/28".parse().unwrap(),
            )],
        };
        let err = ServiceSubnetRegistry::restore(snapshot).unwrap_err();
        assert!(matches!(err, OverlayError::InvalidCidr(_)));
    }
}