1use std::collections::{HashMap, HashSet};
8use std::time::{Duration, Instant};
9
/// Identifier of a node that submits blob reports; used as the key under
/// which the coordinator stores each node's latest report.
pub type NodeId = u64;
12
/// One node's snapshot of the blobs it currently uses.
///
/// The coordinator keeps a single report per `node_id`; submitting another
/// report for the same node replaces the stored one.
#[derive(Debug, Clone)]
pub struct NodeBlobReport {
    /// Node this report belongs to.
    pub node_id: NodeId,
    /// Hashes the node lists as active.
    pub active_blobs: HashSet<[u8; 32]>,
    /// Hashes the node lists as pinned.
    pub pinned_blobs: HashSet<[u8; 32]>,
    /// Instant the report is dated; its age is compared against
    /// `GcConfig::stale_report_threshold` to decide whether it is stale.
    pub timestamp: Instant,
}
21
/// Garbage-collection state the coordinator tracks for a known blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStatus {
    /// Initial state of a registered blob; also assigned when a GC cycle
    /// finds the blob in some node's active set, or when a pin is removed.
    Active,
    /// Protected from collection, either through `GcCoordinator::pin_blob`
    /// or because a node report listed the blob as pinned during a cycle.
    Pinned,
    /// No report referenced the blob during a GC cycle. `since` is the time
    /// of the cycle that first observed this; it starts the grace period.
    Unreferenced { since: Instant },
    /// The blob stayed unreferenced for at least the grace period and has
    /// been returned by a GC cycle as eligible for collection.
    MarkedForCollection,
}
30
/// Tunable parameters of the garbage-collection coordinator.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Minimum time a blob must remain `BlobStatus::Unreferenced` before a
    /// GC cycle reports it as eligible for collection.
    pub grace_period: Duration,
    /// Node reports at least this old are treated as stale and discarded.
    pub stale_report_threshold: Duration,
    /// Minimum number of non-stale node reports required for a GC cycle to
    /// evaluate blobs; with fewer reports the cycle returns without changes.
    pub min_nodes_reporting: usize,
}
38
39impl Default for GcConfig {
40 fn default() -> Self {
41 Self {
42 grace_period: Duration::from_secs(5 * 60),
43 stale_report_threshold: Duration::from_secs(10 * 60),
44 min_nodes_reporting: 1,
45 }
46 }
47}
48
49pub struct GcCoordinator {
51 config: GcConfig,
52 node_reports: HashMap<NodeId, NodeBlobReport>,
53 known_blobs: HashMap<[u8; 32], BlobStatus>,
54 reference_counts: HashMap<[u8; 32], usize>,
55 collection_log: Vec<CollectionEvent>,
56}
57
/// Log record of a GC cycle that reported at least one eligible blob.
#[derive(Debug, Clone)]
pub struct CollectionEvent {
    /// Time at which the cycle ran.
    pub timestamp: Instant,
    /// Hashes the cycle reported as eligible for collection.
    pub blobs_collected: Vec<[u8; 32]>,
    /// Number of blobs that were active or pinned after the cycle.
    pub blobs_preserved: usize,
    /// Number of non-stale node reports the cycle was based on.
    pub nodes_reporting: usize,
}
66
67#[derive(Debug)]
69pub struct GcCycleResult {
70 pub eligible_for_collection: Vec<[u8; 32]>,
71 pub active_count: usize,
72 pub pinned_count: usize,
73 pub stale_nodes: Vec<NodeId>,
74}
75
76impl GcCoordinator {
77 pub fn new(config: GcConfig) -> Self {
79 Self {
80 config,
81 node_reports: HashMap::new(),
82 known_blobs: HashMap::new(),
83 reference_counts: HashMap::new(),
84 collection_log: Vec::new(),
85 }
86 }
87
88 pub fn report_active_blobs(&mut self, report: NodeBlobReport) {
90 self.node_reports.insert(report.node_id, report);
91 }
92
93 pub fn run_gc_cycle(&mut self) -> GcCycleResult {
96 let now = Instant::now();
97
98 let stale_nodes = self.compute_stale_nodes(now);
100 for &node_id in &stale_nodes {
101 self.node_reports.remove(&node_id);
102 }
103
104 if self.node_reports.len() < self.config.min_nodes_reporting {
106 return GcCycleResult {
107 eligible_for_collection: Vec::new(),
108 active_count: 0,
109 pinned_count: 0,
110 stale_nodes,
111 };
112 }
113
114 let mut global_active: HashSet<[u8; 32]> = HashSet::new();
116 let mut global_pinned: HashSet<[u8; 32]> = HashSet::new();
117
118 for (hash, status) in &self.known_blobs {
120 if *status == BlobStatus::Pinned {
121 global_pinned.insert(*hash);
122 }
123 }
124
125 self.reference_counts.clear();
127
128 for report in self.node_reports.values() {
129 for hash in &report.active_blobs {
130 global_active.insert(*hash);
131 *self.reference_counts.entry(*hash).or_insert(0) += 1;
132 }
133 for hash in &report.pinned_blobs {
134 global_pinned.insert(*hash);
135 }
136 }
137
138 let all_known: Vec<[u8; 32]> = self.known_blobs.keys().copied().collect();
140
141 for hash in &all_known {
142 if global_pinned.contains(hash) {
143 self.known_blobs.insert(*hash, BlobStatus::Pinned);
144 } else if global_active.contains(hash) {
145 self.known_blobs.insert(*hash, BlobStatus::Active);
146 } else {
147 match self.known_blobs.get(hash) {
149 Some(BlobStatus::Unreferenced { .. })
150 | Some(BlobStatus::MarkedForCollection) => {
151 }
153 _ => {
154 self.known_blobs
155 .insert(*hash, BlobStatus::Unreferenced { since: now });
156 }
157 }
158 }
159 }
160
161 let mut eligible: Vec<[u8; 32]> = Vec::new();
163 let mut active_count: usize = 0;
164 let mut pinned_count: usize = 0;
165
166 for (hash, status) in &self.known_blobs {
167 match status {
168 BlobStatus::Active => active_count += 1,
169 BlobStatus::Pinned => pinned_count += 1,
170 BlobStatus::Unreferenced { since } => {
171 if now.duration_since(*since) >= self.config.grace_period {
172 eligible.push(*hash);
173 }
174 }
175 BlobStatus::MarkedForCollection => {
176 eligible.push(*hash);
177 }
178 }
179 }
180
181 for hash in &eligible {
183 self.known_blobs
184 .insert(*hash, BlobStatus::MarkedForCollection);
185 }
186
187 if !eligible.is_empty() {
189 self.collection_log.push(CollectionEvent {
190 timestamp: now,
191 blobs_collected: eligible.clone(),
192 blobs_preserved: active_count + pinned_count,
193 nodes_reporting: self.node_reports.len(),
194 });
195 }
196
197 GcCycleResult {
198 eligible_for_collection: eligible,
199 active_count,
200 pinned_count,
201 stale_nodes,
202 }
203 }
204
205 pub fn pin_blob(&mut self, hash: [u8; 32]) {
207 self.known_blobs.insert(hash, BlobStatus::Pinned);
208 }
209
210 pub fn unpin_blob(&mut self, hash: [u8; 32]) {
212 if let Some(status) = self.known_blobs.get(&hash) {
213 if *status == BlobStatus::Pinned {
214 self.known_blobs.insert(hash, BlobStatus::Active);
215 }
216 }
217 }
218
219 pub fn register_blob(&mut self, hash: [u8; 32]) {
221 self.known_blobs.entry(hash).or_insert(BlobStatus::Active);
222 }
223
224 pub fn get_status(&self, hash: &[u8; 32]) -> Option<&BlobStatus> {
226 self.known_blobs.get(hash)
227 }
228
229 pub fn prune_stale_nodes(&mut self) -> Vec<NodeId> {
231 let now = Instant::now();
232 let stale = self.compute_stale_nodes(now);
233 for &node_id in &stale {
234 self.node_reports.remove(&node_id);
235 }
236 stale
237 }
238
239 pub fn collection_history(&self) -> &[CollectionEvent] {
241 &self.collection_log
242 }
243
244 fn compute_stale_nodes(&self, now: Instant) -> Vec<NodeId> {
246 self.node_reports
247 .iter()
248 .filter(|(_, report)| {
249 now.duration_since(report.timestamp) >= self.config.stale_report_threshold
250 })
251 .map(|(&node_id, _)| node_id)
252 .collect()
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 32-byte hash whose bytes are all `byte`.
    fn make_hash(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Coordinator whose grace period is zero, so a blob that is unreferenced
    /// becomes eligible in the very cycle that notices it.
    fn coordinator_without_grace() -> GcCoordinator {
        GcCoordinator::new(GcConfig {
            grace_period: Duration::from_millis(0),
            min_nodes_reporting: 1,
            ..GcConfig::default()
        })
    }

    /// Fresh report from node 1 that lists `active` as active and pins
    /// nothing.
    fn report_from_node_one(active: &[[u8; 32]]) -> NodeBlobReport {
        NodeBlobReport {
            node_id: 1,
            active_blobs: active.iter().copied().collect(),
            pinned_blobs: HashSet::new(),
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_default_config() {
        let GcConfig {
            grace_period,
            stale_report_threshold,
            min_nodes_reporting,
        } = GcConfig::default();

        assert_eq!(grace_period, Duration::from_secs(300));
        assert_eq!(stale_report_threshold, Duration::from_secs(600));
        assert_eq!(min_nodes_reporting, 1);
    }

    #[test]
    fn test_register_and_query() {
        let blob = make_hash(0xAA);
        let mut coordinator = GcCoordinator::new(GcConfig::default());
        assert!(coordinator.get_status(&blob).is_none());

        coordinator.register_blob(blob);
        assert_eq!(coordinator.get_status(&blob), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_pin_unpin() {
        let blob = make_hash(0xBB);
        let mut coordinator = GcCoordinator::new(GcConfig::default());
        coordinator.register_blob(blob);

        coordinator.pin_blob(blob);
        assert_eq!(coordinator.get_status(&blob), Some(&BlobStatus::Pinned));

        coordinator.unpin_blob(blob);
        assert_eq!(coordinator.get_status(&blob), Some(&BlobStatus::Active));
    }

    #[test]
    fn test_gc_cycle_no_reports_below_min() {
        // Two reporting nodes are required but only one reports.
        let mut coordinator = GcCoordinator::new(GcConfig {
            min_nodes_reporting: 2,
            ..GcConfig::default()
        });
        coordinator.register_blob(make_hash(1));
        coordinator.report_active_blobs(report_from_node_one(&[]));

        let outcome = coordinator.run_gc_cycle();
        assert!(outcome.eligible_for_collection.is_empty());
    }

    #[test]
    fn test_gc_cycle_active_blob_not_collected() {
        let blob = make_hash(0xCC);
        let mut coordinator = coordinator_without_grace();
        coordinator.register_blob(blob);
        coordinator.report_active_blobs(report_from_node_one(&[blob]));

        let outcome = coordinator.run_gc_cycle();
        assert!(outcome.eligible_for_collection.is_empty());
        assert_eq!(outcome.active_count, 1);
    }

    #[test]
    fn test_gc_cycle_unreferenced_blob_collected_after_grace() {
        let blob = make_hash(0xDD);
        let mut coordinator = coordinator_without_grace();
        coordinator.register_blob(blob);
        coordinator.report_active_blobs(report_from_node_one(&[]));

        // With a zero grace period the blob is eligible in the first cycle.
        let outcome = coordinator.run_gc_cycle();
        assert_eq!(outcome.eligible_for_collection.len(), 1);
        assert_eq!(outcome.eligible_for_collection[0], blob);
    }

    #[test]
    fn test_pinned_blob_not_collected() {
        let blob = make_hash(0xEE);
        let mut coordinator = coordinator_without_grace();
        coordinator.register_blob(blob);
        coordinator.pin_blob(blob);
        coordinator.report_active_blobs(report_from_node_one(&[]));

        let outcome = coordinator.run_gc_cycle();
        assert!(outcome.eligible_for_collection.is_empty());
        assert_eq!(outcome.pinned_count, 1);
    }

    #[test]
    fn test_collection_history() {
        let mut coordinator = coordinator_without_grace();
        coordinator.register_blob(make_hash(1));
        coordinator.report_active_blobs(report_from_node_one(&[]));

        assert!(coordinator.collection_history().is_empty());
        coordinator.run_gc_cycle();

        let history = coordinator.collection_history();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].blobs_collected.len(), 1);
    }
}