1use std::ops::Bound;
2use std::path::Path;
3use std::sync::atomic::AtomicUsize;
4use std::sync::Arc;
5
6use anyhow::Result;
7use bytes::Bytes;
8use crossbeam_skiplist::map::Entry;
9use crossbeam_skiplist::SkipMap;
10use ouroboros::self_referencing;
11
12use crate::iterators::StorageIterator;
13use crate::key::KeySlice;
14use crate::table::SsTableBuilder;
15use crate::wal::Wal;
16
17pub struct MemTable {
22 map: Arc<SkipMap<Bytes, Bytes>>,
23 wal: Option<Wal>,
24 id: usize,
25 approximate_size: Arc<AtomicUsize>,
26}
27
28pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
30 match bound {
31 Bound::Included(x) => Bound::Included(Bytes::copy_from_slice(x)),
32 Bound::Excluded(x) => Bound::Excluded(Bytes::copy_from_slice(x)),
33 Bound::Unbounded => Bound::Unbounded,
34 }
35}
36
37impl MemTable {
38 pub fn create(id: usize) -> Self {
40 Self {
41 id,
42 map: Arc::new(SkipMap::new()),
43 wal: None,
44 approximate_size: Arc::new(AtomicUsize::new(0)),
45 }
46 }
47
48 pub fn create_with_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
50 Ok(Self {
51 id,
52 map: Arc::new(SkipMap::new()),
53 wal: Some(Wal::create(path.as_ref())?),
54 approximate_size: Arc::new(AtomicUsize::new(0)),
55 })
56 }
57
58 pub fn recover_from_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
60 let map = Arc::new(SkipMap::new());
61 Ok(Self {
62 id,
63 wal: Some(Wal::recover(path.as_ref(), &map)?),
64 map,
65 approximate_size: Arc::new(AtomicUsize::new(0)),
66 })
67 }
68
69 pub fn for_testing_put_slice(&self, key: &[u8], value: &[u8]) -> Result<()> {
70 self.put(key, value)
71 }
72
73 pub fn for_testing_get_slice(&self, key: &[u8]) -> Option<Bytes> {
74 self.get(key)
75 }
76
77 pub fn for_testing_scan_slice(
78 &self,
79 lower: Bound<&[u8]>,
80 upper: Bound<&[u8]>,
81 ) -> MemTableIterator {
82 self.scan(lower, upper)
83 }
84
85 pub fn get(&self, key: &[u8]) -> Option<Bytes> {
87 self.map.get(key).map(|e| e.value().clone())
88 }
89
90 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
95 let estimated_size = key.len() + value.len();
96 self.map
97 .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
98 self.approximate_size
99 .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed);
100 if let Some(ref wal) = self.wal {
101 wal.put(key, value)?;
102 }
103 Ok(())
104 }
105
106 pub fn sync_wal(&self) -> Result<()> {
107 if let Some(ref wal) = self.wal {
108 wal.sync()?;
109 }
110 Ok(())
111 }
112
113 pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> MemTableIterator {
115 let (lower, upper) = (map_bound(lower), map_bound(upper));
116 let mut iter = MemTableIteratorBuilder {
117 map: self.map.clone(),
118 iter_builder: |map| map.range((lower, upper)),
119 item: (Bytes::new(), Bytes::new()),
120 }
121 .build();
122 iter.next().unwrap();
123 iter
124 }
125
126 pub fn flush(&self, builder: &mut SsTableBuilder) -> Result<()> {
128 for entry in self.map.iter() {
129 builder.add(KeySlice::from_slice(&entry.key()[..]), &entry.value()[..]);
130 }
131 Ok(())
132 }
133
134 pub fn id(&self) -> usize {
135 self.id
136 }
137
138 pub fn approximate_size(&self) -> usize {
139 self.approximate_size
140 .load(std::sync::atomic::Ordering::Relaxed)
141 }
142
143 pub fn is_empty(&self) -> bool {
145 self.map.is_empty()
146 }
147}
148
149type SkipMapRangeIter<'a> =
150 crossbeam_skiplist::map::Range<'a, Bytes, (Bound<Bytes>, Bound<Bytes>), Bytes, Bytes>;
151
152#[self_referencing]
157pub struct MemTableIterator {
158 map: Arc<SkipMap<Bytes, Bytes>>,
160 #[borrows(map)]
162 #[not_covariant]
163 iter: SkipMapRangeIter<'this>,
164 item: (Bytes, Bytes),
166}
167
168impl MemTableIterator {
169 fn entry_to_item(entry: Option<Entry<'_, Bytes, Bytes>>) -> (Bytes, Bytes) {
170 entry
171 .map(|x| (x.key().clone(), x.value().clone()))
172 .unwrap_or_else(|| (Bytes::from_static(&[]), Bytes::from_static(&[])))
173 }
174}
175
176impl StorageIterator for MemTableIterator {
177 type KeyType<'a> = KeySlice<'a>;
178
179 fn value(&self) -> &[u8] {
180 &self.borrow_item().1[..]
181 }
182
183 fn key(&self) -> KeySlice {
184 KeySlice::from_slice(&self.borrow_item().0[..])
185 }
186
187 fn is_valid(&self) -> bool {
188 !self.borrow_item().0.is_empty()
189 }
190
191 fn next(&mut self) -> Result<()> {
192 let entry = self.with_iter_mut(|iter| MemTableIterator::entry_to_item(iter.next()));
193 self.with_mut(|x| *x.item = entry);
194 Ok(())
195 }
196}