1use crate::key::InternalKey;
6use crate::{
7 value::{InternalValue, SeqNo, UserValue},
8 ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::AtomicU64;
13
14#[derive(Default)]
18pub struct Memtable {
19 #[doc(hidden)]
21 pub items: SkipMap<InternalKey, UserValue>,
22
23 pub(crate) approximate_size: AtomicU64,
27
28 pub(crate) highest_seqno: AtomicU64,
32}
33
34impl Memtable {
35 pub fn clear(&mut self) {
37 self.items.clear();
38 self.highest_seqno = AtomicU64::new(0);
39 self.approximate_size
40 .store(0, std::sync::atomic::Ordering::Release);
41 }
42
43 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
45 self.items.iter().map(|entry| InternalValue {
46 key: entry.key().clone(),
47 value: entry.value().clone(),
48 })
49 }
50
51 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
53 &'a self,
54 range: R,
55 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
56 self.items.range(range).map(|entry| InternalValue {
57 key: entry.key().clone(),
58 value: entry.value().clone(),
59 })
60 }
61
62 #[doc(hidden)]
66 pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
67 if seqno == 0 {
68 return None;
69 }
70
71 let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
88
89 let mut iter = self
90 .items
91 .range(lower_bound..)
92 .take_while(|entry| &*entry.key().user_key == key);
93
94 iter.next().map(|entry| InternalValue {
95 key: entry.key().clone(),
96 value: entry.value().clone(),
97 })
98 }
99
100 pub fn size(&self) -> u64 {
102 self.approximate_size
103 .load(std::sync::atomic::Ordering::Acquire)
104 }
105
106 pub fn len(&self) -> usize {
108 self.items.len()
109 }
110
111 #[must_use]
113 pub fn is_empty(&self) -> bool {
114 self.items.is_empty()
115 }
116
117 #[doc(hidden)]
119 pub fn insert(&self, item: InternalValue) -> (u64, u64) {
120 #[allow(clippy::cast_possible_truncation, clippy::expect_used)]
122 let item_size =
123 (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
124 .try_into()
125 .expect("should fit into u64");
126
127 let size_before = self
128 .approximate_size
129 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
130
131 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
132 self.items.insert(key, item.value);
133
134 self.highest_seqno
135 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
136
137 (item_size, size_before + item_size)
138 }
139
140 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
142 if self.is_empty() {
143 None
144 } else {
145 Some(
146 self.highest_seqno
147 .load(std::sync::atomic::Ordering::Acquire),
148 )
149 }
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ValueType;
    use test_log::test;

    /// Builds a `Value`-typed item for `key` at `seqno` whose value bytes are
    /// always `b"abc"`.
    fn abc_item(key: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    /// Point reads return the newest version below the requested sequence
    /// number and never match a key that is only a prefix of a stored key.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 1);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 1);
        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 2);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 2);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
    }

    /// A single inserted item is returned unchanged by a point read.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let value = abc_item(b"abc", 0);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
    }

    /// `get_highest_seqno` is `None` for an empty table and otherwise reports
    /// the maximum inserted sequence number, regardless of insertion order.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        // Nothing inserted yet, so there is no highest seqno.
        assert_eq!(None, memtable.get_highest_seqno());

        for seqno in 0..=4 {
            memtable.insert(abc_item(b"abc", seqno));
        }

        assert_eq!(Some(4), memtable.get_highest_seqno());

        assert_eq!(
            Some(abc_item(b"abc", 4)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        // Inserting a lower seqno afterwards must not lower the maximum.
        memtable.insert(abc_item(b"xyz", 1));
        assert_eq!(Some(4), memtable.get_highest_seqno());
    }

    /// A key that is a prefix of another stored key resolves to its own entry.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(abc_item(b"abc0", 0));
        memtable.insert(abc_item(b"abc", 255));

        assert_eq!(
            Some(abc_item(b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(abc_item(b"abc0", 0)),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    /// Reads at a lower sequence number see older versions; a read at
    /// sequence number 0 sees nothing.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        memtable.insert(abc_item(b"abc", 0));
        memtable.insert(abc_item(b"abc", 99));
        memtable.insert(abc_item(b"abc", 255));

        assert_eq!(
            Some(abc_item(b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(Some(abc_item(b"abc", 99)), memtable.get(b"abc", 100));

        assert_eq!(Some(abc_item(b"abc", 0)), memtable.get(b"abc", 50));

        // `get` returns early for seqno 0.
        assert_eq!(None, memtable.get(b"abc", 0));
    }
}