1use crate::error::DbxResult;
8use crate::storage::StorageBackend;
9use crate::transaction::mvcc::version::VersionedKey;
10use crossbeam_skiplist::SkipMap;
11use dashmap::DashMap;
12use std::ops::{Bound, RangeBounds};
13use std::sync::Arc;
14
/// Default number of buffered entries after which `should_flush` reports true.
const DEFAULT_FLUSH_THRESHOLD: usize = 10_000;
17
/// In-memory buffer of versioned rows, grouped per table, that accumulates
/// writes until the caller drains them via `drain_all` (presumably into a
/// persistent backing store — confirm against callers).
pub struct DeltaStore {
    // One lock-free ordered map per table name, keyed by VersionedKey.
    // The Arc lets callers hold a table handle without pinning a DashMap shard.
    #[allow(clippy::type_complexity)]
    tables: DashMap<String, Arc<SkipMap<VersionedKey, Arc<Vec<u8>>>>>,
    // Entry count at which `should_flush` starts returning true.
    flush_threshold: usize,
    // Approximate number of buffered entries across all tables; maintained
    // with Relaxed atomics, so it may briefly lag the true map sizes.
    entry_count: std::sync::atomic::AtomicUsize,
}
32
33impl DeltaStore {
34 pub fn new() -> Self {
36 Self::with_threshold(DEFAULT_FLUSH_THRESHOLD)
37 }
38
39 pub fn with_threshold(threshold: usize) -> Self {
41 Self {
42 tables: DashMap::new(),
43 flush_threshold: threshold,
44 entry_count: std::sync::atomic::AtomicUsize::new(0),
45 }
46 }
47
48 pub fn should_flush(&self) -> bool {
50 self.entry_count() >= self.flush_threshold
51 }
52
53 pub fn entry_count(&self) -> usize {
55 self.entry_count.load(std::sync::atomic::Ordering::Relaxed)
56 }
57
58 #[allow(clippy::type_complexity)]
63 pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
64 let mut result = Vec::new();
65
66 let table_names: Vec<String> = self.tables.iter().map(|e| e.key().clone()).collect();
68
69 for table_name in table_names {
70 if let Some((_, table_map)) = self.tables.remove(&table_name) {
71 let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
72 .iter()
73 .map(|e| (e.key().encode(), (**e.value()).clone()))
74 .collect();
75
76 result.push((table_name, entries));
77 }
78 }
79
80 self.entry_count
82 .store(0, std::sync::atomic::Ordering::Relaxed);
83
84 result
85 }
86
87 fn get_or_create_table(&self, table: &str) -> Arc<SkipMap<VersionedKey, Arc<Vec<u8>>>> {
89 self.tables
90 .entry(table.to_string())
91 .or_insert_with(|| Arc::new(SkipMap::new()))
92 .value()
93 .clone()
94 }
95
96 fn to_versioned_key(key: &[u8], default_ts: u64) -> VersionedKey {
98 if key.len() > 8
101 && let Ok(vk) = VersionedKey::decode(key)
102 {
103 return vk;
104 }
105 VersionedKey::new(key.to_vec(), default_ts)
106 }
107
108 fn convert_start_bound(bound: Bound<&Vec<u8>>) -> Bound<VersionedKey> {
110 match bound {
111 Bound::Included(v) => {
112 if v.is_empty() {
113 Bound::Included(VersionedKey::new(vec![], u64::MAX))
114 } else {
115 Bound::Included(Self::to_versioned_key(v, u64::MAX))
116 }
117 }
118 Bound::Excluded(v) => {
119 if v.is_empty() {
120 Bound::Excluded(VersionedKey::new(vec![], u64::MAX))
121 } else {
122 Bound::Excluded(Self::to_versioned_key(v, u64::MAX))
123 }
124 }
125 Bound::Unbounded => Bound::Unbounded,
126 }
127 }
128
129 fn convert_end_bound(bound: Bound<&Vec<u8>>) -> Bound<VersionedKey> {
130 match bound {
131 Bound::Included(v) => {
132 if v.is_empty() {
133 Bound::Included(VersionedKey::new(vec![], 0))
134 } else {
135 Bound::Included(Self::to_versioned_key(v, 0))
136 }
137 }
138 Bound::Excluded(v) => {
139 if v.is_empty() {
140 Bound::Excluded(VersionedKey::new(vec![], 0))
141 } else {
142 Bound::Excluded(Self::to_versioned_key(v, 0))
143 }
144 }
145 Bound::Unbounded => Bound::Unbounded,
146 }
147 }
148}
149
150impl Default for DeltaStore {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl StorageBackend for DeltaStore {
157 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
158 let table_map = self.get_or_create_table(table);
159 let vk = Self::to_versioned_key(key, 0);
162 table_map.insert(vk, Arc::new(value.to_vec()));
163 self.entry_count
164 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
165
166 Ok(())
167 }
168
169 fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
170 let table_map = self.get_or_create_table(table);
171
172 for (key, value) in rows {
173 let vk = Self::to_versioned_key(&key, 0);
174 table_map.insert(vk, Arc::new(value));
175 self.entry_count
176 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177 }
178
179 Ok(())
180 }
181
182 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
183 let Some(table_map) = self.tables.get(table) else {
184 return Ok(None);
185 };
186
187 let vk = Self::to_versioned_key(key, 0);
188 Ok(table_map.get(&vk).map(|e| (**e.value()).clone()))
189 }
190
191 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
192 let Some(table_map) = self.tables.get(table) else {
193 return Ok(false);
194 };
195
196 let vk = Self::to_versioned_key(key, 0);
197 if table_map.remove(&vk).is_some() {
198 self.entry_count
199 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
200 Ok(true)
201 } else {
202 Ok(false)
203 }
204 }
205
206 fn scan<R: RangeBounds<Vec<u8>> + Clone>(
207 &self,
208 table: &str,
209 range: R,
210 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
211 let Some(table_map) = self.tables.get(table) else {
212 return Ok(Vec::new());
213 };
214
215 if table_map.is_empty() {
217 return Ok(Vec::new());
218 }
219
220 let start = Self::convert_start_bound(range.start_bound());
221 let end = Self::convert_end_bound(range.end_bound());
222
223 let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
224 .range((start, end))
225 .map(|e| (e.key().encode(), (**e.value()).clone()))
226 .collect();
227
228 Ok(entries)
229 }
230
231 fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
232 &self,
233 table: &str,
234 range: R,
235 ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
236 let Some(table_map) = self.tables.get(table) else {
237 return Ok(None);
238 };
239
240 let start = Self::convert_start_bound(range.start_bound());
241 let end = Self::convert_end_bound(range.end_bound());
242
243 Ok(table_map
244 .range((start, end))
245 .next()
246 .map(|e| (e.key().encode(), (**e.value()).clone())))
247 }
248
249 fn flush(&self) -> DbxResult<()> {
250 Ok(())
251 }
252
253 fn count(&self, table: &str) -> DbxResult<usize> {
254 let Some(table_map) = self.tables.get(table) else {
255 return Ok(0);
256 };
257
258 Ok(table_map.len())
259 }
260
261 fn table_names(&self) -> DbxResult<Vec<String>> {
262 Ok(self.tables.iter().map(|e| e.key().clone()).collect())
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    // A raw key round-trips through insert/get.
    #[test]
    fn insert_and_get() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();
        assert_eq!(
            store.get("users", b"key1").unwrap(),
            Some(b"value1".to_vec())
        );
    }

    // Two versions of the same user key coexist and scan newest-first.
    #[test]
    fn test_versioned_storage() {
        let store = DeltaStore::new();
        let older = VersionedKey::new(b"key1".to_vec(), 100);
        let newer = VersionedKey::new(b"key1".to_vec(), 200);

        store.insert("users", &older.encode(), b"v1").unwrap();
        store.insert("users", &newer.encode(), b"v2").unwrap();

        assert_eq!(
            store.get("users", &older.encode()).unwrap(),
            Some(b"v1".to_vec())
        );
        assert_eq!(
            store.get("users", &newer.encode()).unwrap(),
            Some(b"v2".to_vec())
        );

        let rows = store.scan("users", Vec::<u8>::new()..).unwrap();
        assert_eq!(rows.len(), 2);
        let timestamps: Vec<u64> = rows
            .iter()
            .map(|(k, _)| VersionedKey::decode(k).unwrap().commit_ts)
            .collect();
        assert_eq!(timestamps, vec![200, 100]);
    }

    // Deleting a present key reports true and removes the row.
    #[test]
    fn delete_existing_key() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();
        assert!(store.delete("users", b"key1").unwrap());
        assert!(store.get("users", b"key1").unwrap().is_none());
    }

    // The global counter sums entries across all tables.
    #[test]
    fn entry_count_tracking() {
        let store = DeltaStore::new();
        assert_eq!(store.entry_count(), 0);

        let rows: [(&str, &[u8], &[u8]); 3] =
            [("t1", b"a", b"1"), ("t1", b"b", b"2"), ("t2", b"c", b"3")];
        for (table, key, value) in rows {
            store.insert(table, key, value).unwrap();
        }

        assert_eq!(store.entry_count(), 3);
    }
}