Skip to main content

dbx_core/storage/
delta_store.rs

1//! Delta Store — Tier 1: In-memory write buffer backed by SkipList and DashMap.
2//!
3//! Provides concurrent insert/get/delete with O(log N) latency and
4//! O(log N + K) range scans without sorting overhead.
//! MVCC-aware: rows are keyed by `VersionedKey`, and each table's SkipMap is looked up through a `DashMap`.
6
7use crate::error::DbxResult;
8use crate::storage::StorageBackend;
9use crate::transaction::mvcc::version::VersionedKey;
10use crossbeam_skiplist::SkipMap;
11use dashmap::DashMap;
12use std::ops::{Bound, RangeBounds};
13use std::sync::Arc;
14
/// Default flush threshold: `DeltaStore::should_flush` reports `true` (time to
/// flush to WOS) once the entry count reaches this value (`>=`, not `>`).
const DEFAULT_FLUSH_THRESHOLD: usize = 10_000;
17
18/// Tier 1: Concurrent in-memory store with ordered versioned keys.
19///
20/// Uses `DashMap` for O(1) table lookups and `SkipMap` for O(log N) ordered storage.
21/// Each table is a separate `SkipMap` instance.
22pub struct DeltaStore {
23    /// Table name → SkipMap mapping
24    /// Using DashMap for O(1) table access
25    #[allow(clippy::type_complexity)]
26    tables: DashMap<String, Arc<SkipMap<VersionedKey, Arc<Vec<u8>>>>>,
27    /// Threshold to trigger flush
28    flush_threshold: usize,
29    /// Atomic entry count across all tables
30    entry_count: std::sync::atomic::AtomicUsize,
31}
32
33impl DeltaStore {
34    /// Create a new Delta Store with the default flush threshold (10,000).
35    pub fn new() -> Self {
36        Self::with_threshold(DEFAULT_FLUSH_THRESHOLD)
37    }
38
39    /// Create a new Delta Store with a custom flush threshold.
40    pub fn with_threshold(threshold: usize) -> Self {
41        Self {
42            tables: DashMap::new(),
43            flush_threshold: threshold,
44            entry_count: std::sync::atomic::AtomicUsize::new(0),
45        }
46    }
47
48    /// Check if the store should be flushed to a lower tier.
49    pub fn should_flush(&self) -> bool {
50        self.entry_count() >= self.flush_threshold
51    }
52
53    /// Get the current entry count across all tables.
54    pub fn entry_count(&self) -> usize {
55        self.entry_count.load(std::sync::atomic::Ordering::Relaxed)
56    }
57
58    /// Drain all data from the store, returning table→entries mapping.
59    /// Used during flush to move data to WOS.
60    ///
61    /// Note: Returns encoded keys for backward compatibility with WOS.
62    #[allow(clippy::type_complexity)]
63    pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
64        let mut result = Vec::new();
65
66        // Collect all table names
67        let table_names: Vec<String> = self.tables.iter().map(|e| e.key().clone()).collect();
68
69        for table_name in table_names {
70            if let Some((_, table_map)) = self.tables.remove(&table_name) {
71                let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
72                    .iter()
73                    .map(|e| (e.key().encode(), (**e.value()).clone()))
74                    .collect();
75
76                result.push((table_name, entries));
77            }
78        }
79
80        // Reset entry count
81        self.entry_count
82            .store(0, std::sync::atomic::Ordering::Relaxed);
83
84        result
85    }
86
87    /// Get or create the SkipMap for a table.
88    fn get_or_create_table(&self, table: &str) -> Arc<SkipMap<VersionedKey, Arc<Vec<u8>>>> {
89        self.tables
90            .entry(table.to_string())
91            .or_insert_with(|| Arc::new(SkipMap::new()))
92            .value()
93            .clone()
94    }
95
96    /// Helper to convert raw bytes to VersionedKey.
97    fn to_versioned_key(key: &[u8], default_ts: u64) -> VersionedKey {
98        // If it looks like a versioned key (length > 8), try to decode it.
99        // Versioned keys are [user_key] + [8 bytes timestamp].
100        if key.len() > 8
101            && let Ok(vk) = VersionedKey::decode(key)
102        {
103            return vk;
104        }
105        VersionedKey::new(key.to_vec(), default_ts)
106    }
107
108    /// Helper to convert Bound<&Vec<u8>> to Bound<VersionedKey>.
109    fn convert_start_bound(bound: Bound<&Vec<u8>>) -> Bound<VersionedKey> {
110        match bound {
111            Bound::Included(v) => {
112                if v.is_empty() {
113                    Bound::Included(VersionedKey::new(vec![], u64::MAX))
114                } else {
115                    Bound::Included(Self::to_versioned_key(v, u64::MAX))
116                }
117            }
118            Bound::Excluded(v) => {
119                if v.is_empty() {
120                    Bound::Excluded(VersionedKey::new(vec![], u64::MAX))
121                } else {
122                    Bound::Excluded(Self::to_versioned_key(v, u64::MAX))
123                }
124            }
125            Bound::Unbounded => Bound::Unbounded,
126        }
127    }
128
129    fn convert_end_bound(bound: Bound<&Vec<u8>>) -> Bound<VersionedKey> {
130        match bound {
131            Bound::Included(v) => {
132                if v.is_empty() {
133                    Bound::Included(VersionedKey::new(vec![], 0))
134                } else {
135                    Bound::Included(Self::to_versioned_key(v, 0))
136                }
137            }
138            Bound::Excluded(v) => {
139                if v.is_empty() {
140                    Bound::Excluded(VersionedKey::new(vec![], 0))
141                } else {
142                    Bound::Excluded(Self::to_versioned_key(v, 0))
143                }
144            }
145            Bound::Unbounded => Bound::Unbounded,
146        }
147    }
148}
149
150impl Default for DeltaStore {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156impl StorageBackend for DeltaStore {
157    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
158        let table_map = self.get_or_create_table(table);
159        // For inserts, if it's a raw key, we use ts=0 (legacy).
160        // If it's encoded, decode() will find the correct TS.
161        let vk = Self::to_versioned_key(key, 0);
162        table_map.insert(vk, Arc::new(value.to_vec()));
163        self.entry_count
164            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
165
166        Ok(())
167    }
168
169    fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
170        let table_map = self.get_or_create_table(table);
171
172        for (key, value) in rows {
173            let vk = Self::to_versioned_key(&key, 0);
174            table_map.insert(vk, Arc::new(value));
175            self.entry_count
176                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177        }
178
179        Ok(())
180    }
181
182    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
183        let Some(table_map) = self.tables.get(table) else {
184            return Ok(None);
185        };
186
187        let vk = Self::to_versioned_key(key, 0);
188        Ok(table_map.get(&vk).map(|e| (**e.value()).clone()))
189    }
190
191    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
192        let Some(table_map) = self.tables.get(table) else {
193            return Ok(false);
194        };
195
196        let vk = Self::to_versioned_key(key, 0);
197        if table_map.remove(&vk).is_some() {
198            self.entry_count
199                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
200            Ok(true)
201        } else {
202            Ok(false)
203        }
204    }
205
206    fn scan<R: RangeBounds<Vec<u8>> + Clone>(
207        &self,
208        table: &str,
209        range: R,
210    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
211        let Some(table_map) = self.tables.get(table) else {
212            return Ok(Vec::new());
213        };
214
215        // Fast-path: Delta가 비어있으면 스캔 불필요
216        if table_map.is_empty() {
217            return Ok(Vec::new());
218        }
219
220        let start = Self::convert_start_bound(range.start_bound());
221        let end = Self::convert_end_bound(range.end_bound());
222
223        let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
224            .range((start, end))
225            .map(|e| (e.key().encode(), (**e.value()).clone()))
226            .collect();
227
228        Ok(entries)
229    }
230
231    fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
232        &self,
233        table: &str,
234        range: R,
235    ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
236        let Some(table_map) = self.tables.get(table) else {
237            return Ok(None);
238        };
239
240        let start = Self::convert_start_bound(range.start_bound());
241        let end = Self::convert_end_bound(range.end_bound());
242
243        Ok(table_map
244            .range((start, end))
245            .next()
246            .map(|e| (e.key().encode(), (**e.value()).clone())))
247    }
248
249    fn flush(&self) -> DbxResult<()> {
250        Ok(())
251    }
252
253    fn count(&self, table: &str) -> DbxResult<usize> {
254        let Some(table_map) = self.tables.get(table) else {
255            return Ok(0);
256        };
257
258        Ok(table_map.len())
259    }
260
261    fn table_names(&self) -> DbxResult<Vec<String>> {
262        Ok(self.tables.iter().map(|e| e.key().clone()).collect())
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn insert_and_get() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();
        let result = store.get("users", b"key1").unwrap();
        assert_eq!(result, Some(b"value1".to_vec()));
    }

    #[test]
    fn test_versioned_storage() {
        let store = DeltaStore::new();
        let vk1 = VersionedKey::new(b"key1".to_vec(), 100);
        let vk2 = VersionedKey::new(b"key1".to_vec(), 200);

        store.insert("users", &vk1.encode(), b"v1").unwrap();
        store.insert("users", &vk2.encode(), b"v2").unwrap();

        // Both versions are reachable through their exact encoded keys.
        assert_eq!(
            store.get("users", &vk1.encode()).unwrap(),
            Some(b"v1".to_vec())
        );
        assert_eq!(
            store.get("users", &vk2.encode()).unwrap(),
            Some(b"v2".to_vec())
        );

        // A scan returns versions of the same key newest first.
        let results = store.scan("users", Vec::<u8>::new()..).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(VersionedKey::decode(&results[0].0).unwrap().commit_ts, 200);
        assert_eq!(VersionedKey::decode(&results[1].0).unwrap().commit_ts, 100);
    }

    #[test]
    fn delete_existing_key() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();
        assert!(store.delete("users", b"key1").unwrap());
        assert_eq!(store.get("users", b"key1").unwrap(), None);
    }

    #[test]
    fn delete_missing_key_returns_false() {
        let store = DeltaStore::new();
        // Unknown table.
        assert!(!store.delete("users", b"nope").unwrap());
        // Known table, unknown key: nothing removed and the counter is untouched.
        store.insert("users", b"key1", b"value1").unwrap();
        assert!(!store.delete("users", b"other").unwrap());
        assert_eq!(store.entry_count(), 1);
    }

    #[test]
    fn unknown_table_reads_are_empty() {
        let store = DeltaStore::new();
        assert_eq!(store.get("missing", b"k").unwrap(), None);
        assert_eq!(store.count("missing").unwrap(), 0);
        assert!(store.scan("missing", Vec::<u8>::new()..).unwrap().is_empty());
        assert_eq!(store.scan_one("missing", Vec::<u8>::new()..).unwrap(), None);
    }

    #[test]
    fn entry_count_tracking() {
        let store = DeltaStore::new();
        assert_eq!(store.entry_count(), 0);
        store.insert("t1", b"a", b"1").unwrap();
        store.insert("t1", b"b", b"2").unwrap();
        store.insert("t2", b"c", b"3").unwrap();
        assert_eq!(store.entry_count(), 3);
    }

    #[test]
    fn should_flush_respects_threshold() {
        let store = DeltaStore::with_threshold(2);
        store.insert("t", b"a", b"1").unwrap();
        assert!(!store.should_flush());
        store.insert("t", b"b", b"2").unwrap();
        assert!(store.should_flush());
    }

    #[test]
    fn scan_one_returns_first_entry_in_range() {
        let store = DeltaStore::new();
        store.insert("t", b"a", b"1").unwrap();
        store.insert("t", b"b", b"2").unwrap();
        let (_, value) = store.scan_one("t", b"b".to_vec()..).unwrap().unwrap();
        assert_eq!(value, b"2".to_vec());
    }

    #[test]
    fn drain_all_empties_store_and_resets_count() {
        let store = DeltaStore::new();
        store.insert("t1", b"a", b"1").unwrap();
        store.insert("t2", b"b", b"2").unwrap();

        let mut drained = store.drain_all();
        // Table order follows DashMap iteration, which is unspecified.
        drained.sort_by(|l, r| l.0.cmp(&r.0));

        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].0, "t1");
        assert_eq!(drained[0].1.len(), 1);
        assert_eq!(drained[0].1[0].1, b"1".to_vec());
        assert_eq!(drained[1].0, "t2");
        assert_eq!(drained[1].1[0].1, b"2".to_vec());

        assert_eq!(store.entry_count(), 0);
        assert!(store.table_names().unwrap().is_empty());
    }
}