1use crate::key::InternalKey;
6use crate::{
7 value::{InternalValue, SeqNo, UserValue},
8 ValueType,
9};
10use crossbeam_skiplist::SkipMap;
11use std::ops::RangeBounds;
12use std::sync::atomic::{AtomicBool, AtomicU64};
13
14pub use crate::tree::inner::MemtableId;
15
16pub struct Memtable {
20 #[doc(hidden)]
21 pub id: MemtableId,
22
23 #[doc(hidden)]
25 pub items: SkipMap<InternalKey, UserValue>,
26
27 pub(crate) approximate_size: AtomicU64,
31
32 pub(crate) highest_seqno: AtomicU64,
36
37 pub(crate) requested_rotation: AtomicBool,
38}
39
40impl Memtable {
41 pub fn id(&self) -> MemtableId {
43 self.id
44 }
45
46 pub fn is_flagged_for_rotation(&self) -> bool {
48 self.requested_rotation
49 .load(std::sync::atomic::Ordering::Relaxed)
50 }
51
52 pub fn flag_rotated(&self) {
54 self.requested_rotation
55 .store(true, std::sync::atomic::Ordering::Relaxed);
56 }
57
58 #[doc(hidden)]
59 #[must_use]
60 pub fn new(id: MemtableId) -> Self {
61 Self {
62 id,
63 items: SkipMap::default(),
64 approximate_size: AtomicU64::default(),
65 highest_seqno: AtomicU64::default(),
66 requested_rotation: AtomicBool::default(),
67 }
68 }
69
70 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
72 self.items.iter().map(|entry| InternalValue {
73 key: entry.key().clone(),
74 value: entry.value().clone(),
75 })
76 }
77
78 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
80 &'a self,
81 range: R,
82 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
83 self.items.range(range).map(|entry| InternalValue {
84 key: entry.key().clone(),
85 value: entry.value().clone(),
86 })
87 }
88
89 #[doc(hidden)]
93 pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
94 if seqno == 0 {
95 return None;
96 }
97
98 let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
115
116 let mut iter = self
117 .items
118 .range(lower_bound..)
119 .take_while(|entry| &*entry.key().user_key == key);
120
121 iter.next().map(|entry| InternalValue {
122 key: entry.key().clone(),
123 value: entry.value().clone(),
124 })
125 }
126
127 pub fn size(&self) -> u64 {
129 self.approximate_size
130 .load(std::sync::atomic::Ordering::Acquire)
131 }
132
133 pub fn len(&self) -> usize {
135 self.items.len()
136 }
137
138 #[must_use]
140 pub fn is_empty(&self) -> bool {
141 self.items.is_empty()
142 }
143
144 #[doc(hidden)]
146 pub fn insert(&self, item: InternalValue) -> (u64, u64) {
147 #[expect(
148 clippy::expect_used,
149 reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
150 )]
151 let item_size =
152 (item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
153 .try_into()
154 .expect("should fit into u64");
155
156 let size_before = self
157 .approximate_size
158 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
159
160 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
161 self.items.insert(key, item.value);
162
163 self.highest_seqno
164 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
165
166 (item_size, size_before + item_size)
167 }
168
169 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
171 if self.is_empty() {
172 None
173 } else {
174 Some(
175 self.highest_seqno
176 .load(std::sync::atomic::Ordering::Acquire),
177 )
178 }
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ValueType;
    use test_log::test;

    /// Builds a `ValueType::Value` entry for `key` at `seqno` whose value is `b"abc"`.
    fn abc_entry(key: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    /// Point reads see the newest version below the read seqno and never
    /// match a key that is only a prefix of a stored key.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let key = b"hello-key-999991";
        let key_prefix = b"hello-key-99999";

        let memtable = Memtable::new(0);

        // Only version 0 exists.
        memtable.insert(InternalValue::from_components(
            *key,
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(key_prefix, SeqNo::MAX));

        let found = memtable.get(key, SeqNo::MAX);
        assert_eq!(*b"hello-value-999991", &*found.unwrap().value);

        // Add version 1 on top.
        memtable.insert(InternalValue::from_components(
            *key,
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(key_prefix, SeqNo::MAX));

        let found = memtable.get(key, SeqNo::MAX);
        assert_eq!(*b"hello-value-999991-2", &*found.unwrap().value);

        // Reading at seqno 1 only sees version 0.
        assert_eq!(None, memtable.get(key_prefix, 1));

        let found = memtable.get(key, 1);
        assert_eq!(*b"hello-value-999991", &*found.unwrap().value);

        // Reading at seqno 2 sees version 1.
        assert_eq!(None, memtable.get(key_prefix, 2));

        let found = memtable.get(key, 2);
        assert_eq!(*b"hello-value-999991-2", &*found.unwrap().value);
    }

    /// A single inserted entry is returned unchanged by a point read.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::new(0);

        let value = abc_entry(b"abc", 0);
        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
    }

    /// With versions 0 through 4 present, an unbounded read returns version 4.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::new(0);

        for seqno in 0..=4 {
            memtable.insert(abc_entry(b"abc", seqno));
        }

        assert_eq!(
            Some(abc_entry(b"abc", 4)),
            memtable.get(b"abc", SeqNo::MAX)
        );
    }

    /// Keys sharing a prefix are looked up independently of each other.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::new(0);

        memtable.insert(abc_entry(b"abc0", 0));
        memtable.insert(abc_entry(b"abc", 255));

        assert_eq!(
            Some(abc_entry(b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(
            Some(abc_entry(b"abc0", 0)),
            memtable.get(b"abc0", SeqNo::MAX)
        );
    }

    /// Reads at a lower seqno return the matching older version.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::new(0);

        for seqno in [0, 99, 255] {
            memtable.insert(abc_entry(b"abc", seqno));
        }

        assert_eq!(
            Some(abc_entry(b"abc", 255)),
            memtable.get(b"abc", SeqNo::MAX)
        );

        assert_eq!(Some(abc_entry(b"abc", 99)), memtable.get(b"abc", 100));

        assert_eq!(Some(abc_entry(b"abc", 0)), memtable.get(b"abc", 50));
    }
}