1use crate::key::InternalKey;
6use crate::{
7 value::{InternalValue, SeqNo, UserValue},
8 ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::AtomicU64;
13
14#[derive(Default)]
18pub struct Memtable {
19 #[doc(hidden)]
21 pub items: SkipMap<InternalKey, UserValue>,
22
23 pub(crate) approximate_size: AtomicU64,
27
28 pub(crate) highest_seqno: AtomicU64,
32}
33
34impl Memtable {
35 pub fn clear(&mut self) {
37 self.items.clear();
38 self.highest_seqno = AtomicU64::new(0);
39 self.approximate_size
40 .store(0, std::sync::atomic::Ordering::Release);
41 }
42
43 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
45 self.items.iter().map(|entry| InternalValue {
46 key: entry.key().clone(),
47 value: entry.value().clone(),
48 })
49 }
50
51 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
53 &'a self,
54 range: R,
55 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
56 self.items.range(range).map(|entry| InternalValue {
57 key: entry.key().clone(),
58 value: entry.value().clone(),
59 })
60 }
61
62 #[doc(hidden)]
66 pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
67 if seqno == 0 {
68 return None;
69 }
70
71 let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
88
89 let mut iter = self
90 .items
91 .range(lower_bound..)
92 .take_while(|entry| &*entry.key().user_key == key);
93
94 iter.next().map(|entry| InternalValue {
95 key: entry.key().clone(),
96 value: entry.value().clone(),
97 })
98 }
99
100 pub fn size(&self) -> u64 {
102 self.approximate_size
103 .load(std::sync::atomic::Ordering::Acquire)
104 }
105
106 pub fn len(&self) -> usize {
108 self.items.len()
109 }
110
111 #[must_use]
113 pub fn is_empty(&self) -> bool {
114 self.items.is_empty()
115 }
116
117 #[doc(hidden)]
119 pub fn insert(&self, item: InternalValue) -> (u64, u64) {
120 #[expect(
121 clippy::expect_used,
122 reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
123 )]
124 let item_size =
125 (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
126 .try_into()
127 .expect("should fit into u64");
128
129 let size_before = self
130 .approximate_size
131 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
132
133 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
134 self.items.insert(key, item.value);
135
136 self.highest_seqno
137 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
138
139 (item_size, size_before + item_size)
140 }
141
142 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
144 if self.is_empty() {
145 None
146 } else {
147 Some(
148 self.highest_seqno
149 .load(std::sync::atomic::Ordering::Acquire),
150 )
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ValueType;
    use test_log::test;

    /// Builds a `ValueType::Value` item for `key` at `seqno` whose payload is
    /// always `b"abc"`, to keep the test bodies focused on keys and versions.
    fn entry(key: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    /// Point reads must respect the snapshot sequence number and must not
    /// match a key that is merely a prefix of a stored key.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*item.unwrap().value);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 1);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 1);
        assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);

        let item = memtable.get(b"hello-key-99999", 2);
        assert_eq!(None, item);

        let item = memtable.get(b"hello-key-999991", 2);
        assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
    }

    /// A single inserted item is returned unchanged by a point read.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let value = entry(b"abc", 0);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
    }

    /// The highest sequence number is tracked across inserts, and an
    /// unbounded read returns the newest version.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        // An empty memtable has no highest sequence number.
        assert_eq!(None, memtable.get_highest_seqno());

        for seqno in 0..=4 {
            memtable.insert(entry(b"abc", seqno));
        }

        assert_eq!(Some(4), memtable.get_highest_seqno());

        assert_eq!(Some(entry(b"abc", 4)), memtable.get(b"abc", SeqNo::MAX));

        // Inserting a lower sequence number afterwards must not lower the maximum.
        memtable.insert(entry(b"abd", 2));
        assert_eq!(Some(4), memtable.get_highest_seqno());
    }

    /// A key that is a prefix of another stored key resolves to its own entry.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(entry(b"abc0", 0));
        memtable.insert(entry(b"abc", 255));

        assert_eq!(Some(entry(b"abc", 255)), memtable.get(b"abc", SeqNo::MAX));

        assert_eq!(Some(entry(b"abc0", 0)), memtable.get(b"abc0", SeqNo::MAX));
    }

    /// Reads at an older sequence number return the newest version below it.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        memtable.insert(entry(b"abc", 0));
        memtable.insert(entry(b"abc", 99));
        memtable.insert(entry(b"abc", 255));

        assert_eq!(Some(entry(b"abc", 255)), memtable.get(b"abc", SeqNo::MAX));

        assert_eq!(Some(entry(b"abc", 99)), memtable.get(b"abc", 100));

        assert_eq!(Some(entry(b"abc", 0)), memtable.get(b"abc", 50));
    }
}