1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
use crate::compaction::{CompactionFilter, FilterDecision};
use crate::sstable::{SSTable, SSTableError};
use crate::types::InternalKey;
use bytes::Bytes;
use std::sync::Arc;
/// Merged entries from all `SSTables`, sorted by key
pub struct MergeIterator {
    // All merged/filtered (key, value) pairs, pre-sorted ascending by encoded
    // key during construction; stored as an owning iterator so `next()` simply
    // pops the next pair without further work.
    entries: std::vec::IntoIter<(Bytes, Bytes)>,
}
impl MergeIterator {
    /// Create a new merge iterator from multiple `SSTables`
    ///
    /// Equivalent to [`MergeIterator::with_gc`] with `oldest_snapshot = u64::MAX`,
    /// i.e. no active snapshots: only the newest version of each key survives.
    ///
    /// For L0 compaction, "newest" means from HIGHER `source_id` (later in vector).
    /// L0 `SSTables` are ordered oldest→newest, so higher index = newer data.
    ///
    /// # Errors
    /// Propagates any `SSTableError` raised while reading the source tables.
    pub fn new(
        sstables: Vec<SSTable>,
        level: usize,
        filter: Option<Arc<dyn CompactionFilter>>,
    ) -> Result<Self, SSTableError> {
        // u64::MAX means no snapshot constrains GC, so every shadowed
        // (non-newest) version is dropped — maximal garbage collection.
        Self::with_gc(sstables, level, filter, u64::MAX)
    }

    /// Create a merge iterator with MVCC garbage collection
    ///
    /// Collects all entries, sorts by encoded key (newest version first within
    /// each group), optionally applies a compaction filter, then applies MVCC GC.
    ///
    /// # MVCC Garbage Collection
    /// A snapshot with sequence `S` reads the newest version with `seq <= S`.
    /// Therefore, for each `user_key`, versions are kept from newest downward
    /// until (and including) the first version with `seq <= oldest_snapshot` —
    /// the version the oldest active snapshot reads. Everything older is
    /// shadowed for every possible reader and is dropped.
    ///
    /// # Arguments
    /// * `sstables` - `SSTables` to merge (older first for L0)
    /// * `level` - Target compaction level
    /// * `filter` - Optional compaction filter
    /// * `oldest_snapshot` - Oldest active snapshot seq (`u64::MAX` = no snapshots)
    ///
    /// # Errors
    /// Propagates any `SSTableError` raised while reading the source tables.
    pub fn with_gc(
        mut sstables: Vec<SSTable>,
        level: usize,
        filter: Option<Arc<dyn CompactionFilter>>,
        oldest_snapshot: u64,
    ) -> Result<Self, SSTableError> {
        // Collect all entries from all SSTables, tagging each with its source
        // table index so ties between identical encoded keys can be broken.
        let mut all_entries = Vec::new();
        for (source_id, sstable) in sstables.iter_mut().enumerate() {
            for result in sstable.iter()? {
                let (key, value) = result?;
                all_entries.push((key, value, source_id));
            }
        }
        // Sort by encoded key first, then by source_id DESCENDING (higher = newer).
        // With MVCC encoding this yields (user_key ASC, seq DESC, source_id DESC),
        // so the FIRST entry of every group is the newest version.
        all_entries.sort_by(|a, b| match a.0.cmp(&b.0) {
            std::cmp::Ordering::Equal => b.2.cmp(&a.2), // higher source_id first (newest)
            other => other,
        });
        let mut finalized_entries = Vec::new();
        if let Some(filter) = filter {
            // Group by ENCODED key and apply filter logic
            // (the filter API expects encoded keys, not user keys).
            let mut i = 0;
            while i < all_entries.len() {
                let key = &all_entries[i].0;
                // Find all versions of this ENCODED key.
                let mut j = i + 1;
                while j < all_entries.len() && all_entries[j].0 == *key {
                    j += 1;
                }
                // Versions for this key, already sorted newest-first.
                let versions = &all_entries[i..j];
                // 1. Merge phase: offer every version to the filter; default to
                //    the newest value when no custom merge is performed.
                let values: Vec<&[u8]> = versions.iter().map(|(_, v, _)| v.as_ref()).collect();
                let mut merged_value_bytes = versions[0].1.clone();
                if let Some(merged) = filter.merge(level, key, &values) {
                    // A merged result is treated as a new INLINE value:
                    // prepend FLAG_INLINE (1).
                    let mut with_flag = Vec::with_capacity(1 + merged.len());
                    with_flag.push(crate::sstable::FLAG_INLINE);
                    with_flag.extend_from_slice(&merged);
                    merged_value_bytes = Bytes::from(with_flag);
                }
                // 2. Filter phase.
                match filter.filter(level, key, &merged_value_bytes) {
                    FilterDecision::Keep => {
                        finalized_entries.push((key.clone(), merged_value_bytes));
                    }
                    FilterDecision::Remove => {
                        // Dropped entirely.
                    }
                    FilterDecision::ChangeValue(new_val) => {
                        // Replacement is treated as a new INLINE value.
                        let mut with_flag = Vec::with_capacity(1 + new_val.len());
                        with_flag.push(crate::sstable::FLAG_INLINE);
                        with_flag.extend_from_slice(&new_val);
                        finalized_entries.push((key.clone(), Bytes::from(with_flag)));
                    }
                }
                // Advance to the next key group.
                i = j;
            }
        } else {
            // Fast path with MVCC GC: group by user_key and apply GC rules.
            Self::apply_mvcc_gc(&all_entries, oldest_snapshot, &mut finalized_entries);
        }
        Ok(Self {
            entries: finalized_entries.into_iter(),
        })
    }

    /// Apply MVCC garbage collection to sorted entries
    ///
    /// Groups entries by `user_key` (versions are sorted newest-first within a
    /// group) and keeps every version down to and including the newest version
    /// with `seq <= oldest_snapshot`. Once such a version is kept it shadows
    /// all older versions for every reader — snapshots read the newest version
    /// with `seq` at or below their own sequence — so those are dropped.
    ///
    /// NOTE(fix): the previous rule dropped any non-newest version with
    /// `seq < oldest_snapshot`, which could discard exactly the version the
    /// oldest snapshot needs (e.g. versions [seq 10, seq 5] with
    /// `oldest_snapshot = 7` dropped seq 5, the version a snapshot at 7 reads).
    /// The new rule is strictly conservative: it never drops more than before.
    fn apply_mvcc_gc(
        all_entries: &[(Bytes, Bytes, usize)],
        oldest_snapshot: u64,
        finalized_entries: &mut Vec<(Bytes, Bytes)>,
    ) {
        let mut i = 0;
        while i < all_entries.len() {
            let (encoded_key, value, _) = &all_entries[i];
            if let Some(ikey) = InternalKey::decode(encoded_key.clone()) {
                // MVCC path: walk all versions of this user_key, newest first.
                let user_key = ikey.user_key;
                // Becomes true once a kept version is visible to the oldest
                // snapshot; everything older is then unreachable by any reader.
                let mut shadowed = false;
                let mut j = i;
                while j < all_entries.len() {
                    let (enc_key, val, _) = &all_entries[j];
                    let ver_ikey = match InternalKey::decode(enc_key.clone()) {
                        // A non-InternalKey or a different user_key ends the group.
                        Some(k) if k.user_key == user_key => k,
                        _ => break,
                    };
                    if !shadowed {
                        finalized_entries.push((enc_key.clone(), val.clone()));
                        // A version at or below the oldest snapshot is the last
                        // one any reader can observe; older versions are garbage.
                        if ver_ikey.seq <= oldest_snapshot {
                            shadowed = true;
                        }
                    }
                    j += 1;
                }
                i = j;
            } else {
                // Legacy (non-MVCC) key: keep the newest copy and skip exact
                // duplicates of the same encoded key.
                finalized_entries.push((encoded_key.clone(), value.clone()));
                let mut j = i + 1;
                while j < all_entries.len() && all_entries[j].0 == *encoded_key {
                    j += 1;
                }
                i = j;
            }
        }
    }
}
impl Iterator for MergeIterator {
    type Item = Result<(Bytes, Bytes), SSTableError>;

    /// Yield the next merged (key, value) pair. Entries were fully
    /// materialized at construction time, so this never produces an `Err`;
    /// the `Result` wrapper only keeps the item type uniform with other
    /// fallible iterators in the crate.
    fn next(&mut self) -> Option<Self::Item> {
        let pair = self.entries.next()?;
        Some(Ok(pair))
    }
}