Skip to main content

zlayer_overlay/
edge_cache.rs

//! Edge-cache eligibility registry.
//!
//! Tracks which nodes have been registered as eligible for edge caching by
//! the upstream control plane (the consuming cluster manager). The
//! eligibility decision is propagated via the gossip-label mechanism in
//! [`crate::gossip`] so peer nodes can route cache-bearing traffic to
//! eligible nodes by inspecting their advertised
//! [`gossip::PeerInfo::labels`](crate::gossip::PeerInfo).
//!
//! The actual cache fill / eviction / hit-counting logic is out of scope
//! for this module — it lives in upstream feature work (`Z3Fungi` sidecar or
//! a future overlay-native cache primitive). [`EdgeCacheRegistry::stats`]
//! intentionally returns a zeroed [`EdgeCacheStats`] placeholder; the API
//! surface exists so upstream integration tests (e.g. Zatabase Wave
//! 1.3.3.7) can build against it.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::SystemTime;
20
21use thiserror::Error;
22use tokio::sync::RwLock;
23
24use crate::error::OverlayError;
25use crate::gossip::{GossipPool, PeerInfo};
26
/// Cache capacity advertised by a node when it becomes eligible.
///
/// Compares by value (`PartialEq` / `Eq`) so callers and tests can check a
/// whole capacity declaration with a single `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeCapacity {
    /// Number of CPU cores the node is willing to dedicate to cache work.
    pub cpu_cores: u32,
    /// Resident memory (MiB) the node will allow the cache to occupy.
    pub ram_mib: u64,
    /// On-disk cache budget (MiB).
    pub disk_mib: u64,
    /// Optional geo / region label used by future placement decisions.
    pub geo: Option<String>,
}

/// One entry in the registry — a node currently marked edge-cache-eligible.
///
/// Two entries are equal only when the node ID, the declared capacity and
/// the registration timestamp all match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EdgeCacheNode {
    /// `ZLayer` node ID this entry describes.
    pub node_id: u64,
    /// Capacity the node declared at registration time.
    pub capacity: NodeCapacity,
    /// Wall-clock time the node was added to the registry.
    pub enabled_at: SystemTime,
}
50
/// Per-node cache hit/miss counters.
///
/// This is a documented placeholder: today both fields are always `0`.
/// The real counters will land in a follow-on patch when the cache fill /
/// eviction subsystem itself is implemented. The type lives here now so
/// upstream callers (Zatabase Wave 1.3.3.7 in particular) can build
/// integration tests against the stable shape.
///
/// The type is `Copy` and compares by value, so a snapshot can be checked
/// with a single `assert_eq!` against an expected pair of counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EdgeCacheStats {
    /// Total cache hits observed for the node since registration.
    pub hits: u64,
    /// Total cache misses observed for the node since registration.
    pub misses: u64,
}
65
/// Errors returned by [`EdgeCacheRegistry`].
#[derive(Debug, Error)]
pub enum EdgeCacheError {
    /// Re-publishing the gossip self-info (carrying the updated labels)
    /// failed. The payload is the display text of the underlying
    /// [`OverlayError`], as produced by the `From<OverlayError>` conversion.
    #[error("gossip label push failed: {0}")]
    Gossip(String),
    /// Caller asked to disable a node that wasn't registered. Treated as
    /// an error so the caller knows their view of the world is stale.
    /// The payload is the node ID that was looked up.
    #[error("node {0} is not registered as edge-cache eligible")]
    NodeNotFound(u64),
}
78
79impl From<OverlayError> for EdgeCacheError {
80    fn from(err: OverlayError) -> Self {
81        EdgeCacheError::Gossip(err.to_string())
82    }
83}
84
// Standard gossip-label keys this registry writes/clears.

/// Eligibility marker: set to `"true"` when a node is enabled and removed
/// when one is disabled.
const LABEL_KEY_ENABLED: &str = "edge_cache";
/// Carries [`NodeCapacity::cpu_cores`] rendered as a decimal string.
const LABEL_KEY_CPU: &str = "edge_cache_cpu";
/// Carries [`NodeCapacity::ram_mib`] rendered as a decimal string.
const LABEL_KEY_RAM_MIB: &str = "edge_cache_ram_mib";
/// Carries [`NodeCapacity::disk_mib`] rendered as a decimal string.
const LABEL_KEY_DISK_MIB: &str = "edge_cache_disk_mib";
/// Carries [`NodeCapacity::geo`]; only present when a geo label was declared.
const LABEL_KEY_GEO: &str = "edge_cache_geo";
91
/// Tracks which nodes are eligible for edge caching and propagates that
/// state via gossip labels.
///
/// Construction is cheap; the registry is `Clone` (all shared state sits
/// behind `Arc`s, so clones observe the same registry) and can therefore be
/// plumbed into Axum handlers behind `with_state`.
#[derive(Clone)]
pub struct EdgeCacheRegistry {
    /// Currently eligible nodes, keyed by node ID.
    inner: Arc<RwLock<HashMap<u64, EdgeCacheNode>>>,
    /// Gossip pool used to broadcast the `edge_cache=true` label, plus
    /// the capacity sub-labels. `None` in test/standalone modes where no
    /// gossip pool is wired — the registry still tracks eligibility
    /// locally but doesn't propagate.
    gossip: Option<Arc<GossipPool>>,
    /// Self-info template used when re-announcing after a label change.
    /// `None` when [`Self::gossip`] is also `None`.
    self_info: Arc<RwLock<Option<PeerInfo>>>,
}
110
111impl std::fmt::Debug for EdgeCacheRegistry {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("EdgeCacheRegistry")
114            .field("gossip_attached", &self.gossip.is_some())
115            .finish_non_exhaustive()
116    }
117}
118
119impl EdgeCacheRegistry {
120    /// Create a registry with no gossip pool attached.
121    ///
122    /// Eligibility decisions are still tracked locally and returned by
123    /// [`Self::list_eligible`], but no labels are pushed to peers. Used
124    /// by tests and by standalone-mode daemons that don't participate in
125    /// the worker-tier gossip pool.
126    #[must_use]
127    pub fn new() -> Self {
128        Self {
129            inner: Arc::new(RwLock::new(HashMap::new())),
130            gossip: None,
131            self_info: Arc::new(RwLock::new(None)),
132        }
133    }
134
135    /// Create a registry that pushes eligibility labels into the
136    /// supplied gossip pool.
137    ///
138    /// `self_info` is the [`PeerInfo`] template the registry will mutate
139    /// (label keys are added on `enable` and removed on `disable`) and
140    /// then re-broadcast via [`GossipPool::announce_self`].
141    #[must_use]
142    pub fn with_gossip(gossip: Arc<GossipPool>, self_info: PeerInfo) -> Self {
143        Self {
144            inner: Arc::new(RwLock::new(HashMap::new())),
145            gossip: Some(gossip),
146            self_info: Arc::new(RwLock::new(Some(self_info))),
147        }
148    }
149
150    /// Mark `node_id` as eligible for edge caching with `capacity`.
151    ///
152    /// Inserts a new [`EdgeCacheNode`] into the registry (overwriting any
153    /// prior entry for the same `node_id`) and, when a gossip pool is
154    /// attached, pushes the standard label set into the local node's
155    /// gossip self-info and re-announces it.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`EdgeCacheError::Gossip`] if the gossip pool rejects the
160    /// re-announcement (serialization error inside chitchat).
161    pub async fn enable(&self, node_id: u64, capacity: NodeCapacity) -> Result<(), EdgeCacheError> {
162        {
163            let mut guard = self.inner.write().await;
164            guard.insert(
165                node_id,
166                EdgeCacheNode {
167                    node_id,
168                    capacity: capacity.clone(),
169                    enabled_at: SystemTime::now(),
170                },
171            );
172        }
173
174        if let Some(gossip) = self.gossip.as_ref() {
175            let mut info_guard = self.self_info.write().await;
176            if let Some(info) = info_guard.as_mut() {
177                info.labels
178                    .insert(LABEL_KEY_ENABLED.to_string(), "true".to_string());
179                info.labels
180                    .insert(LABEL_KEY_CPU.to_string(), capacity.cpu_cores.to_string());
181                info.labels
182                    .insert(LABEL_KEY_RAM_MIB.to_string(), capacity.ram_mib.to_string());
183                info.labels.insert(
184                    LABEL_KEY_DISK_MIB.to_string(),
185                    capacity.disk_mib.to_string(),
186                );
187                if let Some(geo) = capacity.geo.as_ref() {
188                    info.labels.insert(LABEL_KEY_GEO.to_string(), geo.clone());
189                } else {
190                    info.labels.remove(LABEL_KEY_GEO);
191                }
192                gossip.announce_self(info).await?;
193            }
194        }
195
196        Ok(())
197    }
198
199    /// Remove `node_id` from the eligibility registry.
200    ///
201    /// When a gossip pool is attached, clears the edge-cache label set
202    /// from the local node's gossip self-info and re-announces.
203    ///
204    /// # Errors
205    ///
206    /// - [`EdgeCacheError::NodeNotFound`] if `node_id` was not registered.
207    /// - [`EdgeCacheError::Gossip`] if the gossip re-announcement fails.
208    pub async fn disable(&self, node_id: u64) -> Result<(), EdgeCacheError> {
209        {
210            let mut guard = self.inner.write().await;
211            if guard.remove(&node_id).is_none() {
212                return Err(EdgeCacheError::NodeNotFound(node_id));
213            }
214        }
215
216        if let Some(gossip) = self.gossip.as_ref() {
217            let mut info_guard = self.self_info.write().await;
218            if let Some(info) = info_guard.as_mut() {
219                info.labels.remove(LABEL_KEY_ENABLED);
220                info.labels.remove(LABEL_KEY_CPU);
221                info.labels.remove(LABEL_KEY_RAM_MIB);
222                info.labels.remove(LABEL_KEY_DISK_MIB);
223                info.labels.remove(LABEL_KEY_GEO);
224                gossip.announce_self(info).await?;
225            }
226        }
227
228        Ok(())
229    }
230
231    /// Snapshot of every currently-eligible node.
232    pub async fn list_eligible(&self) -> Vec<EdgeCacheNode> {
233        let guard = self.inner.read().await;
234        guard.values().cloned().collect()
235    }
236
237    /// Whether `node_id` is currently registered as edge-cache-eligible.
238    pub async fn is_enabled(&self, node_id: u64) -> bool {
239        let guard = self.inner.read().await;
240        guard.contains_key(&node_id)
241    }
242
243    /// Return placeholder hit/miss counters for `node_id`.
244    ///
245    /// **Documented stub**: always returns `(0, 0)` regardless of
246    /// `node_id`. The real counters land in the follow-on cache subsystem
247    /// (see module docs). This stays callable so upstream integration
248    /// tests can rely on the endpoint existing today. The `async` shape is
249    /// preserved so a future implementation can read shared cache state
250    /// without a breaking API churn.
251    #[allow(clippy::unused_async)]
252    pub async fn stats(&self, _node_id: u64) -> EdgeCacheStats {
253        EdgeCacheStats::default()
254    }
255}
256
257impl Default for EdgeCacheRegistry {
258    fn default() -> Self {
259        Self::new()
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a [`NodeCapacity`] fixture from plain values.
    fn cap(cpu: u32, ram: u64, disk: u64, geo: Option<&str>) -> NodeCapacity {
        let geo = geo.map(String::from);
        NodeCapacity {
            cpu_cores: cpu,
            ram_mib: ram,
            disk_mib: disk,
            geo,
        }
    }

    /// A node shows up after `enable` and is gone again after `disable`.
    #[tokio::test]
    async fn enable_then_disable_round_trip() {
        let registry = EdgeCacheRegistry::new();
        assert_eq!(registry.list_eligible().await.len(), 0);
        assert!(!registry.is_enabled(7).await);

        let capacity = cap(4, 1024, 8192, Some("us-east-1"));
        registry.enable(7, capacity).await.expect("enable");
        assert!(registry.is_enabled(7).await);

        let eligible = registry.list_eligible().await;
        assert_eq!(eligible.len(), 1);
        let entry = &eligible[0];
        assert_eq!(entry.node_id, 7);
        assert_eq!(entry.capacity.cpu_cores, 4);
        assert_eq!(entry.capacity.geo.as_deref(), Some("us-east-1"));

        registry.disable(7).await.expect("disable");
        assert!(!registry.is_enabled(7).await);
        assert_eq!(registry.list_eligible().await.len(), 0);
    }

    /// Disabling a node that was never enabled reports `NodeNotFound`.
    #[tokio::test]
    async fn disable_unknown_node_errors() {
        let registry = EdgeCacheRegistry::new();
        let err = registry.disable(99).await.expect_err("should fail");
        assert!(
            matches!(err, EdgeCacheError::NodeNotFound(99)),
            "unexpected error: {err:?}"
        );
    }

    /// `stats` is a stub: zero counters for known and unknown nodes alike.
    #[tokio::test]
    async fn stats_returns_placeholder_zero() {
        let registry = EdgeCacheRegistry::new();
        registry
            .enable(3, cap(1, 64, 256, None))
            .await
            .expect("enable");

        let known = registry.stats(3).await;
        assert_eq!((known.hits, known.misses), (0, 0));

        // Unknown node also returns zeroes — stats is purely a stub.
        let unknown = registry.stats(999).await;
        assert_eq!((unknown.hits, unknown.misses), (0, 0));
    }

    /// A second `enable` for the same node replaces the stored capacity.
    #[tokio::test]
    async fn enable_overwrites_capacity() {
        let registry = EdgeCacheRegistry::new();
        registry
            .enable(5, cap(2, 128, 512, None))
            .await
            .expect("enable");
        registry
            .enable(5, cap(8, 4096, 16_384, Some("eu-west-1")))
            .await
            .expect("re-enable");

        let eligible = registry.list_eligible().await;
        assert_eq!(eligible.len(), 1);
        let capacity = &eligible[0].capacity;
        assert_eq!(capacity.cpu_cores, 8);
        assert_eq!(capacity.disk_mib, 16_384);
        assert_eq!(capacity.geo.as_deref(), Some("eu-west-1"));
    }
}
333}