1use crate::key::InternalKey;
6use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
7use crossbeam_skiplist::SkipMap;
8use std::ops::RangeBounds;
9use std::sync::atomic::AtomicU64;
10
11#[derive(Default)]
15pub struct Memtable {
16 #[doc(hidden)]
18 pub items: SkipMap<InternalKey, UserValue>,
19
20 pub(crate) approximate_size: AtomicU64,
24
25 pub(crate) highest_seqno: AtomicU64,
29}
30
31impl Memtable {
32 pub fn clear(&mut self) {
34 self.items.clear();
35 self.highest_seqno = AtomicU64::new(0);
36 self.approximate_size
37 .store(0, std::sync::atomic::Ordering::Release);
38 }
39
40 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
42 self.items.iter().map(|entry| InternalValue {
43 key: entry.key().clone(),
44 value: entry.value().clone(),
45 })
46 }
47
48 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
50 &'a self,
51 range: R,
52 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
53 self.items.range(range).map(|entry| InternalValue {
54 key: entry.key().clone(),
55 value: entry.value().clone(),
56 })
57 }
58
59 #[doc(hidden)]
63 pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
64 if seqno == 0 {
65 return None;
66 }
67
68 let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
85
86 let mut iter = self
87 .items
88 .range(lower_bound..)
89 .take_while(|entry| &*entry.key().user_key == key);
90
91 iter.next().map(|entry| InternalValue {
92 key: entry.key().clone(),
93 value: entry.value().clone(),
94 })
95 }
96
97 pub fn size(&self) -> u64 {
99 self.approximate_size
100 .load(std::sync::atomic::Ordering::Acquire)
101 }
102
103 pub fn len(&self) -> usize {
105 self.items.len()
106 }
107
108 #[must_use]
110 pub fn is_empty(&self) -> bool {
111 self.items.is_empty()
112 }
113
114 #[doc(hidden)]
116 pub fn insert(&self, item: InternalValue) -> (u64, u64) {
117 #[allow(clippy::cast_possible_truncation, clippy::expect_used)]
119 let item_size =
120 (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
121 .try_into()
122 .expect("should fit into u64");
123
124 let size_before = self
125 .approximate_size
126 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
127
128 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
129 self.items.insert(key, item.value);
130
131 self.highest_seqno
132 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
133
134 (item_size, size_before + item_size)
135 }
136
137 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
139 if self.is_empty() {
140 None
141 } else {
142 Some(
143 self.highest_seqno
144 .load(std::sync::atomic::Ordering::Acquire),
145 )
146 }
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use super::*;
153 use crate::value::ValueType;
154 use test_log::test;
155
156 #[test]
157 #[allow(clippy::unwrap_used)]
158 fn memtable_mvcc_point_read() {
159 let memtable = Memtable::default();
160
161 memtable.insert(InternalValue::from_components(
162 *b"hello-key-999991",
163 *b"hello-value-999991",
164 0,
165 ValueType::Value,
166 ));
167
168 let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
169 assert_eq!(None, item);
170
171 let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
172 assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
173
174 memtable.insert(InternalValue::from_components(
175 *b"hello-key-999991",
176 *b"hello-value-999991-2",
177 1,
178 ValueType::Value,
179 ));
180
181 let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
182 assert_eq!(None, item);
183
184 let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
185 assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
186
187 let item = memtable.get(b"hello-key-99999", 1);
188 assert_eq!(None, item);
189
190 let item = memtable.get(b"hello-key-999991", 1);
191 assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
192
193 let item = memtable.get(b"hello-key-99999", 2);
194 assert_eq!(None, item);
195
196 let item = memtable.get(b"hello-key-999991", 2);
197 assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
198 }
199
200 #[test]
201 fn memtable_get() {
202 let memtable = Memtable::default();
203
204 let value =
205 InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
206
207 memtable.insert(value.clone());
208
209 assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
210 }
211
212 #[test]
213 fn memtable_get_highest_seqno() {
214 let memtable = Memtable::default();
215
216 memtable.insert(InternalValue::from_components(
217 b"abc".to_vec(),
218 b"abc".to_vec(),
219 0,
220 ValueType::Value,
221 ));
222 memtable.insert(InternalValue::from_components(
223 b"abc".to_vec(),
224 b"abc".to_vec(),
225 1,
226 ValueType::Value,
227 ));
228 memtable.insert(InternalValue::from_components(
229 b"abc".to_vec(),
230 b"abc".to_vec(),
231 2,
232 ValueType::Value,
233 ));
234 memtable.insert(InternalValue::from_components(
235 b"abc".to_vec(),
236 b"abc".to_vec(),
237 3,
238 ValueType::Value,
239 ));
240 memtable.insert(InternalValue::from_components(
241 b"abc".to_vec(),
242 b"abc".to_vec(),
243 4,
244 ValueType::Value,
245 ));
246
247 assert_eq!(
248 Some(InternalValue::from_components(
249 b"abc".to_vec(),
250 b"abc".to_vec(),
251 4,
252 ValueType::Value,
253 )),
254 memtable.get(b"abc", SeqNo::MAX)
255 );
256 }
257
258 #[test]
259 fn memtable_get_prefix() {
260 let memtable = Memtable::default();
261
262 memtable.insert(InternalValue::from_components(
263 b"abc0".to_vec(),
264 b"abc".to_vec(),
265 0,
266 ValueType::Value,
267 ));
268 memtable.insert(InternalValue::from_components(
269 b"abc".to_vec(),
270 b"abc".to_vec(),
271 255,
272 ValueType::Value,
273 ));
274
275 assert_eq!(
276 Some(InternalValue::from_components(
277 b"abc".to_vec(),
278 b"abc".to_vec(),
279 255,
280 ValueType::Value,
281 )),
282 memtable.get(b"abc", SeqNo::MAX)
283 );
284
285 assert_eq!(
286 Some(InternalValue::from_components(
287 b"abc0".to_vec(),
288 b"abc".to_vec(),
289 0,
290 ValueType::Value,
291 )),
292 memtable.get(b"abc0", SeqNo::MAX)
293 );
294 }
295
296 #[test]
297 fn memtable_get_old_version() {
298 let memtable = Memtable::default();
299
300 memtable.insert(InternalValue::from_components(
301 b"abc".to_vec(),
302 b"abc".to_vec(),
303 0,
304 ValueType::Value,
305 ));
306 memtable.insert(InternalValue::from_components(
307 b"abc".to_vec(),
308 b"abc".to_vec(),
309 99,
310 ValueType::Value,
311 ));
312 memtable.insert(InternalValue::from_components(
313 b"abc".to_vec(),
314 b"abc".to_vec(),
315 255,
316 ValueType::Value,
317 ));
318
319 assert_eq!(
320 Some(InternalValue::from_components(
321 b"abc".to_vec(),
322 b"abc".to_vec(),
323 255,
324 ValueType::Value,
325 )),
326 memtable.get(b"abc", SeqNo::MAX)
327 );
328
329 assert_eq!(
330 Some(InternalValue::from_components(
331 b"abc".to_vec(),
332 b"abc".to_vec(),
333 99,
334 ValueType::Value,
335 )),
336 memtable.get(b"abc", 100)
337 );
338
339 assert_eq!(
340 Some(InternalValue::from_components(
341 b"abc".to_vec(),
342 b"abc".to_vec(),
343 0,
344 ValueType::Value,
345 )),
346 memtable.get(b"abc", 50)
347 );
348 }
349}