1use crate::{
6 key::InternalKey,
7 level_manifest::{level::Level, LevelManifest},
8 level_reader::LevelReader,
9 memtable::Memtable,
10 merge::{BoxedIterator, Merger},
11 multi_reader::MultiReader,
12 mvcc_stream::MvccStream,
13 segment::value_block::CachePolicy,
14 value::{SeqNo, UserKey},
15 InternalValue,
16};
17use guardian::ArcRwLockReadGuardian;
18use self_cell::self_cell;
19use std::{ops::Bound, sync::Arc};
20
21#[must_use]
22pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
23 item_seqno < seqno
24}
25
26#[must_use]
27#[allow(clippy::module_name_repetitions)]
28pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
29 use std::ops::Bound::{Excluded, Included, Unbounded};
30
31 if prefix.is_empty() {
32 return (Unbounded, Unbounded);
33 }
34
35 let mut end = prefix.to_vec();
36 let len = end.len();
37
38 for (idx, byte) in end.iter_mut().rev().enumerate() {
39 let idx = len - 1 - idx;
40
41 if *byte < 255 {
42 *byte += 1;
43 end.truncate(idx + 1);
44 return (Included(prefix.into()), Excluded(end.into()));
45 }
46 }
47
48 (Included(prefix.into()), Unbounded)
49}
50
/// State owned by a `TreeIter`: the memtables (and levels) a range scan reads from.
///
/// This is the `owner` half of the `TreeIter` self-cell. The memtable range
/// iterators built in `TreeIter::create_range` borrow from it for as long as the
/// `TreeIter` lives.
pub struct IterState {
    /// The active memtable. `create_range` scans it with the optional seqno filter.
    pub(crate) active: Arc<Memtable>,

    /// Sealed memtables. `create_range` scans each with the optional seqno filter.
    pub(crate) sealed: Vec<Arc<Memtable>>,

    /// Optional extra memtable merged into the scan. `create_range` does NOT apply
    /// the seqno filter to it.
    pub(crate) ephemeral: Option<Arc<Memtable>>,

    // Not read by the range-iterator code in this module, hence the lint allowance.
    // NOTE(review): presumably held so these `Arc<Level>`s stay alive as long as
    // the iterator does — confirm with the code that constructs `IterState`.
    #[allow(unused)]
    pub(crate) levels: Vec<Arc<Level>>,
}
66
/// Boxed, double-ended stream of tree items that may borrow data for lifetime `'a`.
/// Used as the `dependent` half of the `TreeIter` self-cell.
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'a>;

// `self_cell!` generates a struct that stores `owner` together with a `dependent`
// value borrowing from it. `#[covariant]` marks `BoxedMerge<'a>` as covariant in `'a`.
self_cell!(
    /// Double-ended iterator over a key range of the tree, built by
    /// `TreeIter::create_range`. It owns the `IterState` whose memtables the
    /// merged iterator borrows from.
    pub struct TreeIter {
        owner: IterState,

        #[covariant]
        dependent: BoxedMerge,
    }
);
77
78impl Iterator for TreeIter {
79 type Item = crate::Result<InternalValue>;
80
81 fn next(&mut self) -> Option<Self::Item> {
82 self.with_dependent_mut(|_, iter| iter.next())
83 }
84}
85
86impl DoubleEndedIterator for TreeIter {
87 fn next_back(&mut self) -> Option<Self::Item> {
88 self.with_dependent_mut(|_, iter| iter.next_back())
89 }
90}
91
92fn collect_disjoint_tree_with_range(
93 level_manifest: &LevelManifest,
94 bounds: &(Bound<UserKey>, Bound<UserKey>),
95) -> MultiReader<LevelReader> {
96 debug_assert!(level_manifest.is_disjoint());
97
98 let mut levels = level_manifest
99 .levels
100 .iter()
101 .filter(|x| !x.is_empty())
102 .cloned()
103 .collect::<Vec<_>>();
104
105 #[allow(clippy::expect_used)]
113 levels.sort_by(|a, b| {
114 a.segments
115 .first()
116 .expect("level should not be empty")
117 .metadata
118 .key_range
119 .min()
120 .cmp(
121 b.segments
122 .first()
123 .expect("level should not be empty")
124 .metadata
125 .key_range
126 .min(),
127 )
128 });
129
130 let readers = levels
131 .into_iter()
132 .filter_map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
133 .collect();
134
135 MultiReader::new(readers)
136}
137
138impl TreeIter {
139 #[must_use]
140 #[allow(clippy::too_many_lines)]
141 pub fn create_range(
142 guard: IterState,
143 bounds: (Bound<UserKey>, Bound<UserKey>),
144 seqno: Option<SeqNo>,
145 level_manifest: ArcRwLockReadGuardian<LevelManifest>,
146 ) -> Self {
147 Self::new(guard, |lock| {
148 let lo = match &bounds.0 {
149 Bound::Included(key) => Bound::Included(InternalKey::new(
151 key.clone(),
152 SeqNo::MAX,
153 crate::value::ValueType::Tombstone,
154 )),
155 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
156 key.clone(),
157 0,
158 crate::value::ValueType::Tombstone,
159 )),
160 Bound::Unbounded => Bound::Unbounded,
161 };
162
163 let hi = match &bounds.1 {
164 Bound::Included(key) => Bound::Included(InternalKey::new(
179 key.clone(),
180 0,
181 crate::value::ValueType::Value,
182 )),
183 Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
184 key.clone(),
185 SeqNo::MAX,
186 crate::value::ValueType::Value,
187 )),
188 Bound::Unbounded => Bound::Unbounded,
189 };
190
191 let range = (lo, hi);
192
193 let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
194
195 if level_manifest.is_disjoint() {
197 let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);
198
199 if let Some(seqno) = seqno {
200 iters.push(Box::new(reader.filter(move |item| match item {
201 Ok(item) => seqno_filter(item.key.seqno, seqno),
202 Err(_) => true,
203 })));
204 } else {
205 iters.push(Box::new(reader));
206 }
207 } else {
208 for level in &level_manifest.levels {
209 if level.is_disjoint {
210 if !level.is_empty() {
211 if let Some(reader) =
212 LevelReader::new(level.clone(), &bounds, CachePolicy::Write)
213 {
214 if let Some(seqno) = seqno {
215 iters.push(Box::new(reader.filter(move |item| match item {
216 Ok(item) => seqno_filter(item.key.seqno, seqno),
217 Err(_) => true,
218 })));
219 } else {
220 iters.push(Box::new(reader));
221 }
222 }
223 }
224 } else {
225 for segment in &level.segments {
226 if segment.check_key_range_overlap(&bounds) {
227 let reader = segment.range(bounds.clone());
228
229 if let Some(seqno) = seqno {
230 iters.push(Box::new(reader.filter(move |item| match item {
231 Ok(item) => seqno_filter(item.key.seqno, seqno),
232 Err(_) => true,
233 })));
234 } else {
235 iters.push(Box::new(reader));
236 }
237 }
238 }
239 }
240 }
241 }
242
243 drop(level_manifest);
244
245 for memtable in lock.sealed.iter() {
247 let iter = memtable.range(range.clone());
248
249 if let Some(seqno) = seqno {
250 iters.push(Box::new(
251 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
252 .map(Ok),
253 ));
254 } else {
255 iters.push(Box::new(iter.map(Ok)));
256 }
257 }
258
259 {
261 let iter = lock.active.range(range.clone());
262
263 if let Some(seqno) = seqno {
264 iters.push(Box::new(
265 iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
266 .map(Ok),
267 ));
268 } else {
269 iters.push(Box::new(iter.map(Ok)));
270 }
271 }
272
273 if let Some(index) = &lock.ephemeral {
274 let iter = Box::new(index.range(range).map(Ok));
275 iters.push(iter);
276 }
277
278 let merged = Merger::new(iters);
279 let iter = MvccStream::new(merged);
280
281 Box::new(iter.filter(|x| match x {
282 Ok(value) => !value.key.is_tombstone(),
283 Err(_) => true,
284 }))
285 })
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Slice;
    use std::ops::Bound::{Excluded, Included, Unbounded};
    use test_log::test;

    /// Asserts the bounds returned by `prefix_to_range(prefix)`.
    ///
    /// The lower bound must be `Included(prefix)`, or `Unbounded` for an empty
    /// prefix. The upper bound must equal `upper_bound`.
    fn test_prefix(prefix: &[u8], upper_bound: Bound<&[u8]>) {
        let expected_lo = if prefix.is_empty() {
            Unbounded
        } else {
            Included(Slice::from(prefix))
        };

        let expected_hi = match upper_bound {
            Unbounded => Unbounded,
            Included(x) => Included(Slice::from(x)),
            Excluded(x) => Excluded(Slice::from(x)),
        };

        assert_eq!(prefix_to_range(prefix), (expected_lo, expected_hi));
    }

    #[test]
    fn prefix_to_range_basic() {
        test_prefix(b"abc", Excluded(b"abd"));
    }

    #[test]
    fn prefix_to_range_empty() {
        test_prefix(b"", Unbounded);
    }

    #[test]
    fn prefix_to_range_single_char() {
        test_prefix(b"a", Excluded(b"b"));
    }

    #[test]
    fn prefix_to_range_1() {
        test_prefix(&[0, 250], Excluded(&[0, 251]));
    }

    #[test]
    fn prefix_to_range_2() {
        test_prefix(&[0, 250, 50], Excluded(&[0, 250, 51]));
    }

    #[test]
    fn prefix_to_range_3() {
        test_prefix(&[255, 255, 255], Unbounded);
    }

    #[test]
    fn prefix_to_range_char_max() {
        test_prefix(&[0, 255], Excluded(&[1]));
    }

    #[test]
    fn prefix_to_range_char_max_2() {
        test_prefix(&[0, 2, 255], Excluded(&[0, 3]));
    }

    // A lone 0xFF byte has no finite successor.
    #[test]
    fn prefix_to_range_single_max_byte() {
        test_prefix(&[255], Unbounded);
    }

    // Several trailing 0xFF bytes are all truncated before incrementing.
    #[test]
    fn prefix_to_range_trailing_max_bytes() {
        test_prefix(&[1, 255, 255], Excluded(&[2]));
    }

    // The snapshot cut-off is exclusive: equal seqnos are filtered out.
    #[test]
    fn seqno_filter_is_exclusive() {
        assert!(seqno_filter(1, 2));
        assert!(!seqno_filter(2, 2));
        assert!(!seqno_filter(3, 2));
    }
}