1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
//! Implements the garbage collection (vacuum) process for FluxMap.
//!
//! The vacuum process is responsible for reclaiming memory from "dead" data versions
//! that are no longer visible to any active or future transaction. This prevents
//! unbounded memory growth in workloads with frequent updates or deletes.
use crate::transaction::TransactionStatus;
use crate::{SkipList, mem::MemSize};
use crossbeam_epoch::Atomic;
use serde::{Serialize, de::DeserializeOwned};
use std::sync::Arc;
use std::sync::atomic::Ordering;
impl<K, V> SkipList<K, V>
where
    K: Ord
        + Clone
        + Send
        + Sync
        + 'static
        + std::hash::Hash
        + Eq
        + Serialize
        + DeserializeOwned
        + MemSize,
    V: Clone + Send + Sync + 'static + Serialize + DeserializeOwned + MemSize,
{
    /// Scans the whole skiplist and removes dead data versions to reclaim memory.
    ///
    /// The pass works in three steps:
    ///
    /// 1. **Horizon.** The vacuum horizon is the smallest active transaction ID, or the
    ///    current transaction ID when no transaction is active.
    /// 2. **Version pruning.** For every keyed node on the base level, each version in its
    ///    chain is checked. A version is *dead* when its `expirer_txid` is non-zero, lies
    ///    below the horizon, and the expiring transaction is either recorded as
    ///    `Committed` or has no recorded status any more. Dead versions are unlinked with
    ///    a CAS, their size is subtracted from `current_memory_bytes`, and their
    ///    destruction is deferred to the epoch collector.
    /// 3. **Node pruning.** If a node's chain is empty after pruning, the node is
    ///    logically marked `deleted` and its remaining footprint is subtracted from
    ///    `current_memory_bytes`. This function does not physically unlink the node;
    ///    presumably a later traversal does that — confirm.
    ///
    /// Finally the horizon is stored into the transaction manager's
    /// `min_retainable_txid` and `prune_statuses` is called.
    ///
    /// # Returns
    ///
    /// `Ok((versions_removed, keys_removed))`.
    ///
    /// # Errors
    ///
    /// None in the current implementation: every path returns `Ok`.
    ///
    /// # Concurrency
    ///
    /// - The body contains no `.await`. The whole scan runs without yielding to the
    ///   executor and keeps a single epoch guard pinned for its entire duration.
    /// - NOTE(review): the unlink CAS does not mark the removed version's `next` link.
    ///   Two vacuum passes running at the same time could therefore unlink adjacent
    ///   versions and leave a freed version reachable. Presumably callers serialize
    ///   `vacuum` — confirm.
    pub async fn vacuum(&self) -> Result<(usize, usize), ()> {
        let tx_manager = self.transaction_manager();

        // Oldest active transaction ID, or the current ID when nothing is active.
        // Versions expired by a committed transaction below this horizon are treated as
        // invisible to every active and future transaction.
        let vacuum_horizon = tx_manager
            .get_active_txids()
            .iter()
            .min()
            .copied()
            .unwrap_or_else(|| tx_manager.get_current_txid());

        let mut versions_removed = 0;
        let mut keys_removed_count = 0;

        let guard = &crossbeam_epoch::pin();
        // Every pointer loaded in this function is dereferenced afterwards. The loads
        // therefore use `Acquire`, so they synchronize with the `Release` store that
        // published the pointee.
        let mut current_node_ptr = self.head.load(Ordering::Acquire, guard);

        // Walk every node on the base level of the skiplist.
        while let Some(node) = unsafe {
            // SAFETY: `as_ref` returns `None` for a null pointer, and `guard` keeps any
            // non-null node from being reclaimed while we are pinned.
            current_node_ptr.as_ref()
        } {
            // Nodes without a key (the head sentinel) carry no versions to prune.
            if let Some(key) = &node.key {
                // `prev_version_ptr` is the link that currently points at the version under
                // inspection: first the node's `value` head, then each survivor's `next`.
                let mut prev_version_ptr = &node.value;
                let mut has_live_versions = false;

                loop {
                    let version_head = prev_version_ptr.load(Ordering::Acquire, guard);
                    let version_node = match unsafe {
                        // SAFETY: null yields `None`; `guard` protects a non-null version.
                        version_head.as_ref()
                    } {
                        Some(v) => v,
                        None => break, // End of the version chain.
                    };

                    let expirer_id = version_node.version.expirer_txid.load(Ordering::Acquire);
                    // Dead = expired (non-zero expirer) by a committed transaction that is
                    // older than the horizon.
                    let is_dead = expirer_id != 0
                        && expirer_id < vacuum_horizon
                        && match tx_manager.get_status(expirer_id) {
                            Some(status) => status == TransactionStatus::Committed,
                            // A missing status below the horizon is treated as committed.
                            // NOTE(review): this is only safe if aborted transactions keep
                            // their status record or reset `expirer_txid`. Otherwise, once
                            // `prune_statuses` drops an aborted record, a still-live version
                            // would be removed here — confirm.
                            None => true,
                        };

                    if !is_dead {
                        has_live_versions = true;
                        prev_version_ptr = &version_node.next;
                        continue;
                    }

                    // Atomically swing the incoming link past the dead version.
                    let next_version_ptr = version_node.next.load(Ordering::Acquire, guard);
                    if prev_version_ptr
                        .compare_exchange(
                            version_head,
                            next_version_ptr,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                            guard,
                        )
                        .is_err()
                    {
                        // The link changed under us: a version was prepended, or another
                        // thread already unlinked this one. Re-read the same link instead
                        // of stepping through `version_node`, which may now be detached.
                        // Counting it as live would also keep an emptied key alive.
                        continue;
                    }

                    self.current_memory_bytes.fetch_sub(
                        Self::vacuumed_version_size(version_node) as u64,
                        Ordering::Relaxed,
                    );
                    // Raw pointers are not `Send`; pass the address into the deferred closure.
                    let raw_ptr_addr = version_head.as_raw() as usize;
                    let allocator = self.version_node_allocator.clone();
                    unsafe {
                        // SAFETY: the version is no longer reachable from this chain. The
                        // epoch collector runs the closure only after every guard that might
                        // still hold a reference to it has been dropped.
                        guard.defer(move || {
                            let raw_ptr = raw_ptr_addr as *mut crate::VersionNode<Arc<V>>;
                            std::ptr::drop_in_place(raw_ptr);
                            allocator.free(std::ptr::NonNull::new_unchecked(raw_ptr));
                        });
                    }
                    versions_removed += 1;
                    // `prev_version_ptr` now points at `next_version_ptr`; inspect it next.
                }

                // Re-reading the head narrows, but does not close, the window in which a
                // concurrent writer prepends a version after the scan above finished.
                // NOTE(review): a version inserted between this check and the CAS below
                // still lands in a node that is being marked deleted — confirm how writers
                // handle a concurrently deleted node.
                let chain_is_empty =
                    !has_live_versions && node.value.load(Ordering::Acquire, guard).is_null();
                if chain_is_empty
                    && node
                        .deleted
                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                        .is_ok()
                {
                    keys_removed_count += 1;
                    // We won the race to mark the node deleted, so we subtract its footprint:
                    // - the node struct and its per-level `next` links;
                    // - the key;
                    // - any version that slipped into the chain before the mark.
                    let mut total_removed_size = std::mem::size_of::<crate::Node<K, V>>()
                        + node.next.len() * std::mem::size_of::<Atomic<crate::Node<K, V>>>()
                        + key.mem_size();
                    let mut version_ptr = node.value.load(Ordering::Acquire, guard);
                    while let Some(version_node) = unsafe {
                        // SAFETY: null yields `None`; `guard` protects a non-null version.
                        version_ptr.as_ref()
                    } {
                        total_removed_size += Self::vacuumed_version_size(version_node);
                        version_ptr = version_node.next.load(Ordering::Acquire, guard);
                    }
                    self.current_memory_bytes
                        .fetch_sub(total_removed_size as u64, Ordering::Relaxed);
                }
            }

            current_node_ptr = node.next[0].load(Ordering::Acquire, guard);
        }

        // Publish the horizon as the minimum retainable TXID, then let the manager prune
        // its status records. Presumably it drops those below the horizon; confirm in
        // `prune_statuses`.
        tx_manager
            .min_retainable_txid
            .store(vacuum_horizon, Ordering::Release);
        tx_manager.prune_statuses();

        Ok((versions_removed, keys_removed_count))
    }

    /// Bytes that `vacuum` subtracts from `current_memory_bytes` for one version node.
    ///
    /// This is the size of the `VersionNode` struct itself plus the `mem_size()` reported
    /// for its stored value.
    fn vacuumed_version_size(version_node: &crate::VersionNode<Arc<V>>) -> usize {
        std::mem::size_of::<crate::VersionNode<Arc<V>>>() + version_node.version.value.mem_size()
    }
}