1use crate::key::InternalKey;
6use crate::segment::block::ItemSize;
7use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
8use crossbeam_skiplist::SkipMap;
9use std::ops::RangeBounds;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11
/// In-memory table of versioned entries, together with two atomically
/// maintained bookkeeping counters.
///
/// `insert` takes `&self`, so entries can be added through a shared reference.
#[derive(Default)]
pub struct Memtable {
    /// The stored entries: a skip map from [`InternalKey`] to the user value,
    /// kept in the order defined by `InternalKey`'s `Ord` implementation.
    #[doc(hidden)]
    pub items: SkipMap<InternalKey, UserValue>,

    /// Running sum of the sizes (as reported by `ItemSize::size`) of all items
    /// passed to `insert`; set back to zero by `clear`.
    ///
    /// NOTE(review): presumably a byte count — confirm against `ItemSize`.
    pub(crate) approximate_size: AtomicU32,

    /// Largest sequence number passed to `insert` so far; set back to zero by
    /// `clear`. Only meaningful while the table is non-empty (see
    /// `get_highest_seqno`).
    pub(crate) highest_seqno: AtomicU64,
}
31
32impl Memtable {
33 pub fn clear(&mut self) {
35 self.items.clear();
36 self.highest_seqno = AtomicU64::new(0);
37 self.approximate_size
38 .store(0, std::sync::atomic::Ordering::Release);
39 }
40
41 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
43 self.items.iter().map(|entry| InternalValue {
44 key: entry.key().clone(),
45 value: entry.value().clone(),
46 })
47 }
48
49 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
51 &'a self,
52 range: R,
53 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
54 self.items.range(range).map(|entry| InternalValue {
55 key: entry.key().clone(),
56 value: entry.value().clone(),
57 })
58 }
59
60 #[doc(hidden)]
64 pub fn get<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> Option<InternalValue> {
65 if seqno == Some(0) {
66 return None;
67 }
68
69 let key = key.as_ref();
70
71 let lower_bound = InternalKey::new(
88 key,
89 match seqno {
90 Some(seqno) => seqno - 1,
91 None => SeqNo::MAX,
92 },
93 ValueType::Value,
94 );
95
96 let mut iter = self
97 .items
98 .range(lower_bound..)
99 .take_while(|entry| &*entry.key().user_key == key);
100
101 iter.next().map(|entry| InternalValue {
102 key: entry.key().clone(),
103 value: entry.value().clone(),
104 })
105 }
106
107 pub fn size(&self) -> u32 {
109 self.approximate_size
110 .load(std::sync::atomic::Ordering::Acquire)
111 }
112
113 pub fn len(&self) -> usize {
115 self.items.len()
116 }
117
118 #[must_use]
120 pub fn is_empty(&self) -> bool {
121 self.items.is_empty()
122 }
123
124 #[doc(hidden)]
126 pub fn insert(&self, item: InternalValue) -> (u32, u32) {
127 #[allow(clippy::cast_possible_truncation)]
129 let item_size = item.size() as u32;
130
131 let size_before = self
132 .approximate_size
133 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
134
135 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
136 self.items.insert(key, item.value);
137
138 self.highest_seqno
139 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
140
141 (item_size, size_before + item_size)
142 }
143
144 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
146 if self.is_empty() {
147 None
148 } else {
149 Some(
150 self.highest_seqno
151 .load(std::sync::atomic::Ordering::Acquire),
152 )
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::ValueType;
    use test_log::test;

    /// Builds a `Value`-typed entry for `key` at `seqno` whose value is `abc`.
    fn entry(key: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), b"abc".to_vec(), seqno, ValueType::Value)
    }

    /// Point reads see the newest version below the snapshot seqno, and never
    /// match a key that is merely a prefix of a stored key.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        const KEY: &str = "hello-key-999991";
        const KEY_PREFIX: &str = "hello-key-99999";

        let first_value = *b"hello-value-999991";
        let second_value = *b"hello-value-999991-2";

        let memtable = Memtable::default();

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            first_value,
            0,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(KEY_PREFIX, None));

        let found = memtable.get(KEY, None);
        assert_eq!(first_value, &*found.unwrap().value);

        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            second_value,
            1,
            ValueType::Value,
        ));

        // Latest version.
        assert_eq!(None, memtable.get(KEY_PREFIX, None));

        let found = memtable.get(KEY, None);
        assert_eq!(second_value, &*found.unwrap().value);

        // Snapshot 1 only sees seqno 0.
        assert_eq!(None, memtable.get(KEY_PREFIX, Some(1)));

        let found = memtable.get(KEY, Some(1));
        assert_eq!(first_value, &*found.unwrap().value);

        // Snapshot 2 sees seqno 1.
        assert_eq!(None, memtable.get(KEY_PREFIX, Some(2)));

        let found = memtable.get(KEY, Some(2));
        assert_eq!(second_value, &*found.unwrap().value);
    }

    /// A single inserted item is returned unchanged by a point read.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let inserted = entry(b"abc", 0);
        memtable.insert(inserted.clone());

        assert_eq!(Some(inserted), memtable.get("abc", None));
    }

    /// With several versions of one key, an unbounded read returns the one
    /// with the highest seqno.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        for seqno in 0..=4 {
            memtable.insert(entry(b"abc", seqno));
        }

        assert_eq!(Some(entry(b"abc", 4)), memtable.get("abc", None));
    }

    /// Keys sharing a prefix are looked up independently of each other.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(entry(b"abc0", 0));
        memtable.insert(entry(b"abc", 255));

        assert_eq!(Some(entry(b"abc", 255)), memtable.get("abc", None));
        assert_eq!(Some(entry(b"abc0", 0)), memtable.get("abc0", None));
    }

    /// A snapshot seqno selects the newest version strictly below it.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        memtable.insert(entry(b"abc", 0));
        memtable.insert(entry(b"abc", 99));
        memtable.insert(entry(b"abc", 255));

        assert_eq!(Some(entry(b"abc", 255)), memtable.get("abc", None));
        assert_eq!(Some(entry(b"abc", 99)), memtable.get("abc", Some(100)));
        assert_eq!(Some(entry(b"abc", 0)), memtable.get("abc", Some(50)));
    }
}