mini_lsm/
mem_table.rs

1use std::ops::Bound;
2use std::path::Path;
3use std::sync::atomic::AtomicUsize;
4use std::sync::Arc;
5
6use anyhow::Result;
7use bytes::Bytes;
8use crossbeam_skiplist::map::Entry;
9use crossbeam_skiplist::SkipMap;
10use ouroboros::self_referencing;
11
12use crate::iterators::StorageIterator;
13use crate::key::KeySlice;
14use crate::table::SsTableBuilder;
15use crate::wal::Wal;
16
17/// A basic mem-table based on crossbeam-skiplist.
18///
19/// An initial implementation of memtable is part of week 1, day 1. It will be incrementally implemented in other
20/// chapters of week 1 and week 2.
21pub struct MemTable {
22    map: Arc<SkipMap<Bytes, Bytes>>,
23    wal: Option<Wal>,
24    id: usize,
25    approximate_size: Arc<AtomicUsize>,
26}
27
28/// Create a bound of `Bytes` from a bound of `&[u8]`.
29pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
30    match bound {
31        Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)),
32        Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)),
33        Bound::Unbounded => Bound::Unbounded,
34    }
35}
36
37impl MemTable {
38    /// Create a new mem-table.
39    pub fn create(id: usize) -> Self {
40        Self {
41            id,
42            map: Arc::new(SkipMap::new()),
43            wal: None,
44            approximate_size: Arc::new(AtomicUsize::new(0)),
45        }
46    }
47
48    /// Create a new mem-table with WAL
49    pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
50        Ok(Self {
51            id,
52            map: Arc::new(SkipMap::new()),
53            wal: Some(Wal::create(path.as_ref())?),
54            approximate_size: Arc::new(AtomicUsize::new(0)),
55        })
56    }
57
58    /// Create a memtable from WAL
59    pub fn recover_from_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
60        let map = Arc::new(SkipMap::new());
61        Ok(Self {
62            id,
63            wal: Some(Wal::recover(path.as_ref(), &map)?),
64            map,
65            approximate_size: Arc::new(AtomicUsize::new(0)),
66        })
67    }
68
69    pub fn for_testing_put_slice(&self, key: &[u8], value: &[u8]) -> Result<()> {
70        self.put(key, value)
71    }
72
73    pub fn for_testing_get_slice(&self, key: &[u8]) -> Option<Bytes> {
74        self.get(key)
75    }
76
77    pub fn for_testing_scan_slice(
78        &self,
79        lower: Bound<&[u8]>,
80        upper: Bound<&[u8]>,
81    ) -> MemTableIterator {
82        self.scan(lower, upper)
83    }
84
85    /// Get a value by key.
86    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
87        self.map.get(key).map(|e| e.value().clone())
88    }
89
90    /// Put a key-value pair into the mem-table.
91    ///
92    /// In week 1, day 1, simply put the key-value pair into the skipmap.
93    /// In week 2, day 6, also flush the data to WAL.
94    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
95        let estimated_size = key.len() + value.len();
96        self.map
97            .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
98        self.approximate_size
99            .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed);
100        if let Some(ref wal) = self.wal {
101            wal.put(key, value)?;
102        }
103        Ok(())
104    }
105
106    pub fn sync_wal(&self) -> Result<()> {
107        if let Some(ref wal) = self.wal {
108            wal.sync()?;
109        }
110        Ok(())
111    }
112
113    /// Get an iterator over a range of keys.
114    pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator {
115        let (lower, upper) = (map_bound(lower), map_bound(upper));
116        let mut iter = MemTableIteratorBuilder {
117            map: self.map.clone(),
118            iter_builder: |map| map.range((lower, upper)),
119            item: (Bytes::new(), Bytes::new()),
120        }
121        .build();
122        iter.next().unwrap();
123        iter
124    }
125
126    /// Flush the mem-table to SSTable. Implement in week 1 day 6.
127    pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> {
128        for entry in self.map.iter() {
129            builder.add(KeySlice::from_slice(&entry.key()[..]), &entry.value()[..]);
130        }
131        Ok(())
132    }
133
134    pub fn id(&self) -> usize {
135        self.id
136    }
137
138    pub fn approximate_size(&self) -> usize {
139        self.approximate_size
140            .load(std::sync::atomic::Ordering::Relaxed)
141    }
142
143    /// Only use this function when closing the database
144    pub fn is_empty(&self) -> bool {
145        self.map.is_empty()
146    }
147}
148
149type SkipMapRangeIter<'a> =
150    crossbeam_skiplist::map::Range<'a, Bytes, (Bound<Bytes>, Bound<Bytes>), Bytes, Bytes>;
151
152/// An iterator over a range of `SkipMap`. This is a self-referential structure and please refer to week 1, day 2
153/// chapter for more information.
154///
155/// This is part of week 1, day 2.
156#[self_referencing]
157pub struct MemTableIterator {
158    /// Stores a reference to the skipmap.
159    map: Arc<SkipMap<Bytes, Bytes>>,
160    /// Stores a skipmap iterator that refers to the lifetime of `MemTableIterator` itself.
161    #[borrows(map)]
162    #[not_covariant]
163    iter: SkipMapRangeIter<'this>,
164    /// Stores the current key-value pair.
165    item: (Bytes, Bytes),
166}
167
168impl MemTableIterator {
169    fn entry_to_item(entry: Option<Entry<'_, Bytes, Bytes>>) -> (Bytes, Bytes) {
170        entry
171            .map(|x| (x.key().clone(), x.value().clone()))
172            .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[])))
173    }
174}
175
176impl StorageIterator for MemTableIterator {
177    type KeyType<'a> = KeySlice<'a>;
178
179    fn value(&self) -> &[u8] {
180        &self.borrow_item().1[..]
181    }
182
183    fn key(&self) -> KeySlice {
184        KeySlice::from_slice(&self.borrow_item().0[..])
185    }
186
187    fn is_valid(&self) -> bool {
188        !self.borrow_item().0.is_empty()
189    }
190
191    fn next(&mut self) -> Result<()> {
192        let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next()));
193        self.with_mut(|x| *x.item = entry);
194        Ok(())
195    }
196}