Skip to main content

shape_runtime/
distributed_gc.rs

1//! Distributed garbage collection for content-addressed function blobs.
2//!
3//! Each VM node reports its active blob set (hashes in call frames + function table).
4//! A coordinator computes the global union of active sets.
5//! Unreferenced blobs are eligible for collection.
6
7use std::collections::{HashMap, HashSet};
8use std::time::{Duration, Instant};
9
/// Unique identifier for a VM node in the distributed system.
///
/// Used as the key for node reports in the GC coordinator: a newer report carrying the same
/// id replaces the previous one.
pub type NodeId = u64;
12
/// Report from a VM node about its active blob set.
///
/// The coordinator keeps only the latest report per `node_id`. A report whose `timestamp` is
/// at least `GcConfig::stale_report_threshold` old is dropped at the start of the next GC
/// cycle (or by `GcCoordinator::prune_stale_nodes`).
#[derive(Debug, Clone)]
pub struct NodeBlobReport {
    /// Node that produced this report.
    pub node_id: NodeId,
    /// Blob hashes the node currently references. Per the module docs, these are the hashes
    /// in its call frames and function table. Presumably these are 256-bit content hashes;
    /// confirm against the blob store.
    pub active_blobs: HashSet<[u8; 32]>,
    /// Blob hashes the node asks the coordinator to keep pinned.
    pub pinned_blobs: HashSet<[u8; 32]>,
    /// When the report was produced; compared against the stale-report threshold.
    pub timestamp: Instant,
}
21
/// Status of a blob in the distributed system, as tracked by the GC coordinator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStatus {
    /// Newly registered, or referenced by at least one current node report.
    Active,
    /// Pinned via the coordinator or a node report; blobs in this state are never returned
    /// as eligible for collection.
    Pinned,
    /// Not referenced by any current node report. `since` is the start time of the GC cycle
    /// that first observed this; later unreferenced cycles keep the original value, so the
    /// grace period is measured from it.
    Unreferenced { since: Instant },
    /// Grace period expired. The blob is listed as eligible for collection on every GC cycle
    /// while it stays in this state. It returns to `Active`/`Pinned` if a node references
    /// or pins it again.
    MarkedForCollection,
}
30
/// Configuration for the GC coordinator.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// How long a blob must remain unreferenced, measured from the GC cycle that first saw it
    /// unreferenced, before it becomes eligible for collection (default: 5 minutes).
    pub grace_period: Duration,
    /// Node reports at least this old are discarded at the start of a GC cycle
    /// (default: 10 minutes).
    pub stale_report_threshold: Duration,
    /// Minimum number of non-stale node reports required for a GC cycle to update anything.
    /// Below it, the cycle returns an empty result (default: 1).
    pub min_nodes_reporting: usize,
}
38
39impl Default for GcConfig {
40    fn default() -> Self {
41        Self {
42            grace_period: Duration::from_secs(5 * 60),
43            stale_report_threshold: Duration::from_secs(10 * 60),
44            min_nodes_reporting: 1,
45        }
46    }
47}
48
49/// Coordinator for distributed garbage collection.
50pub struct GcCoordinator {
51    config: GcConfig,
52    node_reports: HashMap<NodeId, NodeBlobReport>,
53    known_blobs: HashMap<[u8; 32], BlobStatus>,
54    reference_counts: HashMap<[u8; 32], usize>,
55    collection_log: Vec<CollectionEvent>,
56}
57
58/// Record of a GC event.
59#[derive(Debug, Clone)]
60pub struct CollectionEvent {
61    pub timestamp: Instant,
62    pub blobs_collected: Vec<[u8; 32]>,
63    pub blobs_preserved: usize,
64    pub nodes_reporting: usize,
65}
66
67/// Result of a GC cycle.
68#[derive(Debug)]
69pub struct GcCycleResult {
70    pub eligible_for_collection: Vec<[u8; 32]>,
71    pub active_count: usize,
72    pub pinned_count: usize,
73    pub stale_nodes: Vec<NodeId>,
74}
75
76impl GcCoordinator {
77    /// Create a new GC coordinator with the given configuration.
78    pub fn new(config: GcConfig) -> Self {
79        Self {
80            config,
81            node_reports: HashMap::new(),
82            known_blobs: HashMap::new(),
83            reference_counts: HashMap::new(),
84            collection_log: Vec::new(),
85        }
86    }
87
88    /// Receive a node's report of its active and pinned blob sets.
89    pub fn report_active_blobs(&mut self, report: NodeBlobReport) {
90        self.node_reports.insert(report.node_id, report);
91    }
92
93    /// Run a full GC cycle: compute the global active set, identify unreferenced
94    /// blobs, respect the grace period, and return the cycle result.
95    pub fn run_gc_cycle(&mut self) -> GcCycleResult {
96        let now = Instant::now();
97
98        // Prune stale node reports first.
99        let stale_nodes = self.compute_stale_nodes(now);
100        for &node_id in &stale_nodes {
101            self.node_reports.remove(&node_id);
102        }
103
104        // Check minimum reporting threshold.
105        if self.node_reports.len() < self.config.min_nodes_reporting {
106            return GcCycleResult {
107                eligible_for_collection: Vec::new(),
108                active_count: 0,
109                pinned_count: 0,
110                stale_nodes,
111            };
112        }
113
114        // Compute the global union of active and pinned blobs from all reports.
115        let mut global_active: HashSet<[u8; 32]> = HashSet::new();
116        let mut global_pinned: HashSet<[u8; 32]> = HashSet::new();
117
118        // Include coordinator-level pins (set via pin_blob()) in the global pinned set.
119        for (hash, status) in &self.known_blobs {
120            if *status == BlobStatus::Pinned {
121                global_pinned.insert(*hash);
122            }
123        }
124
125        // Recompute reference counts from scratch.
126        self.reference_counts.clear();
127
128        for report in self.node_reports.values() {
129            for hash in &report.active_blobs {
130                global_active.insert(*hash);
131                *self.reference_counts.entry(*hash).or_insert(0) += 1;
132            }
133            for hash in &report.pinned_blobs {
134                global_pinned.insert(*hash);
135            }
136        }
137
138        // Update blob statuses based on the global active/pinned sets.
139        let all_known: Vec<[u8; 32]> = self.known_blobs.keys().copied().collect();
140
141        for hash in &all_known {
142            if global_pinned.contains(hash) {
143                self.known_blobs.insert(*hash, BlobStatus::Pinned);
144            } else if global_active.contains(hash) {
145                self.known_blobs.insert(*hash, BlobStatus::Active);
146            } else {
147                // Not referenced by any node -- mark as unreferenced if not already.
148                match self.known_blobs.get(hash) {
149                    Some(BlobStatus::Unreferenced { .. })
150                    | Some(BlobStatus::MarkedForCollection) => {
151                        // Keep existing unreferenced timestamp or marked status.
152                    }
153                    _ => {
154                        self.known_blobs
155                            .insert(*hash, BlobStatus::Unreferenced { since: now });
156                    }
157                }
158            }
159        }
160
161        // Determine which blobs are eligible for collection (grace period expired).
162        let mut eligible: Vec<[u8; 32]> = Vec::new();
163        let mut active_count: usize = 0;
164        let mut pinned_count: usize = 0;
165
166        for (hash, status) in &self.known_blobs {
167            match status {
168                BlobStatus::Active => active_count += 1,
169                BlobStatus::Pinned => pinned_count += 1,
170                BlobStatus::Unreferenced { since } => {
171                    if now.duration_since(*since) >= self.config.grace_period {
172                        eligible.push(*hash);
173                    }
174                }
175                BlobStatus::MarkedForCollection => {
176                    eligible.push(*hash);
177                }
178            }
179        }
180
181        // Mark eligible blobs for collection.
182        for hash in &eligible {
183            self.known_blobs
184                .insert(*hash, BlobStatus::MarkedForCollection);
185        }
186
187        // Record the collection event.
188        if !eligible.is_empty() {
189            self.collection_log.push(CollectionEvent {
190                timestamp: now,
191                blobs_collected: eligible.clone(),
192                blobs_preserved: active_count + pinned_count,
193                nodes_reporting: self.node_reports.len(),
194            });
195        }
196
197        GcCycleResult {
198            eligible_for_collection: eligible,
199            active_count,
200            pinned_count,
201            stale_nodes,
202        }
203    }
204
205    /// Pin a blob to prevent it from being collected.
206    pub fn pin_blob(&mut self, hash: [u8; 32]) {
207        self.known_blobs.insert(hash, BlobStatus::Pinned);
208    }
209
210    /// Unpin a blob, allowing it to be collected if unreferenced.
211    pub fn unpin_blob(&mut self, hash: [u8; 32]) {
212        if let Some(status) = self.known_blobs.get(&hash) {
213            if *status == BlobStatus::Pinned {
214                self.known_blobs.insert(hash, BlobStatus::Active);
215            }
216        }
217    }
218
219    /// Register a new blob in the known set as active.
220    pub fn register_blob(&mut self, hash: [u8; 32]) {
221        self.known_blobs.entry(hash).or_insert(BlobStatus::Active);
222    }
223
224    /// Query the status of a blob.
225    pub fn get_status(&self, hash: &[u8; 32]) -> Option<&BlobStatus> {
226        self.known_blobs.get(hash)
227    }
228
229    /// Remove reports from nodes whose timestamps are older than the stale threshold.
230    pub fn prune_stale_nodes(&mut self) -> Vec<NodeId> {
231        let now = Instant::now();
232        let stale = self.compute_stale_nodes(now);
233        for &node_id in &stale {
234            self.node_reports.remove(&node_id);
235        }
236        stale
237    }
238
239    /// Return the collection history log.
240    pub fn collection_history(&self) -> &[CollectionEvent] {
241        &self.collection_log
242    }
243
244    /// Compute the list of stale node IDs without removing them.
245    fn compute_stale_nodes(&self, now: Instant) -> Vec<NodeId> {
246        self.node_reports
247            .iter()
248            .filter(|(_, report)| {
249                now.duration_since(report.timestamp) >= self.config.stale_report_threshold
250            })
251            .map(|(&node_id, _)| node_id)
252            .collect()
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test hash whose 32 bytes all equal `byte`.
    fn make_hash(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Default config with a zero grace period, so a blob that becomes unreferenced is
    /// eligible for collection within the same cycle.
    fn zero_grace_config() -> GcConfig {
        GcConfig {
            grace_period: Duration::from_millis(0),
            min_nodes_reporting: 1,
            ..GcConfig::default()
        }
    }

    /// Build a report for `node_id`, timestamped now, with the given active and pinned hashes.
    fn report(node_id: NodeId, active: &[[u8; 32]], pinned: &[[u8; 32]]) -> NodeBlobReport {
        NodeBlobReport {
            node_id,
            active_blobs: active.iter().copied().collect(),
            pinned_blobs: pinned.iter().copied().collect(),
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_default_config() {
        let config = GcConfig::default();
        assert_eq!(config.grace_period, Duration::from_secs(300));
        assert_eq!(config.stale_report_threshold, Duration::from_secs(600));
        assert_eq!(config.min_nodes_reporting, 1);
    }

    #[test]
    fn test_register_and_query() {
        let mut gc = GcCoordinator::new(GcConfig::default());
        let h = make_hash(0xAA);
        assert!(gc.get_status(&h).is_none());

        gc.register_blob(h);
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_pin_unpin() {
        let mut gc = GcCoordinator::new(GcConfig::default());
        let h = make_hash(0xBB);

        gc.register_blob(h);
        gc.pin_blob(h);
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Pinned));

        gc.unpin_blob(h);
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_gc_cycle_no_reports_below_min() {
        let config = GcConfig {
            min_nodes_reporting: 2,
            ..GcConfig::default()
        };
        let mut gc = GcCoordinator::new(config);
        let h = make_hash(1);
        gc.register_blob(h);

        // Only one node reporting -- below min_nodes_reporting of 2.
        gc.report_active_blobs(report(1, &[], &[]));

        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        // The cycle is skipped entirely, so the blob's status is untouched.
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_gc_cycle_active_blob_not_collected() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        let h = make_hash(0xCC);
        gc.register_blob(h);

        gc.report_active_blobs(report(1, &[h], &[]));

        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        assert_eq!(result.active_count, 1);
    }

    #[test]
    fn test_gc_cycle_unreferenced_blob_collected_after_grace() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        let h = make_hash(0xDD);
        gc.register_blob(h);

        // Node reports with no active blobs -- h is unreferenced.
        gc.report_active_blobs(report(1, &[], &[]));

        // With a zero grace period, a single cycle both marks the blob Unreferenced and finds
        // its grace period already expired, so it is eligible immediately.
        let result = gc.run_gc_cycle();
        assert_eq!(result.eligible_for_collection, vec![h]);
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::MarkedForCollection));
    }

    #[test]
    fn test_unreferenced_blob_kept_within_grace() {
        // Default 5-minute grace period: the blob is only flagged, not collected.
        let mut gc = GcCoordinator::new(GcConfig::default());
        let h = make_hash(0x21);
        gc.register_blob(h);
        gc.report_active_blobs(report(1, &[], &[]));

        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        assert!(matches!(
            gc.get_status(&h),
            Some(BlobStatus::Unreferenced { .. })
        ));
    }

    #[test]
    fn test_marked_blob_reactivated_when_referenced() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        let h = make_hash(0x31);
        gc.register_blob(h);

        gc.report_active_blobs(report(1, &[], &[]));
        gc.run_gc_cycle();
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::MarkedForCollection));

        // A node starts referencing the blob again before it was actually collected.
        gc.report_active_blobs(report(1, &[h], &[]));
        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_pinned_blob_not_collected() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        let h = make_hash(0xEE);
        gc.register_blob(h);
        gc.pin_blob(h);

        gc.report_active_blobs(report(1, &[], &[]));

        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        assert_eq!(result.pinned_count, 1);
    }

    #[test]
    fn test_node_pinned_blob_not_collected() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        let h = make_hash(0xEF);
        gc.register_blob(h);

        // The pin comes from the node report, not from pin_blob().
        gc.report_active_blobs(report(1, &[], &[h]));

        let result = gc.run_gc_cycle();
        assert!(result.eligible_for_collection.is_empty());
        assert_eq!(result.pinned_count, 1);
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Pinned));
    }

    #[test]
    fn test_stale_report_pruned_before_cycle() {
        let config = GcConfig {
            stale_report_threshold: Duration::from_millis(0),
            ..GcConfig::default()
        };
        let mut gc = GcCoordinator::new(config);
        let h = make_hash(0x11);
        gc.register_blob(h);
        gc.report_active_blobs(report(7, &[], &[]));

        // A zero threshold makes every report stale; with none left the cycle is skipped.
        let result = gc.run_gc_cycle();
        assert_eq!(result.stale_nodes, vec![7]);
        assert!(result.eligible_for_collection.is_empty());
        assert_eq!(gc.get_status(&h), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_prune_stale_nodes() {
        let config = GcConfig {
            stale_report_threshold: Duration::from_millis(0),
            ..GcConfig::default()
        };
        let mut gc = GcCoordinator::new(config);
        gc.report_active_blobs(report(3, &[], &[]));

        assert_eq!(gc.prune_stale_nodes(), vec![3]);
        // Already removed: a second prune finds nothing.
        assert!(gc.prune_stale_nodes().is_empty());
    }

    #[test]
    fn test_collection_history() {
        let mut gc = GcCoordinator::new(zero_grace_config());
        gc.register_blob(make_hash(1));

        gc.report_active_blobs(report(1, &[], &[]));

        assert!(gc.collection_history().is_empty());
        gc.run_gc_cycle();
        assert_eq!(gc.collection_history().len(), 1);
        assert_eq!(gc.collection_history()[0].blobs_collected.len(), 1);
        assert_eq!(gc.collection_history()[0].nodes_reporting, 1);
    }
}