Skip to main content

phago_distributed/shard/
ghost_cache.rs

1//! Ghost node cache for cross-shard references.
2//!
3//! This module provides a cache for ghost nodes - lightweight references
4//! to nodes that live on remote shards. The cache uses LRU eviction to
5//! bound memory usage while keeping frequently accessed ghost nodes warm.
6
use crate::types::{GhostNode, ShardId};
use phago_core::types::NodeId;
use std::collections::{HashMap, VecDeque};
10
11/// Cache of ghost nodes from other shards.
12///
13/// Ghost nodes are references to nodes that live on remote shards.
14/// This cache maintains a bounded set of ghost nodes using LRU eviction
15/// to keep frequently accessed nodes available while limiting memory usage.
16///
17/// # Example
18///
19/// ```ignore
20/// use phago_distributed::shard::GhostNodeCache;
21///
22/// let mut cache = GhostNodeCache::new(100);
23/// cache.insert(ghost_node);
24///
25/// if let Some(ghost) = cache.get(&node_id) {
26///     println!("Found ghost node: {}", ghost.label);
27/// }
28/// ```
29pub struct GhostNodeCache {
30    /// Ghost nodes indexed by ID.
31    cache: HashMap<NodeId, GhostNode>,
32    /// Maximum cache size.
33    max_size: usize,
34    /// Access order for LRU eviction (most recently used at the end).
35    access_order: Vec<NodeId>,
36}
37
38impl GhostNodeCache {
39    /// Create a new ghost node cache with the specified maximum size.
40    ///
41    /// # Arguments
42    ///
43    /// * `max_size` - Maximum number of ghost nodes to cache
44    pub fn new(max_size: usize) -> Self {
45        Self {
46            cache: HashMap::with_capacity(max_size),
47            max_size,
48            access_order: Vec::with_capacity(max_size),
49        }
50    }
51
52    /// Get a ghost node from cache.
53    ///
54    /// Updates the access order for LRU tracking.
55    ///
56    /// # Arguments
57    ///
58    /// * `id` - The node ID to look up
59    ///
60    /// # Returns
61    ///
62    /// A reference to the ghost node if found.
63    pub fn get(&mut self, id: &NodeId) -> Option<&GhostNode> {
64        if self.cache.contains_key(id) {
65            // Update access order for LRU
66            self.access_order.retain(|x| x != id);
67            self.access_order.push(*id);
68            self.cache.get(id)
69        } else {
70            None
71        }
72    }
73
74    /// Get a ghost node without updating LRU order (for read-only access).
75    pub fn peek(&self, id: &NodeId) -> Option<&GhostNode> {
76        self.cache.get(id)
77    }
78
79    /// Insert a ghost node into the cache.
80    ///
81    /// If the cache is at capacity, the least recently used node
82    /// will be evicted to make room.
83    ///
84    /// # Arguments
85    ///
86    /// * `ghost` - The ghost node to insert
87    pub fn insert(&mut self, ghost: GhostNode) {
88        let id = ghost.node_id;
89
90        // If already in cache, just update it
91        if self.cache.contains_key(&id) {
92            self.cache.insert(id, ghost);
93            // Update access order
94            self.access_order.retain(|x| *x != id);
95            self.access_order.push(id);
96            return;
97        }
98
99        // Evict if at capacity
100        while self.cache.len() >= self.max_size && !self.access_order.is_empty() {
101            let oldest = self.access_order.remove(0);
102            self.cache.remove(&oldest);
103        }
104
105        self.cache.insert(id, ghost);
106        self.access_order.push(id);
107    }
108
109    /// Update a ghost node with full data fetched from the remote shard.
110    ///
111    /// # Arguments
112    ///
113    /// * `id` - The node ID to update
114    /// * `data` - The full node data
115    pub fn update_full_data(&mut self, id: &NodeId, data: phago_core::types::NodeData) {
116        if let Some(ghost) = self.cache.get_mut(id) {
117            ghost.full_data = Some(data);
118        }
119    }
120
121    /// Check if a node is cached.
122    ///
123    /// # Arguments
124    ///
125    /// * `id` - The node ID to check
126    pub fn contains(&self, id: &NodeId) -> bool {
127        self.cache.contains_key(id)
128    }
129
130    /// Get all ghost nodes from a specific shard.
131    ///
132    /// # Arguments
133    ///
134    /// * `shard_id` - The shard to filter by
135    ///
136    /// # Returns
137    ///
138    /// A vector of references to ghost nodes from the specified shard.
139    pub fn nodes_from_shard(&self, shard_id: ShardId) -> Vec<&GhostNode> {
140        self.cache
141            .values()
142            .filter(|g| g.shard_id == shard_id)
143            .collect()
144    }
145
146    /// Get all ghost nodes in the cache.
147    pub fn all_nodes(&self) -> Vec<&GhostNode> {
148        self.cache.values().collect()
149    }
150
151    /// Remove a ghost node from the cache.
152    ///
153    /// # Arguments
154    ///
155    /// * `id` - The node ID to remove
156    ///
157    /// # Returns
158    ///
159    /// The removed ghost node if it was present.
160    pub fn remove(&mut self, id: &NodeId) -> Option<GhostNode> {
161        self.access_order.retain(|x| x != id);
162        self.cache.remove(id)
163    }
164
165    /// Clear the cache.
166    pub fn clear(&mut self) {
167        self.cache.clear();
168        self.access_order.clear();
169    }
170
171    /// Number of cached nodes.
172    pub fn len(&self) -> usize {
173        self.cache.len()
174    }
175
176    /// Check if cache is empty.
177    pub fn is_empty(&self) -> bool {
178        self.cache.is_empty()
179    }
180
181    /// Get the maximum cache size.
182    pub fn capacity(&self) -> usize {
183        self.max_size
184    }
185
186    /// Invalidate all ghost nodes from a specific shard.
187    ///
188    /// This is useful when a shard becomes unavailable or
189    /// undergoes significant changes.
190    ///
191    /// # Arguments
192    ///
193    /// * `shard_id` - The shard whose nodes should be invalidated
194    ///
195    /// # Returns
196    ///
197    /// The number of nodes that were invalidated.
198    pub fn invalidate_shard(&mut self, shard_id: ShardId) -> usize {
199        let to_remove: Vec<NodeId> = self
200            .cache
201            .iter()
202            .filter(|(_, g)| g.shard_id == shard_id)
203            .map(|(id, _)| *id)
204            .collect();
205
206        let count = to_remove.len();
207        for id in to_remove {
208            self.cache.remove(&id);
209            self.access_order.retain(|x| *x != id);
210        }
211
212        count
213    }
214
215    /// Get statistics about the cache.
216    pub fn stats(&self) -> GhostCacheStats {
217        let mut nodes_by_shard: HashMap<ShardId, usize> = HashMap::new();
218        let mut with_full_data = 0;
219
220        for ghost in self.cache.values() {
221            *nodes_by_shard.entry(ghost.shard_id).or_insert(0) += 1;
222            if ghost.full_data.is_some() {
223                with_full_data += 1;
224            }
225        }
226
227        GhostCacheStats {
228            total_nodes: self.cache.len(),
229            max_capacity: self.max_size,
230            nodes_by_shard,
231            nodes_with_full_data: with_full_data,
232        }
233    }
234}
235
236/// Statistics about the ghost node cache.
237#[derive(Debug, Clone)]
238pub struct GhostCacheStats {
239    /// Total number of cached ghost nodes.
240    pub total_nodes: usize,
241    /// Maximum capacity of the cache.
242    pub max_capacity: usize,
243    /// Number of nodes cached from each shard.
244    pub nodes_by_shard: HashMap<ShardId, usize>,
245    /// Number of nodes that have full data fetched.
246    pub nodes_with_full_data: usize,
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a ghost node whose ID is derived from `id`, owned by shard
    /// `shard`, and labelled `node_<id>`.
    fn make_ghost(id: u64, shard: u32) -> GhostNode {
        GhostNode::new(
            NodeId::from_seed(id),
            ShardId::new(shard),
            format!("node_{}", id),
        )
    }

    #[test]
    fn test_insert_and_get() {
        let mut cache = GhostNodeCache::new(10);
        let ghost = make_ghost(1, 0);
        let id = ghost.node_id;

        cache.insert(ghost);

        assert!(cache.contains(&id));
        assert_eq!(cache.len(), 1);

        let retrieved = cache.get(&id).unwrap();
        assert_eq!(retrieved.label, "node_1");
    }

    #[test]
    fn test_lru_eviction() {
        let mut cache = GhostNodeCache::new(3);

        // Insert 3 nodes
        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 0));
        cache.insert(make_ghost(3, 0));

        // Access node 1 to make it more recently used
        let _ = cache.get(&NodeId::from_seed(1));

        // Insert a 4th node - should evict node 2 (least recently used)
        cache.insert(make_ghost(4, 0));

        assert!(cache.contains(&NodeId::from_seed(1)));
        assert!(!cache.contains(&NodeId::from_seed(2))); // Evicted
        assert!(cache.contains(&NodeId::from_seed(3)));
        assert!(cache.contains(&NodeId::from_seed(4)));
    }

    #[test]
    fn test_peek_does_not_refresh_lru() {
        let mut cache = GhostNodeCache::new(2);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 0));

        // Peeking must not protect node 1 from eviction.
        assert!(cache.peek(&NodeId::from_seed(1)).is_some());
        cache.insert(make_ghost(3, 0));

        assert!(!cache.contains(&NodeId::from_seed(1))); // Evicted
        assert!(cache.contains(&NodeId::from_seed(2)));
        assert!(cache.contains(&NodeId::from_seed(3)));
    }

    #[test]
    fn test_reinsert_replaces_and_refreshes() {
        let mut cache = GhostNodeCache::new(2);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 0));

        // Same ID, different shard: the entry is replaced, not duplicated,
        // and becomes the most recently used one.
        cache.insert(make_ghost(1, 5));
        assert_eq!(cache.len(), 2);
        assert!(cache.peek(&NodeId::from_seed(1)).unwrap().shard_id == ShardId::new(5));

        cache.insert(make_ghost(3, 0));

        assert!(cache.contains(&NodeId::from_seed(1)));
        assert!(!cache.contains(&NodeId::from_seed(2))); // Evicted
        assert!(cache.contains(&NodeId::from_seed(3)));
    }

    #[test]
    fn test_remove() {
        let mut cache = GhostNodeCache::new(2);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 0));

        let removed = cache.remove(&NodeId::from_seed(1)).unwrap();
        assert_eq!(removed.label, "node_1");
        assert!(cache.remove(&NodeId::from_seed(1)).is_none());
        assert_eq!(cache.len(), 1);

        // The freed slot can be reused without evicting node 2.
        cache.insert(make_ghost(3, 0));
        assert_eq!(cache.len(), 2);
        assert!(cache.contains(&NodeId::from_seed(2)));
        assert!(cache.contains(&NodeId::from_seed(3)));
    }

    #[test]
    fn test_nodes_from_shard() {
        let mut cache = GhostNodeCache::new(10);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 1));
        cache.insert(make_ghost(3, 0));
        cache.insert(make_ghost(4, 2));

        let shard0_nodes = cache.nodes_from_shard(ShardId::new(0));
        assert_eq!(shard0_nodes.len(), 2);

        let shard1_nodes = cache.nodes_from_shard(ShardId::new(1));
        assert_eq!(shard1_nodes.len(), 1);
    }

    #[test]
    fn test_invalidate_shard() {
        let mut cache = GhostNodeCache::new(10);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 1));
        cache.insert(make_ghost(3, 0));

        let count = cache.invalidate_shard(ShardId::new(0));
        assert_eq!(count, 2);
        assert_eq!(cache.len(), 1);
        assert!(cache.contains(&NodeId::from_seed(2)));
    }

    #[test]
    fn test_clear() {
        let mut cache = GhostNodeCache::new(10);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 0));

        cache.clear();

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_stats() {
        let mut cache = GhostNodeCache::new(10);

        cache.insert(make_ghost(1, 0));
        cache.insert(make_ghost(2, 1));
        cache.insert(make_ghost(3, 0));

        let stats = cache.stats();
        assert_eq!(stats.total_nodes, 3);
        assert_eq!(stats.max_capacity, 10);
        assert_eq!(*stats.nodes_by_shard.get(&ShardId::new(0)).unwrap(), 2);
        assert_eq!(*stats.nodes_by_shard.get(&ShardId::new(1)).unwrap(), 1);
    }
}