loro_internal/
kv_store.rs

1use crate::sync::Mutex;
2use bytes::Bytes;
3pub use loro_kv_store::compress::CompressionType;
4pub use loro_kv_store::MemKvStore;
5use std::sync::Arc;
6use std::{collections::BTreeMap, ops::Bound};
7
8pub trait KvStore: std::fmt::Debug + Send + Sync {
9    fn get(&self, key: &[u8]) -> Option<Bytes>;
10    fn set(&mut self, key: &[u8], value: Bytes);
11    fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool;
12    fn remove(&mut self, key: &[u8]) -> Option<Bytes>;
13    fn contains_key(&self, key: &[u8]) -> bool;
14    fn scan(
15        &self,
16        start: Bound<&[u8]>,
17        end: Bound<&[u8]>,
18    ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_>;
19    fn len(&self) -> usize;
20    fn is_empty(&self) -> bool;
21    fn size(&self) -> usize;
22    fn export_all(&mut self) -> Bytes;
23    fn import_all(&mut self, bytes: Bytes) -> Result<(), String>;
24    fn clone_store(&self) -> Arc<Mutex<dyn KvStore>>;
25}
26
27fn get_common_prefix_len_and_strip<'a, T: AsRef<[u8]> + ?Sized>(
28    this: &'a T,
29    last: &T,
30) -> (u8, &'a [u8]) {
31    let mut common_prefix_len = 0;
32    for (i, (a, b)) in this.as_ref().iter().zip(last.as_ref().iter()).enumerate() {
33        if a != b || i == 255 {
34            common_prefix_len = i;
35            break;
36        }
37    }
38
39    let suffix = &this.as_ref()[common_prefix_len..];
40    (common_prefix_len as u8, suffix)
41}
42
43impl KvStore for MemKvStore {
44    fn get(&self, key: &[u8]) -> Option<Bytes> {
45        self.get(key)
46    }
47
48    fn set(&mut self, key: &[u8], value: Bytes) {
49        self.set(key, value)
50    }
51
52    fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
53        self.compare_and_swap(key, old, new)
54    }
55
56    fn remove(&mut self, key: &[u8]) -> Option<Bytes> {
57        let ans = self.get(key);
58        self.remove(key);
59        ans
60    }
61
62    fn contains_key(&self, key: &[u8]) -> bool {
63        self.contains_key(key)
64    }
65
66    fn scan(
67        &self,
68        start: Bound<&[u8]>,
69        end: Bound<&[u8]>,
70    ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
71        self.scan(start, end)
72    }
73
74    fn len(&self) -> usize {
75        self.len()
76    }
77
78    fn is_empty(&self) -> bool {
79        self.is_empty()
80    }
81
82    fn size(&self) -> usize {
83        self.size()
84    }
85
86    fn export_all(&mut self) -> Bytes {
87        self.export_all()
88    }
89
90    fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
91        self.import_all(bytes)
92    }
93
94    fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
95        Arc::new(Mutex::new(self.clone()))
96    }
97}
98
99mod default_binary_format {
100    //! Default binary format for the key-value store.
101    //!
102    //! It will compress the prefix of the keys that are common with the previous key.
103
104    use bytes::Bytes;
105
106    use super::get_common_prefix_len_and_strip;
107
108    pub fn export_by_scan(store: &impl super::KvStore) -> bytes::Bytes {
109        let mut buf = Vec::new();
110        let mut last_key: Option<Bytes> = None;
111        for (k, v) in store.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded) {
112            {
113                // Write the key
114                match last_key.take() {
115                    None => {
116                        leb128::write::unsigned(&mut buf, k.len() as u64).unwrap();
117                        buf.extend_from_slice(&k);
118                    }
119                    Some(last) => {
120                        let (common, suffix) = get_common_prefix_len_and_strip(&k, &last);
121                        buf.push(common);
122                        leb128::write::unsigned(&mut buf, suffix.len() as u64).unwrap();
123                        buf.extend_from_slice(suffix);
124                    }
125                };
126
127                last_key = Some(k);
128            }
129
130            // Write the value
131            leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
132            buf.extend_from_slice(&v);
133        }
134
135        buf.into()
136    }
137
138    pub fn import(store: &mut impl super::KvStore, bytes: bytes::Bytes) -> Result<(), String> {
139        let mut bytes: &[u8] = &bytes;
140        let mut last_key = Vec::new();
141
142        while !bytes.is_empty() {
143            // Read the key
144            let mut key = Vec::new();
145            if last_key.is_empty() {
146                let key_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
147                let key_len = key_len as usize;
148                key.extend_from_slice(&bytes[..key_len]);
149                bytes = &bytes[key_len..];
150            } else {
151                let common_prefix_len = bytes[0] as usize;
152                bytes = &bytes[1..];
153                key.extend_from_slice(&last_key[..common_prefix_len]);
154                let suffix_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
155                let suffix_len = suffix_len as usize;
156                key.extend_from_slice(&bytes[..suffix_len]);
157                bytes = &bytes[suffix_len..];
158            }
159
160            // Read the value
161            let value_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
162            let value_len = value_len as usize;
163            let value = Bytes::copy_from_slice(&bytes[..value_len]);
164            bytes = &bytes[value_len..];
165
166            // Store the key-value pair
167            store.set(&key, value);
168
169            last_key = key;
170        }
171
172        Ok(())
173    }
174}
175
176impl KvStore for BTreeMap<Bytes, Bytes> {
177    fn get(&self, key: &[u8]) -> Option<Bytes> {
178        self.get(key).cloned()
179    }
180
181    fn set(&mut self, key: &[u8], value: Bytes) {
182        self.insert(Bytes::copy_from_slice(key), value);
183    }
184
185    fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
186        let key = Bytes::copy_from_slice(key);
187        match self.get_mut(&key) {
188            Some(v) => {
189                if old.as_ref() == Some(v) {
190                    self.insert(key, new);
191                    true
192                } else {
193                    false
194                }
195            }
196            None => {
197                if old.is_none() {
198                    self.insert(key, new);
199                    true
200                } else {
201                    false
202                }
203            }
204        }
205    }
206
207    fn remove(&mut self, key: &[u8]) -> Option<Bytes> {
208        self.remove(key)
209    }
210
211    fn contains_key(&self, key: &[u8]) -> bool {
212        self.contains_key(key)
213    }
214
215    fn scan(
216        &self,
217        start: Bound<&[u8]>,
218        end: Bound<&[u8]>,
219    ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
220        Box::new(
221            self.range::<[u8], _>((start, end))
222                .map(|(k, v)| (k.clone(), v.clone())),
223        )
224    }
225
226    fn len(&self) -> usize {
227        self.len()
228    }
229
230    fn is_empty(&self) -> bool {
231        self.is_empty()
232    }
233
234    fn size(&self) -> usize {
235        self.iter().fold(0, |acc, (k, v)| acc + k.len() + v.len())
236    }
237
238    fn export_all(&mut self) -> Bytes {
239        default_binary_format::export_by_scan(self)
240    }
241
242    fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
243        default_binary_format::import(self, bytes)
244    }
245
246    fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
247        Arc::new(Mutex::new(self.clone()))
248    }
249}