1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
//! The primary point of the pruning cursor is to turn an in-memory skip-list into something that
//! looks like a consistent cut of the data. Consequently, there nees to be more logic than just
//! working on a static sst. Other cursors will assume a pruning cursor gets applied beneath them
//! to create a cursor over an immutable data set.
use super::{Cursor, Error, KeyRef, LOGIC_ERROR, logic_error_prev_not_positioned};
/////////////////////////////////////////// PruningCursor //////////////////////////////////////////
/// A PruningCursor returns the latest value less than or equal to a timestmap.
pub struct PruningCursor<C: Cursor> {
cursor: C,
timestamp: u64,
skip_key: Option<Vec<u8>>,
}
impl<C: Cursor> PruningCursor<C> {
/// Create a new pruning cursor.
pub fn new(mut cursor: C, timestamp: u64) -> Result<Self, Error> {
cursor.seek_to_first()?;
Ok(Self {
cursor,
timestamp,
skip_key: None,
})
}
fn set_skip_key(&mut self) {
match self.key() {
Some(v) => {
self.skip_key = Some(v.key.to_vec());
}
None => {
self.skip_key = None;
}
}
}
}
impl<C: Cursor> Cursor for PruningCursor<C> {
fn seek_to_first(&mut self) -> Result<(), Error> {
self.skip_key = None;
self.cursor.seek_to_first()
}
fn seek_to_last(&mut self) -> Result<(), Error> {
self.skip_key = None;
self.cursor.seek_to_last()
}
fn seek(&mut self, key: &[u8]) -> Result<(), Error> {
self.skip_key = None;
self.cursor.seek(key)?;
loop {
let kr = match self.key() {
Some(kr) => kr,
None => {
return Ok(());
}
};
if kr.timestamp <= self.timestamp && self.value().is_none() {
self.set_skip_key();
} else if kr.timestamp <= self.timestamp
&& (self.skip_key.is_none() || self.skip_key.as_ref().unwrap() != kr.key)
{
self.set_skip_key();
return Ok(());
}
self.cursor.next()?;
}
}
fn prev(&mut self) -> Result<(), Error> {
if self.key().is_none() {
self.skip_key = None;
}
loop {
// Skip the set skip key.
self.cursor.prev()?;
while self.skip_key.is_some() {
let kr = match self.key() {
Some(kr) => kr,
None => {
self.skip_key = None;
return Ok(());
}
};
// SAFETY(rescrv): We check is_some() as while invariant.
if self.skip_key.as_ref().unwrap() != kr.key {
self.skip_key = None;
} else {
self.cursor.prev()?;
}
}
// This is the key we want to investigate.
// Find the largest timestamp less than self.timestamp for this key.
let kr = match self.key() {
Some(kr) => kr,
None => {
self.skip_key = None;
return Ok(());
}
};
// If the smallest timestamp of this key (the first we'll hit after a series of prevs)
// is too large, set skip key and continue at the top.
if kr.timestamp > self.timestamp {
self.set_skip_key();
continue;
}
// We know there exists a target key with timestamp less than or equal to our threshold
// timestamp.
let target_key = kr.key.to_vec();
// Loop until we overrun and then reverse by at least one.
//
// Note that reversing by one is not sufficient as late-arriving writes---which are
// allowed to arrive out of order in the prescribed pattern---will possibly insert
// earlier data. The LSM tree relies upon the pruning cursor to give a consistent view
// by screening out these writes, and then it sequences the writes so that they expose
// their data in an order consistent with their timestamps.
loop {
self.cursor.prev()?;
let kr = match self.key() {
Some(kr) => kr,
None => {
break;
}
};
if kr.timestamp > self.timestamp || kr.key != target_key {
break;
}
}
// Here's where we would be most likely to fail with concurrency present.
if self.key().is_none() {
self.cursor.next()?;
}
// SAFETY(rescrv) self.key() cannot be None because we witnessed target_key above as a
// valid value that *must* come after the None at the head of the list. We know that
// key must have a lesser timestamp by the continue under kr.timestamp > self.timestamp
// above. Thus we can simply seek until we have both fronts.
while let Some(kr) = self.key() {
if kr.timestamp <= self.timestamp && kr.key == target_key {
break;
} else {
self.cursor.next()?;
}
}
// Operate on the considered value.
// The largest of target_key less than or equal to the timestamp.
let kr = match self.key() {
Some(kr) => kr,
None => {
LOGIC_ERROR.click();
return Err(logic_error_prev_not_positioned());
}
};
// SAFETY(rescrv): Ensured by the while loop above.
assert!(kr.timestamp <= self.timestamp);
assert!(kr.key == target_key);
// If it's not a tombstone, return the value (and skip it next time)
// Otherwise, just skip it.
if self.value().is_some() {
self.set_skip_key();
return Ok(());
} else {
self.set_skip_key();
}
}
}
fn next(&mut self) -> Result<(), Error> {
loop {
self.cursor.next()?;
let kr = match self.key() {
Some(kr) => kr,
None => {
return Ok(());
}
};
if kr.timestamp <= self.timestamp && self.value().is_none() {
self.set_skip_key();
} else if kr.timestamp <= self.timestamp
&& (self.skip_key.is_none()
// SAFETY(rescrv): We check is_none() and short circuit.
|| self.skip_key.as_ref().unwrap() != kr.key)
{
self.set_skip_key();
return Ok(());
}
}
}
fn key(&self) -> Option<KeyRef<'_>> {
self.cursor.key()
}
fn value(&self) -> Option<&[u8]> {
self.cursor.value()
}
}