Skip to main content

dbx_core/storage/
delta_store.rs

//! Delta Store — Tier 1: In-memory write buffer backed by SkipList and DashMap.
//!
//! Provides concurrent insert/get/delete with O(log N) latency and
//! O(log N + K) range scans without sorting overhead.
//! Keys are opaque byte strings, so callers can store MVCC `VersionedKey` encodings
//! directly; tables are managed in a `DashMap`.
6
7use crate::error::DbxResult;
8use crate::storage::StorageBackend;
9
10use crossbeam_skiplist::SkipMap;
11use dashmap::DashMap;
12use std::ops::{Bound, RangeBounds};
13use std::sync::Arc;
14
/// Number of entries at which a store built with `DeltaStore::new` asks to be flushed
/// down to the WOS tier.
const DEFAULT_FLUSH_THRESHOLD: usize = 10_000;
17
18/// Tier 1: Concurrent in-memory store with ordered keys.
19///
20/// Uses `DashMap` for O(1) table lookups and `SkipMap` for O(log N) ordered storage.
21/// Each table is a separate `SkipMap` instance.
22pub struct DeltaStore {
23    /// Table name → SkipMap mapping
24    /// Using DashMap for O(1) table access
25    #[allow(clippy::type_complexity)]
26    tables: DashMap<String, Arc<SkipMap<Vec<u8>, Arc<Vec<u8>>>>>,
27    /// Threshold to trigger flush
28    flush_threshold: usize,
29    /// Atomic entry count across all tables
30    entry_count: std::sync::atomic::AtomicUsize,
31}
32
33impl DeltaStore {
34    /// Create a new Delta Store with the default flush threshold (10,000).
35    pub fn new() -> Self {
36        Self::with_threshold(DEFAULT_FLUSH_THRESHOLD)
37    }
38
39    /// Create a new Delta Store with a custom flush threshold.
40    pub fn with_threshold(threshold: usize) -> Self {
41        Self {
42            tables: DashMap::new(),
43            flush_threshold: threshold,
44            entry_count: std::sync::atomic::AtomicUsize::new(0),
45        }
46    }
47
48    /// Check if the store should be flushed to a lower tier.
49    pub fn should_flush(&self) -> bool {
50        self.entry_count() >= self.flush_threshold
51    }
52
53    /// Get the current entry count across all tables.
54    pub fn entry_count(&self) -> usize {
55        self.entry_count.load(std::sync::atomic::Ordering::Relaxed)
56    }
57
58    /// Drain all data from the store, returning table→entries mapping.
59    /// Used during flush to move data to WOS.
60    #[allow(clippy::type_complexity)]
61    pub fn drain_all(&self) -> Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)> {
62        let mut result = Vec::new();
63
64        // Collect all table names
65        let table_names: Vec<String> = self.tables.iter().map(|e| e.key().clone()).collect();
66
67        for table_name in table_names {
68            if let Some((_, table_map)) = self.tables.remove(&table_name) {
69                let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
70                    .iter()
71                    .map(|e| (e.key().clone(), (**e.value()).clone()))
72                    .collect();
73
74                result.push((table_name, entries));
75            }
76        }
77
78        // Reset entry count
79        self.entry_count
80            .store(0, std::sync::atomic::Ordering::Relaxed);
81
82        result
83    }
84
85    /// Get or create the SkipMap for a table.
86    fn get_or_create_table(&self, table: &str) -> Arc<SkipMap<Vec<u8>, Arc<Vec<u8>>>> {
87        self.tables
88            .entry(table.to_string())
89            .or_insert_with(|| Arc::new(SkipMap::new()))
90            .value()
91            .clone()
92    }
93
94    /// Helper to convert Bound<&Vec<u8>> to Bound<Vec<u8>>.
95    fn convert_bound(bound: Bound<&Vec<u8>>) -> Bound<Vec<u8>> {
96        match bound {
97            Bound::Included(v) => Bound::Included(v.clone()),
98            Bound::Excluded(v) => Bound::Excluded(v.clone()),
99            Bound::Unbounded => Bound::Unbounded,
100        }
101    }
102}
103
104impl Default for DeltaStore {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl StorageBackend for DeltaStore {
111    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
112        let table_map = self.get_or_create_table(table);
113        table_map.insert(key.to_vec(), Arc::new(value.to_vec()));
114        self.entry_count
115            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
116
117        Ok(())
118    }
119
120    fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
121        let table_map = self.get_or_create_table(table);
122
123        for (key, value) in rows {
124            table_map.insert(key, Arc::new(value));
125            self.entry_count
126                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
127        }
128
129        Ok(())
130    }
131
132    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
133        let Some(table_map) = self.tables.get(table) else {
134            return Ok(None);
135        };
136
137        Ok(table_map.get(key).map(|e| (**e.value()).clone()))
138    }
139
140    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
141        let Some(table_map) = self.tables.get(table) else {
142            return Ok(false);
143        };
144
145        if table_map.remove(key).is_some() {
146            self.entry_count
147                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
148            Ok(true)
149        } else {
150            Ok(false)
151        }
152    }
153
154    fn scan<R: RangeBounds<Vec<u8>> + Clone>(
155        &self,
156        table: &str,
157        range: R,
158    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
159        let Some(table_map) = self.tables.get(table) else {
160            return Ok(Vec::new());
161        };
162
163        // Fast-path: Delta가 비어있으면 스캔 불필요
164        if table_map.is_empty() {
165            return Ok(Vec::new());
166        }
167
168        let start = Self::convert_bound(range.start_bound());
169        let end = Self::convert_bound(range.end_bound());
170
171        let entries: Vec<(Vec<u8>, Vec<u8>)> = table_map
172            .range((start, end))
173            .map(|e| (e.key().clone(), (**e.value()).clone()))
174            .collect();
175
176        Ok(entries)
177    }
178
179    fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
180        &self,
181        table: &str,
182        range: R,
183    ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
184        let Some(table_map) = self.tables.get(table) else {
185            return Ok(None);
186        };
187
188        let start = Self::convert_bound(range.start_bound());
189        let end = Self::convert_bound(range.end_bound());
190
191        Ok(table_map
192            .range((start, end))
193            .next()
194            .map(|e| (e.key().clone(), (**e.value()).clone())))
195    }
196
197    fn flush(&self) -> DbxResult<()> {
198        Ok(())
199    }
200
201    fn count(&self, table: &str) -> DbxResult<usize> {
202        let Some(table_map) = self.tables.get(table) else {
203            return Ok(0);
204        };
205
206        Ok(table_map.len())
207    }
208
209    fn table_names(&self) -> DbxResult<Vec<String>> {
210        Ok(self.tables.iter().map(|e| e.key().clone()).collect())
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transaction::mvcc::version::VersionedKey;

    /// A value written to a table can be read back by the same key.
    #[test]
    fn insert_and_get() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();

        let fetched = store.get("users", b"key1").unwrap();
        assert_eq!(fetched.as_deref(), Some(&b"value1"[..]));
    }

    /// Encoded versioned keys are stored independently and scan newest-first per key.
    #[test]
    fn test_versioned_storage() {
        let store = DeltaStore::new();
        let vk1 = VersionedKey::new(b"key1".to_vec(), 100);
        let vk2 = VersionedKey::new(b"key1".to_vec(), 200);

        store.insert("users", &vk1.encode(), b"v1").unwrap();
        store.insert("users", &vk2.encode(), b"v2").unwrap();

        // Each exact versioned key resolves to its own value.
        assert_eq!(store.get("users", &vk1.encode()).unwrap(), Some(b"v1".to_vec()));
        assert_eq!(store.get("users", &vk2.encode()).unwrap(), Some(b"v2".to_vec()));

        // A full scan yields the latest version of the same user key first.
        let commit_timestamps: Vec<_> = store
            .scan("users", Vec::<u8>::new()..)
            .unwrap()
            .iter()
            .map(|(key, _)| VersionedKey::decode(key).unwrap().commit_ts)
            .collect();
        assert_eq!(commit_timestamps, vec![200, 100]);
    }

    /// Deleting a present key reports success and makes the key unreadable.
    #[test]
    fn delete_existing_key() {
        let store = DeltaStore::new();
        store.insert("users", b"key1", b"value1").unwrap();

        let removed = store.delete("users", b"key1").unwrap();
        assert!(removed);
        assert!(store.get("users", b"key1").unwrap().is_none());
    }

    /// Inserts into several tables all contribute to the store-wide counter.
    #[test]
    fn entry_count_tracking() {
        let store = DeltaStore::new();
        assert_eq!(store.entry_count(), 0);

        for (table, key, value) in [("t1", b"a", b"1"), ("t1", b"b", b"2"), ("t2", b"c", b"3")] {
            store.insert(table, key, value).unwrap();
        }

        assert_eq!(store.entry_count(), 3);
    }
}