dbx_core/storage/
delta_store.rs1use crate::error::DbxResult;
8use crate::storage::StorageBackend;
9
10use crossbeam_skiplist::SkipMap;
11use dashmap::DashMap;
12use std::ops::{Bound, RangeBounds};
13use std::sync::Arc;
14
15const DEFAULT_FLUSH_THRESHOLD: usize = 10_000;
17
18pub struct DeltaStore {
23 #[allow(clippy::type_complexity)]
26 tables: DashMap<String, Arc<SkipMap<Vec<u8>, Arc<Vec<u8>>>>>,
27 flush_threshold: usize,
29 entry_count: std::sync::atomic::AtomicUsize,
31}
32
33impl DeltaStore {
34 pub fn new() -> Self {
36 Self::with_threshold(DEFAULT_FLUSH_THRESHOLD)
37 }
38
39 pub fn with_threshold(threshold: usize) -> Self {
41 Self {
42 tables: DashMap::new(),
43 flush_threshold: threshold,
44 entry_count: std::sync::atomic::AtomicUsize::new(0),
45 }
46 }
47
48 pub fn should_flush(&self) -> bool {
50 self.entry_count() >= self.flush_threshold
51 }
52
53 pub fn entry_count(&self) -> usize {
55 self.entry_count.load(std::sync::atomic::Ordering::Relaxed)
56 }
57
58 #[allow(clippy::type_complexity)]
61 pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
62 let mut result = Vec::new();
63
64 let table_names: Vec<String> = self.tables.iter().map(|e| e.key().clone()).collect();
66
67 for table_name in table_names {
68 if let Some((_, table_map)) = self.tables.remove(&table_name) {
69 let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
70 .iter()
71 .map(|e| (e.key().clone(), (**e.value()).clone()))
72 .collect();
73
74 result.push((table_name, entries));
75 }
76 }
77
78 self.entry_count
80 .store(0, std::sync::atomic::Ordering::Relaxed);
81
82 result
83 }
84
85 fn get_or_create_table(&self, table: &str) -> Arc<SkipMap<Vec<u8>, Arc<Vec<u8>>>> {
87 self.tables
88 .entry(table.to_string())
89 .or_insert_with(|| Arc::new(SkipMap::new()))
90 .value()
91 .clone()
92 }
93
94 fn convert_bound(bound: Bound<&Vec<u8>>) -> Bound<Vec<u8>> {
96 match bound {
97 Bound::Included(v) => Bound::Included(v.clone()),
98 Bound::Excluded(v) => Bound::Excluded(v.clone()),
99 Bound::Unbounded => Bound::Unbounded,
100 }
101 }
102}
103
104impl Default for DeltaStore {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl StorageBackend for DeltaStore {
111 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
112 let table_map = self.get_or_create_table(table);
113 table_map.insert(key.to_vec(), Arc::new(value.to_vec()));
114 self.entry_count
115 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
116
117 Ok(())
118 }
119
120 fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
121 let table_map = self.get_or_create_table(table);
122
123 for (key, value) in rows {
124 table_map.insert(key, Arc::new(value));
125 self.entry_count
126 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
127 }
128
129 Ok(())
130 }
131
132 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
133 let Some(table_map) = self.tables.get(table) else {
134 return Ok(None);
135 };
136
137 Ok(table_map.get(key).map(|e| (**e.value()).clone()))
138 }
139
140 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
141 let Some(table_map) = self.tables.get(table) else {
142 return Ok(false);
143 };
144
145 if table_map.remove(key).is_some() {
146 self.entry_count
147 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
148 Ok(true)
149 } else {
150 Ok(false)
151 }
152 }
153
154 fn scan<R: RangeBounds<Vec<u8>> + Clone>(
155 &self,
156 table: &str,
157 range: R,
158 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
159 let Some(table_map) = self.tables.get(table) else {
160 return Ok(Vec::new());
161 };
162
163 if table_map.is_empty() {
165 return Ok(Vec::new());
166 }
167
168 let start = Self::convert_bound(range.start_bound());
169 let end = Self::convert_bound(range.end_bound());
170
171 let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
172 .range((start, end))
173 .map(|e| (e.key().clone(), (**e.value()).clone()))
174 .collect();
175
176 Ok(entries)
177 }
178
179 fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
180 &self,
181 table: &str,
182 range: R,
183 ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
184 let Some(table_map) = self.tables.get(table) else {
185 return Ok(None);
186 };
187
188 let start = Self::convert_bound(range.start_bound());
189 let end = Self::convert_bound(range.end_bound());
190
191 Ok(table_map
192 .range((start, end))
193 .next()
194 .map(|e| (e.key().clone(), (**e.value()).clone())))
195 }
196
197 fn flush(&self) -> DbxResult<()> {
198 Ok(())
199 }
200
201 fn count(&self, table: &str) -> DbxResult<usize> {
202 let Some(table_map) = self.tables.get(table) else {
203 return Ok(0);
204 };
205
206 Ok(table_map.len())
207 }
208
209 fn table_names(&self) -> DbxResult<Vec<String>> {
210 Ok(self.tables.iter().map(|e| e.key().clone()).collect())
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::transaction::mvcc::version::VersionedKey;
218
219 #[test]
220 fn insert_and_get() {
221 let store = DeltaStore::new();
222 store.insert("users", b"key1", b"value1").unwrap();
223 let result = store.get("users", b"key1").unwrap();
224 assert_eq!(result, Some(b"value1".to_vec()));
225 }
226
227 #[test]
228 fn test_versioned_storage() {
229 let store = DeltaStore::new();
230 let vk1 = VersionedKey::new(b"key1".to_vec(), 100);
231 let vk2 = VersionedKey::new(b"key1".to_vec(), 200);
232
233 store.insert("users", &vk1.encode(), b"v1").unwrap();
234 store.insert("users", &vk2.encode(), b"v2").unwrap();
235
236 assert_eq!(
238 store.get("users", &vk1.encode()).unwrap(),
239 Some(b"v1".to_vec())
240 );
241 assert_eq!(
242 store.get("users", &vk2.encode()).unwrap(),
243 Some(b"v2".to_vec())
244 );
245
246 let results = store.scan("users", Vec::<u8>::new()..).unwrap();
248 assert_eq!(results.len(), 2);
249 assert_eq!(VersionedKey::decode(&results[0].0).unwrap().commit_ts, 200);
250 assert_eq!(VersionedKey::decode(&results[1].0).unwrap().commit_ts, 100);
251 }
252
253 #[test]
254 fn delete_existing_key() {
255 let store = DeltaStore::new();
256 store.insert("users", b"key1", b"value1").unwrap();
257 assert!(store.delete("users", b"key1").unwrap());
258 assert_eq!(store.get("users", b"key1").unwrap(), None);
259 }
260
261 #[test]
262 fn entry_count_tracking() {
263 let store = DeltaStore::new();
264 assert_eq!(store.entry_count(), 0);
265 store.insert("t1", b"a", b"1").unwrap();
266 store.insert("t1", b"b", b"2").unwrap();
267 store.insert("t2", b"c", b"3").unwrap();
268 assert_eq!(store.entry_count(), 3);
269 }
270}