1use std::collections::HashMap;
4use tracing::{debug, info};
5
6#[derive(
8 Debug,
9 Clone,
10 serde::Serialize,
11 serde::Deserialize,
12 zerompk::ToMessagePack,
13 zerompk::FromMessagePack,
14)]
15struct PersistedGhostStub {
16 target_shard: u32,
17 refcount: u32,
18 created_at_ms: u64,
19}
20
/// A reference-counted record that maps a node id to the shard it points at.
///
/// Stubs are stored in a `GhostTable` keyed by `node_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GhostStub {
    /// Identifier of the node this stub stands in for.
    pub node_id: String,
    /// Shard the stub points at.
    pub target_shard: u32,
    /// Number of outstanding references; the stub is purgeable at zero.
    pub refcount: u32,
    /// Creation time in milliseconds since the Unix epoch (0 if the system
    /// clock reported a time before the epoch).
    pub created_at_ms: u64,
}

impl GhostStub {
    /// Builds a stub for `node_id` pointing at `target_shard`, stamped with
    /// the current wall-clock time.
    ///
    /// `initial_refcount` is stored as-is; a value of zero yields a stub that
    /// is immediately eligible for purging.
    pub fn new(node_id: String, target_shard: u32, initial_refcount: u32) -> Self {
        // A clock set before the Unix epoch makes `duration_since` fail. The
        // error's `duration()` is the *magnitude* of that negative offset, so
        // using it would record a plausible-looking but wrong timestamp.
        // Fall back to 0 instead, matching how the sweep timestamp is taken.
        let created_at_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        Self {
            node_id,
            target_shard,
            refcount: initial_refcount,
            created_at_ms,
        }
    }

    /// Drops one reference and returns `true` when no references remain.
    ///
    /// The count saturates at zero, so calling this on an already-empty stub
    /// returns `true` again rather than underflowing.
    pub fn decrement_ref(&mut self) -> bool {
        self.refcount = self.refcount.saturating_sub(1);
        self.refcount == 0
    }

    /// Adds one reference, saturating at `u32::MAX`.
    pub fn increment_ref(&mut self) {
        self.refcount = self.refcount.saturating_add(1);
    }
}
65
/// Outcome returned by the verification callback passed to `GhostTable::sweep`
/// for a single stub.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SweepVerdict {
    /// The stub stays in the table and is counted as kept.
    Keep,
    /// The stub is removed from the table and is counted as purged.
    Purge,
    /// No decision could be made; the stub stays and is counted as inconclusive.
    Inconclusive,
}
76
77#[derive(Debug, Clone)]
82pub struct GhostTable {
83 stubs: HashMap<String, GhostStub>,
85 purge_count: u64,
87 last_sweep_ms: u64,
89}
90
91impl GhostTable {
92 pub fn new() -> Self {
93 Self {
94 stubs: HashMap::new(),
95 purge_count: 0,
96 last_sweep_ms: 0,
97 }
98 }
99
100 pub fn insert(&mut self, stub: GhostStub) {
102 debug!(
103 node = %stub.node_id,
104 target_shard = stub.target_shard,
105 refcount = stub.refcount,
106 "inserted ghost stub"
107 );
108 self.stubs.insert(stub.node_id.clone(), stub);
109 }
110
111 pub fn get(&self, node_id: &str) -> Option<&GhostStub> {
113 self.stubs.get(node_id)
114 }
115
116 pub fn decrement_ref(&mut self, node_id: &str) -> bool {
119 if let Some(stub) = self.stubs.get_mut(node_id)
120 && stub.decrement_ref()
121 {
122 self.stubs.remove(node_id);
123 self.purge_count += 1;
124 return true;
125 }
126 false
127 }
128
129 pub fn increment_ref(&mut self, node_id: &str) {
131 if let Some(stub) = self.stubs.get_mut(node_id) {
132 stub.increment_ref();
133 }
134 }
135
136 pub fn sweep<F>(&mut self, verify_fn: F) -> SweepReport
145 where
146 F: Fn(&str, u32) -> SweepVerdict,
147 {
148 let now_ms = std::time::SystemTime::now()
149 .duration_since(std::time::UNIX_EPOCH)
150 .unwrap_or_default()
151 .as_millis() as u64;
152 self.last_sweep_ms = now_ms;
153
154 let mut report = SweepReport::default();
155 let mut to_purge = Vec::new();
156
157 for (node_id, stub) in &self.stubs {
158 report.checked += 1;
159
160 if stub.refcount == 0 {
162 to_purge.push(node_id.clone());
163 report.purged += 1;
164 continue;
165 }
166
167 match verify_fn(node_id, stub.target_shard) {
168 SweepVerdict::Purge => {
169 to_purge.push(node_id.clone());
170 report.purged += 1;
171 }
172 SweepVerdict::Keep => {
173 report.kept += 1;
174 }
175 SweepVerdict::Inconclusive => {
176 report.inconclusive += 1;
177 }
178 }
179 }
180
181 for node_id in to_purge {
182 self.stubs.remove(&node_id);
183 self.purge_count += 1;
184 }
185
186 if report.purged > 0 {
187 info!(
188 purged = report.purged,
189 kept = report.kept,
190 inconclusive = report.inconclusive,
191 total_ghosts = self.stubs.len(),
192 "anti-entropy sweep complete"
193 );
194 }
195
196 report
197 }
198
199 pub fn len(&self) -> usize {
200 self.stubs.len()
201 }
202
203 pub fn is_empty(&self) -> bool {
204 self.stubs.is_empty()
205 }
206
207 pub fn total_purged(&self) -> u64 {
208 self.purge_count
209 }
210
211 pub fn last_sweep_ms(&self) -> u64 {
212 self.last_sweep_ms
213 }
214
215 pub fn stubs(&self) -> impl Iterator<Item = &GhostStub> {
217 self.stubs.values()
218 }
219
220 pub fn resolve(&self, node_id: &str) -> Option<u32> {
223 self.stubs.get(node_id).map(|s| s.target_shard)
224 }
225}
226
227impl GhostTable {
228 pub fn to_bytes(&self) -> Vec<u8> {
232 let persisted: HashMap<String, PersistedGhostStub> = self
233 .stubs
234 .iter()
235 .map(|(k, v)| {
236 (
237 k.clone(),
238 PersistedGhostStub {
239 target_shard: v.target_shard,
240 refcount: v.refcount,
241 created_at_ms: v.created_at_ms,
242 },
243 )
244 })
245 .collect();
246 match zerompk::to_msgpack_vec(&persisted) {
247 Ok(bytes) => bytes,
248 Err(e) => {
249 tracing::error!(error = %e, "ghost table serialization failed — state will not persist");
250 Vec::new()
251 }
252 }
253 }
254
255 pub fn from_bytes(data: &[u8]) -> Option<Self> {
259 let persisted: HashMap<String, PersistedGhostStub> = zerompk::from_msgpack(data).ok()?;
260 let stubs: HashMap<String, GhostStub> = persisted
261 .into_iter()
262 .map(|(k, v)| {
263 (
264 k.clone(),
265 GhostStub {
266 node_id: k,
267 target_shard: v.target_shard,
268 refcount: v.refcount,
269 created_at_ms: v.created_at_ms,
270 },
271 )
272 })
273 .collect();
274 Some(Self {
275 stubs,
276 purge_count: 0,
277 last_sweep_ms: 0,
278 })
279 }
280}
281
282impl Default for GhostTable {
283 fn default() -> Self {
284 Self::new()
285 }
286}
287
/// Per-outcome counts produced by one `GhostTable::sweep` call.
#[derive(Clone, Debug, Default)]
pub struct SweepReport {
    /// Stubs visited during the sweep.
    pub checked: usize,
    /// Stubs removed, either for a zero refcount or a `Purge` verdict.
    pub purged: usize,
    /// Stubs retained on a `Keep` verdict.
    pub kept: usize,
    /// Stubs retained because the verdict was `Inconclusive`.
    pub inconclusive: usize,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// A stub resolves while referenced and disappears with its last reference.
    #[test]
    fn ghost_lifecycle() {
        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("node-42".into(), 10, 3));
        assert_eq!(ghosts.len(), 1);

        assert_eq!(ghosts.resolve("node-42"), Some(10));
        assert_eq!(ghosts.resolve("nonexistent"), None);

        // 3 -> 2 and 2 -> 1 leave the stub in place.
        assert!(!ghosts.decrement_ref("node-42"));
        assert!(!ghosts.decrement_ref("node-42"));
        // 1 -> 0 purges it.
        assert!(ghosts.decrement_ref("node-42"));
        assert!(ghosts.is_empty());
        assert_eq!(ghosts.total_purged(), 1);
    }

    /// An extra reference delays the purge by one decrement.
    #[test]
    fn ghost_increment_ref() {
        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("n1".into(), 5, 1));
        ghosts.increment_ref("n1");

        assert!(!ghosts.decrement_ref("n1"));
        assert!(ghosts.decrement_ref("n1"));
    }

    /// Only stubs with a `Purge` verdict are removed by a sweep.
    #[test]
    fn sweep_purges_stale_ghosts() {
        let mut ghosts = GhostTable::new();
        for (id, shard, refs) in [("stale", 5, 1), ("alive", 6, 2), ("unreachable", 7, 1)] {
            ghosts.insert(GhostStub::new(id.into(), shard, refs));
        }

        let report = ghosts.sweep(|node_id, _target| {
            if node_id == "stale" {
                SweepVerdict::Purge
            } else if node_id == "unreachable" {
                SweepVerdict::Inconclusive
            } else {
                SweepVerdict::Keep
            }
        });

        assert_eq!(report.checked, 3);
        assert_eq!(report.purged, 1);
        assert_eq!(report.kept, 1);
        assert_eq!(report.inconclusive, 1);
        // "alive" and "unreachable" remain.
        assert_eq!(ghosts.len(), 2);
    }

    /// A zero-refcount stub is purged even when the verifier says `Keep`.
    #[test]
    fn sweep_purges_zero_refcount_without_remote() {
        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("zero-ref".into(), 5, 0));

        let report = ghosts.sweep(|_, _| SweepVerdict::Keep);

        assert_eq!(report.purged, 1);
        assert!(ghosts.is_empty());
    }

    /// `resolve` yields the shard recorded on the stub.
    #[test]
    fn resolve_for_scatter_gather() {
        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("migrated-node".into(), 42, 5));

        assert_eq!(ghosts.resolve("migrated-node"), Some(42));
    }
}