phago_distributed/shard/
ghost_cache.rs1use crate::types::{GhostNode, ShardId};
8use phago_core::types::NodeId;
9use std::collections::HashMap;
10
11pub struct GhostNodeCache {
30 cache: HashMap<NodeId, GhostNode>,
32 max_size: usize,
34 access_order: Vec<NodeId>,
36}
37
38impl GhostNodeCache {
39 pub fn new(max_size: usize) -> Self {
45 Self {
46 cache: HashMap::with_capacity(max_size),
47 max_size,
48 access_order: Vec::with_capacity(max_size),
49 }
50 }
51
52 pub fn get(&mut self, id: &NodeId) -> Option<&GhostNode> {
64 if self.cache.contains_key(id) {
65 self.access_order.retain(|x| x != id);
67 self.access_order.push(*id);
68 self.cache.get(id)
69 } else {
70 None
71 }
72 }
73
74 pub fn peek(&self, id: &NodeId) -> Option<&GhostNode> {
76 self.cache.get(id)
77 }
78
79 pub fn insert(&mut self, ghost: GhostNode) {
88 let id = ghost.node_id;
89
90 if self.cache.contains_key(&id) {
92 self.cache.insert(id, ghost);
93 self.access_order.retain(|x| *x != id);
95 self.access_order.push(id);
96 return;
97 }
98
99 while self.cache.len() >= self.max_size && !self.access_order.is_empty() {
101 let oldest = self.access_order.remove(0);
102 self.cache.remove(&oldest);
103 }
104
105 self.cache.insert(id, ghost);
106 self.access_order.push(id);
107 }
108
109 pub fn update_full_data(&mut self, id: &NodeId, data: phago_core::types::NodeData) {
116 if let Some(ghost) = self.cache.get_mut(id) {
117 ghost.full_data = Some(data);
118 }
119 }
120
121 pub fn contains(&self, id: &NodeId) -> bool {
127 self.cache.contains_key(id)
128 }
129
130 pub fn nodes_from_shard(&self, shard_id: ShardId) -> Vec<&GhostNode> {
140 self.cache
141 .values()
142 .filter(|g| g.shard_id == shard_id)
143 .collect()
144 }
145
146 pub fn all_nodes(&self) -> Vec<&GhostNode> {
148 self.cache.values().collect()
149 }
150
151 pub fn remove(&mut self, id: &NodeId) -> Option<GhostNode> {
161 self.access_order.retain(|x| x != id);
162 self.cache.remove(id)
163 }
164
165 pub fn clear(&mut self) {
167 self.cache.clear();
168 self.access_order.clear();
169 }
170
171 pub fn len(&self) -> usize {
173 self.cache.len()
174 }
175
176 pub fn is_empty(&self) -> bool {
178 self.cache.is_empty()
179 }
180
181 pub fn capacity(&self) -> usize {
183 self.max_size
184 }
185
186 pub fn invalidate_shard(&mut self, shard_id: ShardId) -> usize {
199 let to_remove: Vec<NodeId> = self
200 .cache
201 .iter()
202 .filter(|(_, g)| g.shard_id == shard_id)
203 .map(|(id, _)| *id)
204 .collect();
205
206 let count = to_remove.len();
207 for id in to_remove {
208 self.cache.remove(&id);
209 self.access_order.retain(|x| *x != id);
210 }
211
212 count
213 }
214
215 pub fn stats(&self) -> GhostCacheStats {
217 let mut nodes_by_shard: HashMap<ShardId, usize> = HashMap::new();
218 let mut with_full_data = 0;
219
220 for ghost in self.cache.values() {
221 *nodes_by_shard.entry(ghost.shard_id).or_insert(0) += 1;
222 if ghost.full_data.is_some() {
223 with_full_data += 1;
224 }
225 }
226
227 GhostCacheStats {
228 total_nodes: self.cache.len(),
229 max_capacity: self.max_size,
230 nodes_by_shard,
231 nodes_with_full_data: with_full_data,
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
238pub struct GhostCacheStats {
239 pub total_nodes: usize,
241 pub max_capacity: usize,
243 pub nodes_by_shard: HashMap<ShardId, usize>,
245 pub nodes_with_full_data: usize,
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 fn make_ghost(id: u64, shard: u32) -> GhostNode {
254 GhostNode::new(
255 NodeId::from_seed(id),
256 ShardId::new(shard),
257 format!("node_{}", id),
258 )
259 }
260
261 #[test]
262 fn test_insert_and_get() {
263 let mut cache = GhostNodeCache::new(10);
264 let ghost = make_ghost(1, 0);
265 let id = ghost.node_id;
266
267 cache.insert(ghost);
268
269 assert!(cache.contains(&id));
270 assert_eq!(cache.len(), 1);
271
272 let retrieved = cache.get(&id).unwrap();
273 assert_eq!(retrieved.label, "node_1");
274 }
275
276 #[test]
277 fn test_lru_eviction() {
278 let mut cache = GhostNodeCache::new(3);
279
280 cache.insert(make_ghost(1, 0));
282 cache.insert(make_ghost(2, 0));
283 cache.insert(make_ghost(3, 0));
284
285 let _ = cache.get(&NodeId::from_seed(1));
287
288 cache.insert(make_ghost(4, 0));
290
291 assert!(cache.contains(&NodeId::from_seed(1)));
292 assert!(!cache.contains(&NodeId::from_seed(2))); assert!(cache.contains(&NodeId::from_seed(3)));
294 assert!(cache.contains(&NodeId::from_seed(4)));
295 }
296
297 #[test]
298 fn test_nodes_from_shard() {
299 let mut cache = GhostNodeCache::new(10);
300
301 cache.insert(make_ghost(1, 0));
302 cache.insert(make_ghost(2, 1));
303 cache.insert(make_ghost(3, 0));
304 cache.insert(make_ghost(4, 2));
305
306 let shard0_nodes = cache.nodes_from_shard(ShardId::new(0));
307 assert_eq!(shard0_nodes.len(), 2);
308
309 let shard1_nodes = cache.nodes_from_shard(ShardId::new(1));
310 assert_eq!(shard1_nodes.len(), 1);
311 }
312
313 #[test]
314 fn test_invalidate_shard() {
315 let mut cache = GhostNodeCache::new(10);
316
317 cache.insert(make_ghost(1, 0));
318 cache.insert(make_ghost(2, 1));
319 cache.insert(make_ghost(3, 0));
320
321 let count = cache.invalidate_shard(ShardId::new(0));
322 assert_eq!(count, 2);
323 assert_eq!(cache.len(), 1);
324 assert!(cache.contains(&NodeId::from_seed(2)));
325 }
326
327 #[test]
328 fn test_clear() {
329 let mut cache = GhostNodeCache::new(10);
330
331 cache.insert(make_ghost(1, 0));
332 cache.insert(make_ghost(2, 0));
333
334 cache.clear();
335
336 assert!(cache.is_empty());
337 assert_eq!(cache.len(), 0);
338 }
339
340 #[test]
341 fn test_stats() {
342 let mut cache = GhostNodeCache::new(10);
343
344 cache.insert(make_ghost(1, 0));
345 cache.insert(make_ghost(2, 1));
346 cache.insert(make_ghost(3, 0));
347
348 let stats = cache.stats();
349 assert_eq!(stats.total_nodes, 3);
350 assert_eq!(stats.max_capacity, 10);
351 assert_eq!(*stats.nodes_by_shard.get(&ShardId::new(0)).unwrap(), 2);
352 assert_eq!(*stats.nodes_by_shard.get(&ShardId::new(1)).unwrap(), 1);
353 }
354}