zlayer_overlay/edge_cache.rs
1//! Edge-cache eligibility registry.
2//!
//! Tracks which nodes have been registered as eligible for edge caching by
//! the upstream control plane (the consuming cluster manager), and
//! propagates each eligibility decision via the gossip-label mechanism in
//! [`crate::gossip`] so peer nodes can route cache-bearing traffic to
//! eligible nodes by inspecting their advertised
//! [`PeerInfo`](crate::gossip::PeerInfo) labels.
9//!
//! The actual cache fill / eviction / hit-counting logic is out of scope
//! for this module — it lives in upstream feature work (`Z3Fungi` sidecar or
//! a future overlay-native cache primitive). [`EdgeCacheRegistry::stats`]
//! intentionally returns an all-zero [`EdgeCacheStats`] placeholder; the API
//! surface exists so upstream integration tests (e.g. Zatabase Wave 1.3.3.7)
//! can build against it.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::SystemTime;
20
21use thiserror::Error;
22use tokio::sync::RwLock;
23
24use crate::error::OverlayError;
25use crate::gossip::{GossipPool, PeerInfo};
26
/// Cache capacity advertised by a node when it becomes eligible.
///
/// Compared by value (`PartialEq`/`Eq`), so two declarations with the same
/// cores, memory, disk budget and geo label are considered equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeCapacity {
    /// Number of CPU cores the node is willing to dedicate to cache work.
    pub cpu_cores: u32,
    /// Resident memory (MiB) the node will allow the cache to occupy.
    pub ram_mib: u64,
    /// On-disk cache budget (MiB).
    pub disk_mib: u64,
    /// Optional geo / region label used by future placement decisions.
    pub geo: Option<String>,
}
38}
39
40/// One entry in the registry — a node currently marked edge-cache-eligible.
41#[derive(Debug, Clone)]
42pub struct EdgeCacheNode {
43 /// `ZLayer` node ID this entry describes.
44 pub node_id: u64,
45 /// Capacity the node declared at registration time.
46 pub capacity: NodeCapacity,
47 /// Wall-clock time the node was added to the registry.
48 pub enabled_at: SystemTime,
49}
50
/// Per-node cache hit/miss counters.
///
/// This is a documented placeholder: today both fields are always `0`.
/// The real counters will land in a follow-on patch when the cache fill /
/// eviction subsystem itself is implemented. The type lives here now so
/// upstream callers (Zatabase Wave 1.3.3.7 in particular) can build
/// integration tests against the stable shape; `PartialEq`/`Eq` let them
/// compare a returned value against an expected one directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EdgeCacheStats {
    /// Total cache hits observed for the node since registration.
    pub hits: u64,
    /// Total cache misses observed for the node since registration.
    pub misses: u64,
}
64}
65
66/// Errors returned by [`EdgeCacheRegistry`].
67#[derive(Debug, Error)]
68pub enum EdgeCacheError {
69 /// Re-publishing the gossip self-info (carrying the updated labels)
70 /// failed at the underlying chitchat layer.
71 #[error("gossip label push failed: {0}")]
72 Gossip(String),
73 /// Caller asked to disable a node that wasn't registered. Treated as
74 /// an error so the caller knows their view of the world is stale.
75 #[error("node {0} is not registered as edge-cache eligible")]
76 NodeNotFound(u64),
77}
78
79impl From<OverlayError> for EdgeCacheError {
80 fn from(err: OverlayError) -> Self {
81 EdgeCacheError::Gossip(err.to_string())
82 }
83}
84
/// Label set to `"true"` in the local self-info while edge caching is enabled.
const LABEL_KEY_ENABLED: &str = "edge_cache";
/// Label carrying the declared `NodeCapacity::cpu_cores`.
const LABEL_KEY_CPU: &str = "edge_cache_cpu";
/// Label carrying the declared `NodeCapacity::ram_mib`.
const LABEL_KEY_RAM_MIB: &str = "edge_cache_ram_mib";
/// Label carrying the declared `NodeCapacity::disk_mib`.
const LABEL_KEY_DISK_MIB: &str = "edge_cache_disk_mib";
/// Label carrying `NodeCapacity::geo`; removed when no geo was declared.
const LABEL_KEY_GEO: &str = "edge_cache_geo";
90const LABEL_KEY_GEO: &str = "edge_cache_geo";
91
92/// Tracks which nodes are eligible for edge caching and propagates that
93/// state via gossip labels.
94///
95/// Construction is cheap; the registry is `Clone` (it's an
96/// `Arc`-wrapped pair of state + gossip handle) so it can be plumbed into
97/// Axum handlers behind `with_state`.
98#[derive(Clone)]
99pub struct EdgeCacheRegistry {
100 inner: Arc<RwLock<HashMap<u64, EdgeCacheNode>>>,
101 /// Gossip pool used to broadcast the `edge_cache=true` label, plus
102 /// the capacity sub-labels. `None` in test/standalone modes where no
103 /// gossip pool is wired — the registry still tracks eligibility
104 /// locally but doesn't propagate.
105 gossip: Option<Arc<GossipPool>>,
106 /// Self-info template used when re-announcing after a label change.
107 /// `None` when [`gossip`] is also `None`.
108 self_info: Arc<RwLock<Option<PeerInfo>>>,
109}
110
111impl std::fmt::Debug for EdgeCacheRegistry {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("EdgeCacheRegistry")
114 .field("gossip_attached", &self.gossip.is_some())
115 .finish_non_exhaustive()
116 }
117}
118
119impl EdgeCacheRegistry {
120 /// Create a registry with no gossip pool attached.
121 ///
122 /// Eligibility decisions are still tracked locally and returned by
123 /// [`Self::list_eligible`], but no labels are pushed to peers. Used
124 /// by tests and by standalone-mode daemons that don't participate in
125 /// the worker-tier gossip pool.
126 #[must_use]
127 pub fn new() -> Self {
128 Self {
129 inner: Arc::new(RwLock::new(HashMap::new())),
130 gossip: None,
131 self_info: Arc::new(RwLock::new(None)),
132 }
133 }
134
135 /// Create a registry that pushes eligibility labels into the
136 /// supplied gossip pool.
137 ///
138 /// `self_info` is the [`PeerInfo`] template the registry will mutate
139 /// (label keys are added on `enable` and removed on `disable`) and
140 /// then re-broadcast via [`GossipPool::announce_self`].
141 #[must_use]
142 pub fn with_gossip(gossip: Arc<GossipPool>, self_info: PeerInfo) -> Self {
143 Self {
144 inner: Arc::new(RwLock::new(HashMap::new())),
145 gossip: Some(gossip),
146 self_info: Arc::new(RwLock::new(Some(self_info))),
147 }
148 }
149
150 /// Mark `node_id` as eligible for edge caching with `capacity`.
151 ///
152 /// Inserts a new [`EdgeCacheNode`] into the registry (overwriting any
153 /// prior entry for the same `node_id`) and, when a gossip pool is
154 /// attached, pushes the standard label set into the local node's
155 /// gossip self-info and re-announces it.
156 ///
157 /// # Errors
158 ///
159 /// Returns [`EdgeCacheError::Gossip`] if the gossip pool rejects the
160 /// re-announcement (serialization error inside chitchat).
161 pub async fn enable(&self, node_id: u64, capacity: NodeCapacity) -> Result<(), EdgeCacheError> {
162 {
163 let mut guard = self.inner.write().await;
164 guard.insert(
165 node_id,
166 EdgeCacheNode {
167 node_id,
168 capacity: capacity.clone(),
169 enabled_at: SystemTime::now(),
170 },
171 );
172 }
173
174 if let Some(gossip) = self.gossip.as_ref() {
175 let mut info_guard = self.self_info.write().await;
176 if let Some(info) = info_guard.as_mut() {
177 info.labels
178 .insert(LABEL_KEY_ENABLED.to_string(), "true".to_string());
179 info.labels
180 .insert(LABEL_KEY_CPU.to_string(), capacity.cpu_cores.to_string());
181 info.labels
182 .insert(LABEL_KEY_RAM_MIB.to_string(), capacity.ram_mib.to_string());
183 info.labels.insert(
184 LABEL_KEY_DISK_MIB.to_string(),
185 capacity.disk_mib.to_string(),
186 );
187 if let Some(geo) = capacity.geo.as_ref() {
188 info.labels.insert(LABEL_KEY_GEO.to_string(), geo.clone());
189 } else {
190 info.labels.remove(LABEL_KEY_GEO);
191 }
192 gossip.announce_self(info).await?;
193 }
194 }
195
196 Ok(())
197 }
198
199 /// Remove `node_id` from the eligibility registry.
200 ///
201 /// When a gossip pool is attached, clears the edge-cache label set
202 /// from the local node's gossip self-info and re-announces.
203 ///
204 /// # Errors
205 ///
206 /// - [`EdgeCacheError::NodeNotFound`] if `node_id` was not registered.
207 /// - [`EdgeCacheError::Gossip`] if the gossip re-announcement fails.
208 pub async fn disable(&self, node_id: u64) -> Result<(), EdgeCacheError> {
209 {
210 let mut guard = self.inner.write().await;
211 if guard.remove(&node_id).is_none() {
212 return Err(EdgeCacheError::NodeNotFound(node_id));
213 }
214 }
215
216 if let Some(gossip) = self.gossip.as_ref() {
217 let mut info_guard = self.self_info.write().await;
218 if let Some(info) = info_guard.as_mut() {
219 info.labels.remove(LABEL_KEY_ENABLED);
220 info.labels.remove(LABEL_KEY_CPU);
221 info.labels.remove(LABEL_KEY_RAM_MIB);
222 info.labels.remove(LABEL_KEY_DISK_MIB);
223 info.labels.remove(LABEL_KEY_GEO);
224 gossip.announce_self(info).await?;
225 }
226 }
227
228 Ok(())
229 }
230
231 /// Snapshot of every currently-eligible node.
232 pub async fn list_eligible(&self) -> Vec<EdgeCacheNode> {
233 let guard = self.inner.read().await;
234 guard.values().cloned().collect()
235 }
236
237 /// Whether `node_id` is currently registered as edge-cache-eligible.
238 pub async fn is_enabled(&self, node_id: u64) -> bool {
239 let guard = self.inner.read().await;
240 guard.contains_key(&node_id)
241 }
242
243 /// Return placeholder hit/miss counters for `node_id`.
244 ///
245 /// **Documented stub**: always returns `(0, 0)` regardless of
246 /// `node_id`. The real counters land in the follow-on cache subsystem
247 /// (see module docs). This stays callable so upstream integration
248 /// tests can rely on the endpoint existing today. The `async` shape is
249 /// preserved so a future implementation can read shared cache state
250 /// without a breaking API churn.
251 #[allow(clippy::unused_async)]
252 pub async fn stats(&self, _node_id: u64) -> EdgeCacheStats {
253 EdgeCacheStats::default()
254 }
255}
256
257impl Default for EdgeCacheRegistry {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `NodeCapacity` tersely for test fixtures.
    fn cap(cpu: u32, ram: u64, disk: u64, geo: Option<&str>) -> NodeCapacity {
        NodeCapacity {
            cpu_cores: cpu,
            ram_mib: ram,
            disk_mib: disk,
            geo: geo.map(str::to_string),
        }
    }

    #[tokio::test]
    async fn enable_then_disable_round_trip() {
        let reg = EdgeCacheRegistry::new();
        assert!(reg.list_eligible().await.is_empty());
        assert!(!reg.is_enabled(7).await);

        reg.enable(7, cap(4, 1024, 8192, Some("us-east-1")))
            .await
            .expect("enable");
        assert!(reg.is_enabled(7).await);
        let listed = reg.list_eligible().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].node_id, 7);
        assert_eq!(listed[0].capacity.cpu_cores, 4);
        assert_eq!(listed[0].capacity.geo.as_deref(), Some("us-east-1"));

        reg.disable(7).await.expect("disable");
        assert!(!reg.is_enabled(7).await);
        assert!(reg.list_eligible().await.is_empty());
    }

    #[tokio::test]
    async fn disable_unknown_node_errors() {
        let reg = EdgeCacheRegistry::new();
        let err = reg.disable(99).await.expect_err("should fail");
        match err {
            EdgeCacheError::NodeNotFound(99) => {}
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[tokio::test]
    async fn disable_twice_reports_not_found() {
        let reg = EdgeCacheRegistry::new();
        reg.enable(4, cap(1, 64, 256, None)).await.expect("enable");
        reg.disable(4).await.expect("first disable");
        let err = reg
            .disable(4)
            .await
            .expect_err("second disable should fail");
        assert!(matches!(err, EdgeCacheError::NodeNotFound(4)));
    }

    #[tokio::test]
    async fn disabling_one_node_keeps_others() {
        let reg = EdgeCacheRegistry::new();
        reg.enable(1, cap(1, 64, 256, None)).await.expect("enable 1");
        reg.enable(2, cap(2, 128, 512, Some("ap-south-1")))
            .await
            .expect("enable 2");

        reg.disable(1).await.expect("disable 1");
        assert!(!reg.is_enabled(1).await);
        assert!(reg.is_enabled(2).await);
        let listed = reg.list_eligible().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].node_id, 2);
        assert_eq!(listed[0].capacity.geo.as_deref(), Some("ap-south-1"));
    }

    #[tokio::test]
    async fn stats_returns_placeholder_zero() {
        let reg = EdgeCacheRegistry::new();
        reg.enable(3, cap(1, 64, 256, None)).await.expect("enable");
        let s = reg.stats(3).await;
        assert_eq!(s.hits, 0);
        assert_eq!(s.misses, 0);
        // Unknown node also returns zeroes — stats is purely a stub.
        let s2 = reg.stats(999).await;
        assert_eq!(s2.hits, 0);
        assert_eq!(s2.misses, 0);
    }

    #[tokio::test]
    async fn enable_overwrites_capacity() {
        let reg = EdgeCacheRegistry::new();
        reg.enable(5, cap(2, 128, 512, None)).await.expect("enable");
        reg.enable(5, cap(8, 4096, 16_384, Some("eu-west-1")))
            .await
            .expect("re-enable");
        let listed = reg.list_eligible().await;
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].capacity.cpu_cores, 8);
        assert_eq!(listed[0].capacity.disk_mib, 16_384);
        assert_eq!(listed[0].capacity.geo.as_deref(), Some("eu-west-1"));
    }
}
333}