// reddb_server/storage/btree/gc.rs
use super::node::{Node, NodeId};
use super::tree::BPlusTree;
use super::version::{current_timestamp, Timestamp};
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::time::{Duration, Instant};

/// Acquires a read guard on `lock`, recovering the data if the lock was
/// poisoned by a panicking writer (GC must keep working after panics).
fn gc_read<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
16
17fn gc_write<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
18 lock.write()
19 .unwrap_or_else(|poisoned| poisoned.into_inner())
20}
21
/// Tuning parameters for the MVCC garbage collector.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Minimum age (relative to the current timestamp) a version must
    /// reach before it becomes eligible for reclamation.
    pub min_age: Timestamp,
    /// Maximum number of leaf nodes visited in a single GC pass.
    pub batch_size: usize,
    /// Delay between background GC passes.
    /// NOTE(review): never read in this module — presumably consumed by
    /// the background driver; confirm.
    pub interval: Duration,
    /// Whether background GC should run at all.
    /// NOTE(review): never read in this module — confirm against the driver.
    pub background_gc: bool,
}
34
35impl GcConfig {
36 pub fn new() -> Self {
38 Self {
39 min_age: Timestamp(1000),
40 batch_size: 1000,
41 interval: Duration::from_secs(60),
42 background_gc: true,
43 }
44 }
45
46 pub fn with_min_age(mut self, age: Timestamp) -> Self {
48 self.min_age = age;
49 self
50 }
51
52 pub fn with_batch_size(mut self, size: usize) -> Self {
54 self.batch_size = size;
55 self
56 }
57
58 pub fn with_interval(mut self, interval: Duration) -> Self {
60 self.interval = interval;
61 self
62 }
63}
64
65impl Default for GcConfig {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
/// Point-in-time snapshot of garbage-collection counters.
#[derive(Debug, Clone, Default)]
pub struct GcStats {
    /// Number of completed full `run()` passes.
    pub runs: u64,
    /// Total versions reclaimed across all passes.
    pub versions_collected: u64,
    /// Total leaf nodes examined across all passes.
    pub nodes_visited: u64,
    /// Cumulative time spent collecting, in microseconds.
    pub time_spent_us: u64,
    /// Timestamp recorded at the end of the most recent full pass.
    pub last_run: Timestamp,
    /// Wall-clock duration of the most recent full pass, in microseconds.
    pub last_run_duration_us: u64,
}
87
/// MVCC garbage collector that prunes obsolete versions from a B+tree's
/// leaf chain.
pub struct GarbageCollector {
    /// Tuning parameters (age floor, batch size, ...).
    config: GcConfig,
    /// Lock-free counters updated by GC passes.
    stats: AtomicGcStats,
    /// Set while a full `run()` pass is in flight; concurrent runs bail out.
    running: AtomicBool,
    /// Raw timestamp of the oldest active transaction; 0 (epoch) is treated
    /// as "none reported" by `calculate_watermark`.
    oldest_active_ts: AtomicU64,
}
99
/// Atomic mirror of [`GcStats`] so counters can be updated without a lock.
/// Field meanings match `GcStats` one-for-one.
struct AtomicGcStats {
    runs: AtomicU64,
    versions_collected: AtomicU64,
    nodes_visited: AtomicU64,
    time_spent_us: AtomicU64,
    last_run: AtomicU64,
    last_run_duration_us: AtomicU64,
}
109
110impl AtomicGcStats {
111 fn new() -> Self {
112 Self {
113 runs: AtomicU64::new(0),
114 versions_collected: AtomicU64::new(0),
115 nodes_visited: AtomicU64::new(0),
116 time_spent_us: AtomicU64::new(0),
117 last_run: AtomicU64::new(0),
118 last_run_duration_us: AtomicU64::new(0),
119 }
120 }
121
122 fn to_stats(&self) -> GcStats {
123 GcStats {
124 runs: self.runs.load(Ordering::Relaxed),
125 versions_collected: self.versions_collected.load(Ordering::Relaxed),
126 nodes_visited: self.nodes_visited.load(Ordering::Relaxed),
127 time_spent_us: self.time_spent_us.load(Ordering::Relaxed),
128 last_run: Timestamp(self.last_run.load(Ordering::Relaxed)),
129 last_run_duration_us: self.last_run_duration_us.load(Ordering::Relaxed),
130 }
131 }
132}
133
134impl GarbageCollector {
135 pub fn new(config: GcConfig) -> Self {
137 Self {
138 config,
139 stats: AtomicGcStats::new(),
140 running: AtomicBool::new(false),
141 oldest_active_ts: AtomicU64::new(0),
142 }
143 }
144
145 pub fn config(&self) -> &GcConfig {
147 &self.config
148 }
149
150 pub fn stats(&self) -> GcStats {
152 self.stats.to_stats()
153 }
154
155 pub fn set_oldest_active(&self, ts: Timestamp) {
157 self.oldest_active_ts.store(ts.get(), Ordering::SeqCst);
158 }
159
160 fn calculate_watermark(&self) -> Timestamp {
162 let current = current_timestamp();
163 let oldest_active = Timestamp(self.oldest_active_ts.load(Ordering::SeqCst));
164
165 if !oldest_active.is_epoch() {
169 oldest_active.min(current.saturating_sub(self.config.min_age))
170 } else {
171 current.saturating_sub(self.config.min_age)
172 }
173 }
174
175 pub fn run<K, V>(&self, tree: &BPlusTree<K, V>) -> GcStats
177 where
178 K: Clone + Ord + Debug + Send + Sync,
179 V: Clone + Debug + Send + Sync,
180 {
181 if self.running.swap(true, Ordering::SeqCst) {
183 return self.stats();
184 }
185
186 let start = Instant::now();
187 let watermark = self.calculate_watermark();
188
189 let mut versions_collected = 0u64;
190 let mut nodes_visited = 0u64;
191
192 let first_leaf = *gc_read(&tree.first_leaf);
194 let mut current_leaf = first_leaf;
195
196 while let Some(leaf_id) = current_leaf {
197 nodes_visited += 1;
198
199 if let Some(node) = tree.get_node(leaf_id) {
201 let mut node = gc_write(&node);
202 if let Node::Leaf(leaf) = &mut *node {
203 versions_collected += leaf.gc(watermark) as u64;
204
205 current_leaf = leaf.next;
206 } else {
207 break;
208 }
209 } else {
210 break;
211 }
212
213 if nodes_visited >= self.config.batch_size as u64 {
215 break;
216 }
217 }
218
219 let _ = tree.compact_deleted_entries(watermark);
220
221 let duration = start.elapsed();
222
223 self.stats.runs.fetch_add(1, Ordering::Relaxed);
225 self.stats
226 .versions_collected
227 .fetch_add(versions_collected, Ordering::Relaxed);
228 self.stats
229 .nodes_visited
230 .fetch_add(nodes_visited, Ordering::Relaxed);
231 self.stats
232 .time_spent_us
233 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
234 self.stats
235 .last_run
236 .store(current_timestamp().get(), Ordering::Relaxed);
237 self.stats
238 .last_run_duration_us
239 .store(duration.as_micros() as u64, Ordering::Relaxed);
240
241 self.running.store(false, Ordering::SeqCst);
242
243 self.stats()
244 }
245
246 pub fn needs_gc<K, V>(&self, tree: &BPlusTree<K, V>) -> bool
248 where
249 K: Clone + Ord + Debug + Send + Sync,
250 V: Clone + Debug + Send + Sync,
251 {
252 let tree_stats = tree.stats();
254 tree_stats.versions > tree_stats.entries * 2
255 }
256
257 pub fn run_incremental<K, V>(
259 &self,
260 tree: &BPlusTree<K, V>,
261 start_leaf: Option<NodeId>,
262 ) -> Option<NodeId>
263 where
264 K: Clone + Ord + Debug + Send + Sync,
265 V: Clone + Debug + Send + Sync,
266 {
267 let watermark = self.calculate_watermark();
268 let mut nodes_visited = 0;
269 let mut versions_collected = 0u64;
270
271 let first = start_leaf.or_else(|| *gc_read(&tree.first_leaf));
272 let mut current_leaf = first;
273
274 while let Some(leaf_id) = current_leaf {
275 nodes_visited += 1;
276
277 if let Some(node) = tree.get_node(leaf_id) {
278 let mut node = gc_write(&node);
279 if let Node::Leaf(leaf) = &mut *node {
280 versions_collected += leaf.gc(watermark) as u64;
281 current_leaf = leaf.next;
282 } else {
283 break;
284 }
285 } else {
286 break;
287 }
288
289 if nodes_visited >= self.config.batch_size {
290 return current_leaf;
292 }
293 }
294
295 self.stats
297 .versions_collected
298 .fetch_add(versions_collected, Ordering::Relaxed);
299 self.stats
300 .nodes_visited
301 .fetch_add(nodes_visited as u64, Ordering::Relaxed);
302
303 None }
305}
306
307impl Default for GarbageCollector {
308 fn default() -> Self {
309 Self::new(GcConfig::default())
310 }
311}
312
/// Pairs a [`GarbageCollector`] with a cooperative stop flag, intended
/// for a background GC driver.
pub struct GcHandle {
    /// The wrapped collector.
    gc: GarbageCollector,
    /// Shutdown flag: set by `stop()`, polled via `is_stopped()`.
    stop: AtomicBool,
}
320
321impl GcHandle {
322 pub fn new(config: GcConfig) -> Self {
324 Self {
325 gc: GarbageCollector::new(config),
326 stop: AtomicBool::new(false),
327 }
328 }
329
330 pub fn gc(&self) -> &GarbageCollector {
332 &self.gc
333 }
334
335 pub fn stop(&self) {
337 self.stop.store(true, Ordering::SeqCst);
338 }
339
340 pub fn is_stopped(&self) -> bool {
342 self.stop.load(Ordering::SeqCst)
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::btree::{BPlusTree, BTreeConfig};
    use crate::storage::primitives::ids::TxnId;

    // Builder methods must overwrite the corresponding defaults.
    #[test]
    fn test_gc_config() {
        let config = GcConfig::new()
            .with_min_age(Timestamp(500))
            .with_batch_size(100);

        assert_eq!(config.min_age, Timestamp(500));
        assert_eq!(config.batch_size, 100);
    }

    // A pass over an empty tree completes, counts as one run, and
    // collects nothing.
    #[test]
    fn test_gc_run_empty_tree() {
        let gc = GarbageCollector::new(GcConfig::new());
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();

        let stats = gc.run(&tree);
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.versions_collected, 0);
    }

    // With three versions per key, a pass should at least visit leaves.
    #[test]
    fn test_gc_run_with_data() {
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        // First version of each key.
        for i in 1..=10 {
            tree.insert(i, format!("v1_{}", i), TxnId(1));
        }

        // Two more versions per key so there is garbage to reclaim.
        for i in 1..=10 {
            tree.insert(i, format!("v2_{}", i), TxnId(2));
            tree.insert(i, format!("v3_{}", i), TxnId(3));
        }

        let stats = gc.run(&tree);
        assert!(stats.nodes_visited > 0);
    }

    // An incremental pass with batch size 2 over 20 keys.
    // NOTE(review): `next` is unused — asserting on it (e.g. is_some())
    // would strengthen this test; confirm the leaf count exceeds the batch.
    #[test]
    fn test_gc_incremental() {
        let gc = GarbageCollector::new(GcConfig::new().with_batch_size(2));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        for i in 1..=20 {
            tree.insert(i, format!("v{}", i), TxnId(1));
        }

        let next = gc.run_incremental(&tree, None);
    }

    // The watermark never precedes the epoch, and is capped by the
    // oldest active transaction once one is reported.
    #[test]
    fn test_gc_watermark() {
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));

        let wm1 = gc.calculate_watermark();
        assert!(wm1 >= Timestamp::EPOCH);

        gc.set_oldest_active(Timestamp(50));
        let wm2 = gc.calculate_watermark();
        assert!(wm2 <= Timestamp(50));
    }

    // The stop flag starts clear and latches once stop() is called.
    #[test]
    fn test_gc_handle() {
        let handle = GcHandle::new(GcConfig::default());

        assert!(!handle.is_stopped());

        handle.stop();
        assert!(handle.is_stopped());
    }

    // run() must neither panic nor deadlock after `first_leaf` is
    // poisoned (gc_read recovers the poisoned guard).
    #[test]
    fn test_gc_run_recovers_after_first_leaf_lock_poisoning() {
        let gc = GarbageCollector::new(GcConfig::new());
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();

        // Poison the lock by panicking while holding its write guard.
        let poison_target = &tree;
        std::thread::scope(|scope| {
            let handle = scope.spawn(|| {
                let _guard = poison_target
                    .first_leaf
                    .write()
                    .expect("first_leaf lock should be acquired");
                panic!("poison first_leaf lock");
            });
            let _ = handle.join();
        });

        let stats = gc.run(&tree);
        assert_eq!(stats.runs, 1);
    }

    // run() must also recover when a leaf node's own RwLock is poisoned
    // (gc_write recovers the poisoned guard).
    #[test]
    fn test_gc_run_recovers_after_leaf_node_lock_poisoning() {
        let gc = GarbageCollector::new(GcConfig::new().with_min_age(Timestamp(0)));
        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));

        for i in 1..=4 {
            tree.insert(i, format!("v{}", i), TxnId(1));
        }

        // Poison the first leaf's lock from a scoped thread.
        let first_leaf = (*gc_read(&tree.first_leaf)).expect("tree should have a first leaf");
        let leaf = tree.get_node(first_leaf).expect("leaf node should exist");
        let poison_target = &leaf;
        std::thread::scope(|scope| {
            let handle = scope.spawn(|| {
                let _guard = poison_target
                    .write()
                    .expect("leaf node lock should be acquired");
                panic!("poison leaf node lock");
            });
            let _ = handle.join();
        });

        let stats = gc.run(&tree);
        assert!(stats.nodes_visited > 0);
    }
}