1use crate::key::InternalKey;
6use crate::segment::block::ItemSize;
7use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
8use crossbeam_skiplist::SkipMap;
9use std::ops::RangeBounds;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11
12#[derive(Default)]
16pub struct Memtable {
17 #[doc(hidden)]
19 pub items: SkipMap<InternalKey, UserValue>,
20
21 pub(crate) approximate_size: AtomicU32,
25
26 pub(crate) highest_seqno: AtomicU64,
30}
31
32impl Memtable {
33 pub fn clear(&mut self) {
35 self.items.clear();
36 self.highest_seqno = AtomicU64::new(0);
37 self.approximate_size
38 .store(0, std::sync::atomic::Ordering::Release);
39 }
40
41 pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
43 self.items.iter().map(|entry| InternalValue {
44 key: entry.key().clone(),
45 value: entry.value().clone(),
46 })
47 }
48
49 pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
51 &'a self,
52 range: R,
53 ) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
54 self.items.range(range).map(|entry| InternalValue {
55 key: entry.key().clone(),
56 value: entry.value().clone(),
57 })
58 }
59
60 #[doc(hidden)]
64 pub fn get(&self, key: &[u8], seqno: Option<SeqNo>) -> Option<InternalValue> {
65 if seqno == Some(0) {
66 return None;
67 }
68
69 let lower_bound = InternalKey::new(
86 key,
87 match seqno {
88 Some(seqno) => seqno - 1,
89 None => SeqNo::MAX,
90 },
91 ValueType::Value,
92 );
93
94 let mut iter = self
95 .items
96 .range(lower_bound..)
97 .take_while(|entry| &*entry.key().user_key == key);
98
99 iter.next().map(|entry| InternalValue {
100 key: entry.key().clone(),
101 value: entry.value().clone(),
102 })
103 }
104
105 pub fn size(&self) -> u32 {
107 self.approximate_size
108 .load(std::sync::atomic::Ordering::Acquire)
109 }
110
111 pub fn len(&self) -> usize {
113 self.items.len()
114 }
115
116 #[must_use]
118 pub fn is_empty(&self) -> bool {
119 self.items.is_empty()
120 }
121
122 #[doc(hidden)]
124 pub fn insert(&self, item: InternalValue) -> (u32, u32) {
125 #[allow(clippy::cast_possible_truncation)]
127 let item_size = item.size() as u32;
128
129 let size_before = self
130 .approximate_size
131 .fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
132
133 let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
134 self.items.insert(key, item.value);
135
136 self.highest_seqno
137 .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
138
139 (item_size, size_before + item_size)
140 }
141
142 pub fn get_highest_seqno(&self) -> Option<SeqNo> {
144 if self.is_empty() {
145 None
146 } else {
147 Some(
148 self.highest_seqno
149 .load(std::sync::atomic::Ordering::Acquire),
150 )
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::ValueType;
    use test_log::test;

    /// Builds a `ValueType::Value` item, copying `key` and `value` into owned vectors.
    fn value_item(key: &[u8], value: &[u8], seqno: SeqNo) -> InternalValue {
        InternalValue::from_components(key.to_vec(), value.to_vec(), seqno, ValueType::Value)
    }

    /// Point reads honour the sequence-number limit and never match a shorter key.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn memtable_mvcc_point_read() {
        let memtable = Memtable::default();

        // First version, written at seqno 0.
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991",
            0,
            ValueType::Value,
        ));

        assert_eq!(None, memtable.get(b"hello-key-99999", None));

        let found = memtable.get(b"hello-key-999991", None).unwrap();
        assert_eq!(*b"hello-value-999991", &*found.value);

        // Second version of the same key, written at seqno 1.
        memtable.insert(InternalValue::from_components(
            *b"hello-key-999991",
            *b"hello-value-999991-2",
            1,
            ValueType::Value,
        ));

        // Without a limit the newest version is returned.
        assert_eq!(None, memtable.get(b"hello-key-99999", None));

        let found = memtable.get(b"hello-key-999991", None).unwrap();
        assert_eq!(*b"hello-value-999991-2", &*found.value);

        // A limit of 1 only sees the version written at seqno 0.
        assert_eq!(None, memtable.get(b"hello-key-99999", Some(1)));

        let found = memtable.get(b"hello-key-999991", Some(1)).unwrap();
        assert_eq!(*b"hello-value-999991", &*found.value);

        // A limit of 2 sees the version written at seqno 1.
        assert_eq!(None, memtable.get(b"hello-key-99999", Some(2)));

        let found = memtable.get(b"hello-key-999991", Some(2)).unwrap();
        assert_eq!(*b"hello-value-999991-2", &*found.value);
    }

    /// A single inserted item can be read back unchanged.
    #[test]
    fn memtable_get() {
        let memtable = Memtable::default();

        let value = value_item(b"abc", b"abc", 0);

        memtable.insert(value.clone());

        assert_eq!(Some(value), memtable.get(b"abc", None));
    }

    /// With several versions of a key, an unlimited read returns the highest seqno.
    #[test]
    fn memtable_get_highest_seqno() {
        let memtable = Memtable::default();

        for seqno in 0..=4 {
            memtable.insert(value_item(b"abc", b"abc", seqno));
        }

        assert_eq!(
            Some(value_item(b"abc", b"abc", 4)),
            memtable.get(b"abc", None)
        );
    }

    /// A key that is a prefix of another key is resolved to its own entry.
    #[test]
    fn memtable_get_prefix() {
        let memtable = Memtable::default();

        memtable.insert(value_item(b"abc0", b"abc", 0));
        memtable.insert(value_item(b"abc", b"abc", 255));

        assert_eq!(
            Some(value_item(b"abc", b"abc", 255)),
            memtable.get(b"abc", None)
        );

        assert_eq!(
            Some(value_item(b"abc0", b"abc", 0)),
            memtable.get(b"abc0", None)
        );
    }

    /// Reads limited by a sequence number return the newest version below that limit.
    #[test]
    fn memtable_get_old_version() {
        let memtable = Memtable::default();

        for seqno in [0, 99, 255] {
            memtable.insert(value_item(b"abc", b"abc", seqno));
        }

        assert_eq!(
            Some(value_item(b"abc", b"abc", 255)),
            memtable.get(b"abc", None)
        );

        assert_eq!(
            Some(value_item(b"abc", b"abc", 99)),
            memtable.get(b"abc", Some(100))
        );

        assert_eq!(
            Some(value_item(b"abc", b"abc", 0)),
            memtable.get(b"abc", Some(50))
        );
    }
}