Skip to main content

reddb_server/storage/btree/
gc.rs

//! Garbage Collection for MVCC Versions
//!
//! Cleans up old versions that are no longer visible to any transaction.
4
5use super::node::{Node, NodeId};
6use super::tree::BPlusTree;
7use super::version::{current_timestamp, Timestamp};
8use std::fmt::Debug;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::time::{Duration, Instant};
12
/// Acquires a shared read guard on `lock`, tolerating poisoning.
///
/// If a previous holder panicked and poisoned the lock, the guard is taken
/// out of the poison error and returned anyway, so this function never panics
/// because of poisoning. It still blocks like any other `RwLock::read` call.
fn gc_read<T>(lock: &RwLock<T>) -> RwLockReadGuard<'_, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
16
/// Acquires an exclusive write guard on `lock`, tolerating poisoning.
///
/// If a previous holder panicked and poisoned the lock, the guard is taken
/// out of the poison error and returned anyway, so this function never panics
/// because of poisoning. It still blocks like any other `RwLock::write` call.
fn gc_write<T>(lock: &RwLock<T>) -> RwLockWriteGuard<'_, T> {
    lock.write().unwrap_or_else(|poisoned| poisoned.into_inner())
}
21
22/// GC Configuration
23#[derive(Debug, Clone)]
24pub struct GcConfig {
25    /// Minimum age of versions to collect (in timestamps)
26    pub min_age: Timestamp,
27    /// Maximum versions to process per batch
28    pub batch_size: usize,
29    /// Interval between GC runs
30    pub interval: Duration,
31    /// Enable background GC
32    pub background_gc: bool,
33}
34
35impl GcConfig {
36    /// Create default config
37    pub fn new() -> Self {
38        Self {
39            min_age: Timestamp(1000),
40            batch_size: 1000,
41            interval: Duration::from_secs(60),
42            background_gc: true,
43        }
44    }
45
46    /// Set minimum age
47    pub fn with_min_age(mut self, age: Timestamp) -> Self {
48        self.min_age = age;
49        self
50    }
51
52    /// Set batch size
53    pub fn with_batch_size(mut self, size: usize) -> Self {
54        self.batch_size = size;
55        self
56    }
57
58    /// Set interval
59    pub fn with_interval(mut self, interval: Duration) -> Self {
60        self.interval = interval;
61        self
62    }
63}
64
65impl Default for GcConfig {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71/// GC Statistics
72#[derive(Debug, Clone, Default)]
73pub struct GcStats {
74    /// Total GC runs
75    pub runs: u64,
76    /// Total versions collected
77    pub versions_collected: u64,
78    /// Total nodes visited
79    pub nodes_visited: u64,
80    /// Total time spent in GC (microseconds)
81    pub time_spent_us: u64,
82    /// Last run timestamp
83    pub last_run: Timestamp,
84    /// Last run duration (microseconds)
85    pub last_run_duration_us: u64,
86}
87
88/// Garbage Collector for B+ Tree
89pub struct GarbageCollector {
90    /// Configuration
91    config: GcConfig,
92    /// Statistics
93    stats: AtomicGcStats,
94    /// Running flag
95    running: AtomicBool,
96    /// Oldest active transaction timestamp
97    oldest_active_ts: AtomicU64,
98}
99
/// Atomic counterpart of the public GC statistics: one `AtomicU64` per counter.
struct AtomicGcStats {
    /// Number of completed runs.
    runs: AtomicU64,
    /// Cumulative versions reclaimed.
    versions_collected: AtomicU64,
    /// Cumulative nodes visited.
    nodes_visited: AtomicU64,
    /// Cumulative GC time in microseconds.
    time_spent_us: AtomicU64,
    /// Raw timestamp value of the most recent run.
    last_run: AtomicU64,
    /// Duration of the most recent run in microseconds.
    last_run_duration_us: AtomicU64,
}
109
110impl AtomicGcStats {
111    fn new() -> Self {
112        Self {
113            runs: AtomicU64::new(0),
114            versions_collected: AtomicU64::new(0),
115            nodes_visited: AtomicU64::new(0),
116            time_spent_us: AtomicU64::new(0),
117            last_run: AtomicU64::new(0),
118            last_run_duration_us: AtomicU64::new(0),
119        }
120    }
121
122    fn to_stats(&self) -> GcStats {
123        GcStats {
124            runs: self.runs.load(Ordering::Relaxed),
125            versions_collected: self.versions_collected.load(Ordering::Relaxed),
126            nodes_visited: self.nodes_visited.load(Ordering::Relaxed),
127            time_spent_us: self.time_spent_us.load(Ordering::Relaxed),
128            last_run: Timestamp(self.last_run.load(Ordering::Relaxed)),
129            last_run_duration_us: self.last_run_duration_us.load(Ordering::Relaxed),
130        }
131    }
132}
133
134impl GarbageCollector {
135    /// Create new GC
136    pub fn new(config: GcConfig) -> Self {
137        Self {
138            config,
139            stats: AtomicGcStats::new(),
140            running: AtomicBool::new(false),
141            oldest_active_ts: AtomicU64::new(0),
142        }
143    }
144
145    /// Get configuration
146    pub fn config(&self) -> &GcConfig {
147        &self.config
148    }
149
150    /// Get statistics
151    pub fn stats(&self) -> GcStats {
152        self.stats.to_stats()
153    }
154
155    /// Update oldest active transaction timestamp
156    pub fn set_oldest_active(&self, ts: Timestamp) {
157        self.oldest_active_ts.store(ts.get(), Ordering::SeqCst);
158    }
159
160    /// Calculate GC watermark
161    fn calculate_watermark(&self) -> Timestamp {
162        let current = current_timestamp();
163        let oldest_active = Timestamp(self.oldest_active_ts.load(Ordering::SeqCst));
164
165        // Watermark is the minimum of:
166        // - oldest active transaction timestamp
167        // - current - min_age
168        if !oldest_active.is_epoch() {
169            oldest_active.min(current.saturating_sub(self.config.min_age))
170        } else {
171            current.saturating_sub(self.config.min_age)
172        }
173    }
174
175    /// Run GC on a B+ tree
176    pub fn run<K, V>(&self, tree: &BPlusTree<K, V>) -> GcStats
177    where
178        K: Clone + Ord + Debug + Send + Sync,
179        V: Clone + Debug + Send + Sync,
180    {
181        // Check if already running
182        if self.running.swap(true, Ordering::SeqCst) {
183            return self.stats();
184        }
185
186        let start = Instant::now();
187        let watermark = self.calculate_watermark();
188
189        let mut versions_collected = 0u64;
190        let mut nodes_visited = 0u64;
191
192        // Collect from all leaf nodes
193        let first_leaf = *gc_read(&tree.first_leaf);
194        let mut current_leaf = first_leaf;
195
196        while let Some(leaf_id) = current_leaf {
197            nodes_visited += 1;
198
199            // Process leaf
200            if let Some(node) = tree.get_node(leaf_id) {
201                let mut node = gc_write(&node);
202                if let Node::Leaf(leaf) = &mut *node {
203                    versions_collected += leaf.gc(watermark) as u64;
204
205                    current_leaf = leaf.next;
206                } else {
207                    break;
208                }
209            } else {
210                break;
211            }
212
213            // Batch limit
214            if nodes_visited >= self.config.batch_size as u64 {
215                break;
216            }
217        }
218
219        let _ = tree.compact_deleted_entries(watermark);
220
221        let duration = start.elapsed();
222
223        // Update stats
224        self.stats.runs.fetch_add(1, Ordering::Relaxed);
225        self.stats
226            .versions_collected
227            .fetch_add(versions_collected, Ordering::Relaxed);
228        self.stats
229            .nodes_visited
230            .fetch_add(nodes_visited, Ordering::Relaxed);
231        self.stats
232            .time_spent_us
233            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
234        self.stats
235            .last_run
236            .store(current_timestamp().get(), Ordering::Relaxed);
237        self.stats
238            .last_run_duration_us
239            .store(duration.as_micros() as u64, Ordering::Relaxed);
240
241        self.running.store(false, Ordering::SeqCst);
242
243        self.stats()
244    }
245
246    /// Check if GC is needed
247    pub fn needs_gc<K, V>(&self, tree: &BPlusTree<K, V>) -> bool
248    where
249        K: Clone + Ord + Debug + Send + Sync,
250        V: Clone + Debug + Send + Sync,
251    {
252        // Simple heuristic: GC if stats show high version count
253        let tree_stats = tree.stats();
254        tree_stats.versions > tree_stats.entries * 2
255    }
256
257    /// Run incremental GC (process one batch)
258    pub fn run_incremental<K, V>(
259        &self,
260        tree: &BPlusTree<K, V>,
261        start_leaf: Option<NodeId>,
262    ) -> Option<NodeId>
263    where
264        K: Clone + Ord + Debug + Send + Sync,
265        V: Clone + Debug + Send + Sync,
266    {
267        let watermark = self.calculate_watermark();
268        let mut nodes_visited = 0;
269        let mut versions_collected = 0u64;
270
271        let first = start_leaf.or_else(|| *gc_read(&tree.first_leaf));
272        let mut current_leaf = first;
273
274        while let Some(leaf_id) = current_leaf {
275            nodes_visited += 1;
276
277            if let Some(node) = tree.get_node(leaf_id) {
278                let mut node = gc_write(&node);
279                if let Node::Leaf(leaf) = &mut *node {
280                    versions_collected += leaf.gc(watermark) as u64;
281                    current_leaf = leaf.next;
282                } else {
283                    break;
284                }
285            } else {
286                break;
287            }
288
289            if nodes_visited >= self.config.batch_size {
290                // Return next leaf to continue from
291                return current_leaf;
292            }
293        }
294
295        // Update stats
296        self.stats
297            .versions_collected
298            .fetch_add(versions_collected, Ordering::Relaxed);
299        self.stats
300            .nodes_visited
301            .fetch_add(nodes_visited as u64, Ordering::Relaxed);
302
303        None // GC complete
304    }
305}
306
307impl Default for GarbageCollector {
308    fn default() -> Self {
309        Self::new(GcConfig::default())
310    }
311}
312
313/// GC handle for managing background GC
314pub struct GcHandle {
315    /// GC instance
316    gc: GarbageCollector,
317    /// Stop flag
318    stop: AtomicBool,
319}
320
321impl GcHandle {
322    /// Create new handle
323    pub fn new(config: GcConfig) -> Self {
324        Self {
325            gc: GarbageCollector::new(config),
326            stop: AtomicBool::new(false),
327        }
328    }
329
330    /// Get GC reference
331    pub fn gc(&self) -> &GarbageCollector {
332        &self.gc
333    }
334
335    /// Stop background GC
336    pub fn stop(&self) {
337        self.stop.store(true, Ordering::SeqCst);
338    }
339
340    /// Check if stopped
341    pub fn is_stopped(&self) -> bool {
342        self.stop.load(Ordering::SeqCst)
343    }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::btree::{BPlusTree, BTreeConfig};
    use crate::storage::primitives::ids::TxnId;

    /// Builder methods override the defaults they target.
    #[test]
    fn test_gc_config() {
        let config = GcConfig::new()
            .with_min_age(Timestamp(500))
            .with_batch_size(100);

        assert_eq!(config.min_age, Timestamp(500));
        assert_eq!(config.batch_size, 100);
    }

    /// A run over an empty tree still counts as a run and collects nothing.
    #[test]
    fn test_gc_run_empty_tree() {
        let gc = GarbageCollector::new(GcConfig::new());
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();

        let stats = gc.run(&tree);
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.versions_collected, 0);
    }

    /// A run over a populated tree visits at least one leaf.
    #[test]
    fn test_gc_run_with_data() {
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        // Insert and update to create versions
        for i in 1..=10 {
            tree.insert(i, format!("v1_{}", i), TxnId(1));
        }

        // Update to create more versions
        for i in 1..=10 {
            tree.insert(i, format!("v2_{}", i), TxnId(2));
            tree.insert(i, format!("v3_{}", i), TxnId(3));
        }

        let stats = gc.run(&tree);
        assert!(stats.nodes_visited > 0);
    }

    /// Incremental GC can be driven batch by batch until it reports completion.
    #[test]
    fn test_gc_incremental() {
        let gc = GarbageCollector::new(GcConfig::new().with_batch_size(2));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        for i in 1..=20 {
            tree.insert(i, format!("v{}", i), TxnId(1));
        }

        // Follow the continuation points until the collector reports `None`.
        // 20 keys cannot produce more than 20 leaves, so 20 batches is a safe
        // upper bound regardless of the exact tree structure.
        let mut cursor = gc.run_incremental(&tree, None);
        let mut batches = 1usize;
        while let Some(leaf_id) = cursor {
            assert!(
                batches < 20,
                "incremental GC should finish within 20 batches"
            );
            cursor = gc.run_incremental(&tree, Some(leaf_id));
            batches += 1;
        }
    }

    /// The watermark never exceeds the registered oldest active timestamp.
    #[test]
    fn test_gc_watermark() {
        // Use min_age = 0 so watermark equals current timestamp
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));

        // With no active transactions, watermark is current timestamp
        let wm1 = gc.calculate_watermark();
        // Can be 0 if no timestamps have been generated yet
        assert!(wm1 >= Timestamp::EPOCH);

        // With active transaction set, watermark should respect it
        gc.set_oldest_active(Timestamp(50));
        let wm2 = gc.calculate_watermark();
        assert!(wm2 <= Timestamp(50));
    }

    /// The stop flag starts cleared and is set by `stop`.
    #[test]
    fn test_gc_handle() {
        let handle = GcHandle::new(GcConfig::default());

        assert!(!handle.is_stopped());

        handle.stop();
        assert!(handle.is_stopped());
    }

    /// A poisoned `first_leaf` lock does not prevent a GC run.
    #[test]
    fn test_gc_run_recovers_after_first_leaf_lock_poisoning() {
        let gc = GarbageCollector::new(GcConfig::new());
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();

        let poison_target = &tree;
        std::thread::scope(|scope| {
            let handle = scope.spawn(|| {
                let _guard = poison_target
                    .first_leaf
                    .write()
                    .expect("first_leaf lock should be acquired");
                panic!("poison first_leaf lock");
            });
            let _ = handle.join();
        });

        let stats = gc.run(&tree);
        assert_eq!(stats.runs, 1);
    }

    /// A poisoned leaf-node lock does not prevent a GC run.
    #[test]
    fn test_gc_run_recovers_after_leaf_node_lock_poisoning() {
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        for i in 1..=4 {
            tree.insert(i, format!("v{}", i), TxnId(1));
        }

        let first_leaf = (*gc_read(&tree.first_leaf)).expect("tree should have a first leaf");
        let leaf = tree.get_node(first_leaf).expect("leaf node should exist");
        let poison_target = &leaf;
        std::thread::scope(|scope| {
            let handle = scope.spawn(|| {
                let _guard = poison_target
                    .write()
                    .expect("leaf node lock should be acquired");
                panic!("poison leaf node lock");
            });
            let _ = handle.join();
        });

        let stats = gc.run(&tree);
        assert!(stats.nodes_visited > 0);
    }
}