1use std::collections::HashMap;
2use tracing::{debug, info};
3
4#[derive(
6 Debug,
7 Clone,
8 serde::Serialize,
9 serde::Deserialize,
10 zerompk::ToMessagePack,
11 zerompk::FromMessagePack,
12)]
13struct PersistedGhostStub {
14 target_shard: u16,
15 refcount: u32,
16 created_at_ms: u64,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct GhostStub {
28 pub node_id: String,
30 pub target_shard: u16,
32 pub refcount: u32,
34 pub created_at_ms: u64,
36}
37
38impl GhostStub {
39 pub fn new(node_id: String, target_shard: u16, initial_refcount: u32) -> Self {
40 Self {
41 node_id,
42 target_shard,
43 refcount: initial_refcount,
44 created_at_ms: std::time::SystemTime::now()
45 .duration_since(std::time::UNIX_EPOCH)
46 .unwrap_or_else(|e| e.duration())
47 .as_millis() as u64,
48 }
49 }
50
51 pub fn decrement_ref(&mut self) -> bool {
54 self.refcount = self.refcount.saturating_sub(1);
55 self.refcount == 0
56 }
57
58 pub fn increment_ref(&mut self) {
60 self.refcount = self.refcount.saturating_add(1);
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
66pub enum SweepVerdict {
67 Keep,
69 Purge,
71 Inconclusive,
73}
74
75#[derive(Debug, Clone)]
80pub struct GhostTable {
81 stubs: HashMap<String, GhostStub>,
83 purge_count: u64,
85 last_sweep_ms: u64,
87}
88
89impl GhostTable {
90 pub fn new() -> Self {
91 Self {
92 stubs: HashMap::new(),
93 purge_count: 0,
94 last_sweep_ms: 0,
95 }
96 }
97
98 pub fn insert(&mut self, stub: GhostStub) {
100 debug!(
101 node = %stub.node_id,
102 target_shard = stub.target_shard,
103 refcount = stub.refcount,
104 "inserted ghost stub"
105 );
106 self.stubs.insert(stub.node_id.clone(), stub);
107 }
108
109 pub fn get(&self, node_id: &str) -> Option<&GhostStub> {
111 self.stubs.get(node_id)
112 }
113
114 pub fn decrement_ref(&mut self, node_id: &str) -> bool {
117 if let Some(stub) = self.stubs.get_mut(node_id)
118 && stub.decrement_ref()
119 {
120 self.stubs.remove(node_id);
121 self.purge_count += 1;
122 return true;
123 }
124 false
125 }
126
127 pub fn increment_ref(&mut self, node_id: &str) {
129 if let Some(stub) = self.stubs.get_mut(node_id) {
130 stub.increment_ref();
131 }
132 }
133
134 pub fn sweep<F>(&mut self, verify_fn: F) -> SweepReport
143 where
144 F: Fn(&str, u16) -> SweepVerdict,
145 {
146 let now_ms = std::time::SystemTime::now()
147 .duration_since(std::time::UNIX_EPOCH)
148 .unwrap_or_default()
149 .as_millis() as u64;
150 self.last_sweep_ms = now_ms;
151
152 let mut report = SweepReport::default();
153 let mut to_purge = Vec::new();
154
155 for (node_id, stub) in &self.stubs {
156 report.checked += 1;
157
158 if stub.refcount == 0 {
160 to_purge.push(node_id.clone());
161 report.purged += 1;
162 continue;
163 }
164
165 match verify_fn(node_id, stub.target_shard) {
166 SweepVerdict::Purge => {
167 to_purge.push(node_id.clone());
168 report.purged += 1;
169 }
170 SweepVerdict::Keep => {
171 report.kept += 1;
172 }
173 SweepVerdict::Inconclusive => {
174 report.inconclusive += 1;
175 }
176 }
177 }
178
179 for node_id in to_purge {
180 self.stubs.remove(&node_id);
181 self.purge_count += 1;
182 }
183
184 if report.purged > 0 {
185 info!(
186 purged = report.purged,
187 kept = report.kept,
188 inconclusive = report.inconclusive,
189 total_ghosts = self.stubs.len(),
190 "anti-entropy sweep complete"
191 );
192 }
193
194 report
195 }
196
197 pub fn len(&self) -> usize {
198 self.stubs.len()
199 }
200
201 pub fn is_empty(&self) -> bool {
202 self.stubs.is_empty()
203 }
204
205 pub fn total_purged(&self) -> u64 {
206 self.purge_count
207 }
208
209 pub fn last_sweep_ms(&self) -> u64 {
210 self.last_sweep_ms
211 }
212
213 pub fn stubs(&self) -> impl Iterator<Item = &GhostStub> {
215 self.stubs.values()
216 }
217
218 pub fn resolve(&self, node_id: &str) -> Option<u16> {
221 self.stubs.get(node_id).map(|s| s.target_shard)
222 }
223}
224
225impl GhostTable {
226 pub fn to_bytes(&self) -> Vec<u8> {
230 let persisted: HashMap<String, PersistedGhostStub> = self
231 .stubs
232 .iter()
233 .map(|(k, v)| {
234 (
235 k.clone(),
236 PersistedGhostStub {
237 target_shard: v.target_shard,
238 refcount: v.refcount,
239 created_at_ms: v.created_at_ms,
240 },
241 )
242 })
243 .collect();
244 match zerompk::to_msgpack_vec(&persisted) {
245 Ok(bytes) => bytes,
246 Err(e) => {
247 tracing::error!(error = %e, "ghost table serialization failed — state will not persist");
248 Vec::new()
249 }
250 }
251 }
252
253 pub fn from_bytes(data: &[u8]) -> Option<Self> {
257 let persisted: HashMap<String, PersistedGhostStub> = zerompk::from_msgpack(data).ok()?;
258 let stubs: HashMap<String, GhostStub> = persisted
259 .into_iter()
260 .map(|(k, v)| {
261 (
262 k.clone(),
263 GhostStub {
264 node_id: k,
265 target_shard: v.target_shard,
266 refcount: v.refcount,
267 created_at_ms: v.created_at_ms,
268 },
269 )
270 })
271 .collect();
272 Some(Self {
273 stubs,
274 purge_count: 0,
275 last_sweep_ms: 0,
276 })
277 }
278}
279
280impl Default for GhostTable {
281 fn default() -> Self {
282 Self::new()
283 }
284}
285
286#[derive(Debug, Default, Clone)]
288pub struct SweepReport {
289 pub checked: usize,
290 pub purged: usize,
291 pub kept: usize,
292 pub inconclusive: usize,
293}
294
295#[cfg(test)]
296mod tests {
297 use super::*;
298
299 #[test]
300 fn ghost_lifecycle() {
301 let mut table = GhostTable::new();
302 let stub = GhostStub::new("node-42".into(), 10, 3);
303 table.insert(stub);
304 assert_eq!(table.len(), 1);
305
306 assert_eq!(table.resolve("node-42"), Some(10));
308 assert_eq!(table.resolve("nonexistent"), None);
309
310 assert!(!table.decrement_ref("node-42"));
312 assert!(!table.decrement_ref("node-42"));
313 assert!(table.decrement_ref("node-42")); assert!(table.is_empty());
315 assert_eq!(table.total_purged(), 1);
316 }
317
318 #[test]
319 fn ghost_increment_ref() {
320 let mut table = GhostTable::new();
321 table.insert(GhostStub::new("n1".into(), 5, 1));
322 table.increment_ref("n1");
323 assert!(!table.decrement_ref("n1"));
325 assert!(table.decrement_ref("n1"));
326 }
327
328 #[test]
329 fn sweep_purges_stale_ghosts() {
330 let mut table = GhostTable::new();
331 table.insert(GhostStub::new("stale".into(), 5, 1));
332 table.insert(GhostStub::new("alive".into(), 6, 2));
333 table.insert(GhostStub::new("unreachable".into(), 7, 1));
334
335 let report = table.sweep(|node_id, _target| match node_id {
336 "stale" => SweepVerdict::Purge,
337 "alive" => SweepVerdict::Keep,
338 "unreachable" => SweepVerdict::Inconclusive,
339 _ => SweepVerdict::Keep,
340 });
341
342 assert_eq!(report.checked, 3);
343 assert_eq!(report.purged, 1);
344 assert_eq!(report.kept, 1);
345 assert_eq!(report.inconclusive, 1);
346 assert_eq!(table.len(), 2); }
348
349 #[test]
350 fn sweep_purges_zero_refcount_without_remote() {
351 let mut table = GhostTable::new();
352 let mut stub = GhostStub::new("zero-ref".into(), 5, 1);
353 stub.refcount = 0; table.insert(stub);
355
356 let report = table.sweep(|_, _| SweepVerdict::Keep);
359 assert_eq!(report.purged, 1);
360 assert!(table.is_empty());
361 }
362
363 #[test]
364 fn resolve_for_scatter_gather() {
365 let mut table = GhostTable::new();
366 table.insert(GhostStub::new("migrated-node".into(), 42, 5));
367
368 let target = table.resolve("migrated-node");
370 assert_eq!(target, Some(42));
371 }
372}