loro_internal/
kv_store.rs1use crate::sync::Mutex;
2use bytes::Bytes;
3pub use loro_kv_store::compress::CompressionType;
4pub use loro_kv_store::MemKvStore;
5use std::sync::Arc;
6use std::{collections::BTreeMap, ops::Bound};
7
8pub trait KvStore: std::fmt::Debug + Send + Sync {
9 fn get(&self, key: &[u8]) -> Option<Bytes>;
10 fn set(&mut self, key: &[u8], value: Bytes);
11 fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool;
12 fn remove(&mut self, key: &[u8]) -> Option<Bytes>;
13 fn contains_key(&self, key: &[u8]) -> bool;
14 fn scan(
15 &self,
16 start: Bound<&[u8]>,
17 end: Bound<&[u8]>,
18 ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_>;
19 fn len(&self) -> usize;
20 fn is_empty(&self) -> bool;
21 fn size(&self) -> usize;
22 fn export_all(&mut self) -> Bytes;
23 fn import_all(&mut self, bytes: Bytes) -> Result<(), String>;
24 fn clone_store(&self) -> Arc<Mutex<dyn KvStore>>;
25}
26
27fn get_common_prefix_len_and_strip<'a, T: AsRef<[u8]> + ?Sized>(
28 this: &'a T,
29 last: &T,
30) -> (u8, &'a [u8]) {
31 let mut common_prefix_len = 0;
32 for (i, (a, b)) in this.as_ref().iter().zip(last.as_ref().iter()).enumerate() {
33 if a != b || i == 255 {
34 common_prefix_len = i;
35 break;
36 }
37 }
38
39 let suffix = &this.as_ref()[common_prefix_len..];
40 (common_prefix_len as u8, suffix)
41}
42
43impl KvStore for MemKvStore {
44 fn get(&self, key: &[u8]) -> Option<Bytes> {
45 self.get(key)
46 }
47
48 fn set(&mut self, key: &[u8], value: Bytes) {
49 self.set(key, value)
50 }
51
52 fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
53 self.compare_and_swap(key, old, new)
54 }
55
56 fn remove(&mut self, key: &[u8]) -> Option<Bytes> {
57 let ans = self.get(key);
58 self.remove(key);
59 ans
60 }
61
62 fn contains_key(&self, key: &[u8]) -> bool {
63 self.contains_key(key)
64 }
65
66 fn scan(
67 &self,
68 start: Bound<&[u8]>,
69 end: Bound<&[u8]>,
70 ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
71 self.scan(start, end)
72 }
73
74 fn len(&self) -> usize {
75 self.len()
76 }
77
78 fn is_empty(&self) -> bool {
79 self.is_empty()
80 }
81
82 fn size(&self) -> usize {
83 self.size()
84 }
85
86 fn export_all(&mut self) -> Bytes {
87 self.export_all()
88 }
89
90 fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
91 self.import_all(bytes)
92 }
93
94 fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
95 Arc::new(Mutex::new(self.clone()))
96 }
97}
98
99mod default_binary_format {
100 use bytes::Bytes;
105
106 use super::get_common_prefix_len_and_strip;
107
108 pub fn export_by_scan(store: &impl super::KvStore) -> bytes::Bytes {
109 let mut buf = Vec::new();
110 let mut last_key: Option<Bytes> = None;
111 for (k, v) in store.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded) {
112 {
113 match last_key.take() {
115 None => {
116 leb128::write::unsigned(&mut buf, k.len() as u64).unwrap();
117 buf.extend_from_slice(&k);
118 }
119 Some(last) => {
120 let (common, suffix) = get_common_prefix_len_and_strip(&k, &last);
121 buf.push(common);
122 leb128::write::unsigned(&mut buf, suffix.len() as u64).unwrap();
123 buf.extend_from_slice(suffix);
124 }
125 };
126
127 last_key = Some(k);
128 }
129
130 leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
132 buf.extend_from_slice(&v);
133 }
134
135 buf.into()
136 }
137
138 pub fn import(store: &mut impl super::KvStore, bytes: bytes::Bytes) -> Result<(), String> {
139 let mut bytes: &[u8] = &bytes;
140 let mut last_key = Vec::new();
141
142 while !bytes.is_empty() {
143 let mut key = Vec::new();
145 if last_key.is_empty() {
146 let key_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
147 let key_len = key_len as usize;
148 key.extend_from_slice(&bytes[..key_len]);
149 bytes = &bytes[key_len..];
150 } else {
151 let common_prefix_len = bytes[0] as usize;
152 bytes = &bytes[1..];
153 key.extend_from_slice(&last_key[..common_prefix_len]);
154 let suffix_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
155 let suffix_len = suffix_len as usize;
156 key.extend_from_slice(&bytes[..suffix_len]);
157 bytes = &bytes[suffix_len..];
158 }
159
160 let value_len = leb128::read::unsigned(&mut bytes).map_err(|e| e.to_string())?;
162 let value_len = value_len as usize;
163 let value = Bytes::copy_from_slice(&bytes[..value_len]);
164 bytes = &bytes[value_len..];
165
166 store.set(&key, value);
168
169 last_key = key;
170 }
171
172 Ok(())
173 }
174}
175
176impl KvStore for BTreeMap<Bytes, Bytes> {
177 fn get(&self, key: &[u8]) -> Option<Bytes> {
178 self.get(key).cloned()
179 }
180
181 fn set(&mut self, key: &[u8], value: Bytes) {
182 self.insert(Bytes::copy_from_slice(key), value);
183 }
184
185 fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
186 let key = Bytes::copy_from_slice(key);
187 match self.get_mut(&key) {
188 Some(v) => {
189 if old.as_ref() == Some(v) {
190 self.insert(key, new);
191 true
192 } else {
193 false
194 }
195 }
196 None => {
197 if old.is_none() {
198 self.insert(key, new);
199 true
200 } else {
201 false
202 }
203 }
204 }
205 }
206
207 fn remove(&mut self, key: &[u8]) -> Option<Bytes> {
208 self.remove(key)
209 }
210
211 fn contains_key(&self, key: &[u8]) -> bool {
212 self.contains_key(key)
213 }
214
215 fn scan(
216 &self,
217 start: Bound<&[u8]>,
218 end: Bound<&[u8]>,
219 ) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
220 Box::new(
221 self.range::<[u8], _>((start, end))
222 .map(|(k, v)| (k.clone(), v.clone())),
223 )
224 }
225
226 fn len(&self) -> usize {
227 self.len()
228 }
229
230 fn is_empty(&self) -> bool {
231 self.is_empty()
232 }
233
234 fn size(&self) -> usize {
235 self.iter().fold(0, |acc, (k, v)| acc + k.len() + v.len())
236 }
237
238 fn export_all(&mut self) -> Bytes {
239 default_binary_format::export_by_scan(self)
240 }
241
242 fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
243 default_binary_format::import(self, bytes)
244 }
245
246 fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
247 Arc::new(Mutex::new(self.clone()))
248 }
249}