1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//! Stitch a full table scan from piece-wise iteration.
//!
//! Typically used for memory-only index.
use std::ops::{Bound, RangeBounds};
use std::vec;
use crate::core::{Diff, Entry, FullScan, Result, ScanEntry};
/// SkipScan can be used to stitch piece-wise scanning of an LSM
/// data-structure, only selecting mutations (and versions)
/// that are within the specified sequence-no range.
///
/// Mitigates the following issues.
///
/// a. Read references to the data-structure are held only for
/// a very small period, like a few tens of micro-seconds.
/// b. Automatically filters mutations that are older than the
/// specified sequence-no range, thereby saving time for
/// top-level DB components.
/// c. Ignores mutations that are newer than the specified
/// sequence-no range, thereby providing a stable full
/// table scan.
///
/// Important prerequisites:
///
/// a. Applicable only for LSM based data structures.
/// b. Data-structure must not suffer any delete/purge
/// operation until the full-scan is completed.
/// c. Data-structure must implement the FullScan trait.
pub(crate) struct SkipScan<'a, M, K, V, G>
where
K: 'a + Clone + Ord,
V: 'a + Clone + Diff + From<<V as Diff>::D>,
G: Clone + RangeBounds<u64>,
M: FullScan<K, V>,
{
index: &'a M, // read reference to index.
within: G, // pick mutations within this sequence-no range.
from: Bound<K>, // place to start the next batch of scan.
// entries of the current batch, drained one at a time by `next()`.
iter: vec::IntoIter<Result<Entry<K, V>>>,
// batch limit consulted by `refill()`; `next()` also resets it to zero
// to mark the scan as finished (after an error or index exhaustion).
batch_size: usize,
}
/// Outcome of fetching one batch of entries from the index, as
/// returned by `SkipScan::refill()` and interpreted by `next()`.
enum Refill<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
/// A batch was collected and the scan may continue. Also used when
/// an error was hit, in which case the error is the last element.
Ok(Vec<Result<Entry<K, V>>>),
/// The index iterator yielded `ScanEntry::Retry(key)`; carries that
/// key along with the entries collected before it (possibly none).
Retry(K, Vec<Result<Entry<K, V>>>),
/// The index iterator ran out of entries; carries the entries of
/// the final batch (possibly none).
Finish(Vec<Result<Entry<K, V>>>),
}
impl<'a, M, K, V, G> SkipScan<'a, M, K, V, G>
where
    K: 'a + Clone + Ord,
    V: 'a + Clone + Diff + From<<V as Diff>::D>,
    G: Clone + RangeBounds<u64>,
    M: FullScan<K, V>,
{
    /// Default number of entries fetched from the index per batch.
    const BATCH_SIZE: usize = 1000;

    /// Create a scanner over `index` that picks only mutations whose
    /// sequence-no falls `within` the given range. The scan starts from
    /// an unbounded lower key and uses the default batch size.
    pub(crate) fn new(index: &'a M, within: G) -> SkipScan<M, K, V, G> {
        SkipScan {
            index,
            within,
            from: Bound::Unbounded,
            iter: vec![].into_iter(),
            batch_size: Self::BATCH_SIZE,
        }
    }

    /// Set the maximum number of entries fetched per batch.
    ///
    /// A `batch_size` of zero is raised to one: the iterator uses
    /// `batch_size == 0` as its "scan finished" marker, so storing zero
    /// here would make the scan yield nothing at all.
    pub(crate) fn set_batch_size(&mut self, batch_size: usize) {
        self.batch_size = batch_size.max(1)
    }

    /// Fetch the next batch of entries from the index, starting at
    /// `self.from` and filtered by `self.within`.
    ///
    /// Returns
    /// * `Refill::Ok` when the batch holds `batch_size` entries, or when
    ///   an error was encountered (the error is the last element),
    /// * `Refill::Retry` when the index yields `ScanEntry::Retry`,
    ///   along with whatever was collected before it,
    /// * `Refill::Finish` when the index iterator is exhausted.
    fn refill(&mut self) -> Refill<K, V> {
        let mut entries: Vec<Result<Entry<K, V>>> = vec![];

        let scan = match self.index.full_scan(self.from.clone(), self.within.clone()) {
            Ok(scan) => scan,
            Err(err) => {
                entries.push(Err(err));
                return Refill::Ok(entries);
            }
        };

        for item in scan {
            match item {
                Ok(ScanEntry::Found(entry)) => {
                    entries.push(Ok(entry));
                    // Compare the number of collected entries, not a
                    // zero-based index, so that a batch never exceeds
                    // `batch_size` entries.
                    if entries.len() >= self.batch_size {
                        return Refill::Ok(entries);
                    }
                }
                Ok(ScanEntry::Retry(key)) => return Refill::Retry(key, entries),
                Err(err) => {
                    entries.push(Err(err));
                    return Refill::Ok(entries);
                }
            }
        }

        Refill::Finish(entries)
    }
}
impl<'a, M, K, V, G> Iterator for SkipScan<'a, M, K, V, G>
where
    K: 'a + Clone + Ord,
    V: 'a + Clone + Diff + From<<V as Diff>::D>,
    G: Clone + RangeBounds<u64>,
    M: FullScan<K, V>,
{
    type Item = Result<Entry<K, V>>;

    /// Yield the next entry of the stitched scan.
    ///
    /// Entries are served from the buffered batch; when the buffer runs
    /// dry a new batch is fetched via `refill()`. `batch_size == 0` marks
    /// the scan as finished: it is set after an error has been yielded
    /// and once the index reports no more entries.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the buffered batch first.
            if let Some(item) = self.iter.next() {
                return match item {
                    Ok(entry) => {
                        // Remember where to resume the next batch.
                        self.from = Bound::Excluded(entry.to_key());
                        Some(Ok(entry))
                    }
                    Err(err) => {
                        // An error terminates the scan after it is reported.
                        self.batch_size = 0;
                        Some(Err(err))
                    }
                };
            }

            // Buffer is empty; stop if the scan was already marked finished.
            if self.batch_size == 0 {
                return None;
            }

            let batch = match self.refill() {
                Refill::Ok(batch) => batch,
                Refill::Retry(key, batch) => {
                    // Resume after the retry key. Note that when the batch
                    // is non-empty, `from` is subsequently overwritten by
                    // the keys of the entries yielded from it.
                    self.from = Bound::Excluded(key);
                    if batch.is_empty() {
                        continue;
                    }
                    batch
                }
                Refill::Finish(batch) => {
                    // Drain this last batch, then report end of iteration.
                    self.batch_size = 0;
                    batch
                }
            };
            self.iter = batch.into_iter();
        }
    }
}
#[cfg(test)]
#[path = "scans_test.rs"]
mod scans_test;