Skip to main content

dbx_core/storage/
wos.rs

//! WOS (Write-Optimized Store) — Tier 3: sled-backed durable row storage.
//!
//! Provides persistent key-value storage with B+Tree indexing (~10μs latency).
//! Each table maps to its own sled `Tree`.

6use crate::error::DbxResult;
7use crate::storage::StorageBackend;
8use std::ops::RangeBounds;
9use std::path::Path;
10
11/// Tier 3: sled-backed persistent storage with B+Tree indexing.
12pub struct WosBackend {
13    db: sled::Db,
14}
15
16impl WosBackend {
17    /// Open WOS at the given directory path.
18    pub fn open(path: &Path) -> DbxResult<Self> {
19        let db = sled::open(path)?;
20        Ok(Self { db })
21    }
22
23    /// Open a temporary WOS (for testing). Data is deleted on drop.
24    pub fn open_temporary() -> DbxResult<Self> {
25        let config = sled::Config::new().temporary(true);
26        let db = config.open()?;
27        Ok(Self { db })
28    }
29
30    /// Get or create a sled Tree for the given table name.
31    fn tree(&self, table: &str) -> DbxResult<sled::Tree> {
32        Ok(self.db.open_tree(table)?)
33    }
34}
35
36impl StorageBackend for WosBackend {
37    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
38        let tree = self.tree(table)?;
39        tree.insert(key, value)?;
40        Ok(())
41    }
42
43    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
44        let tree = self.tree(table)?;
45        Ok(tree.get(key)?.map(|ivec| ivec.to_vec()))
46    }
47
48    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
49        let tree = self.tree(table)?;
50        Ok(tree.remove(key)?.is_some())
51    }
52
53    fn scan<R: RangeBounds<Vec<u8>> + Clone>(
54        &self,
55        table: &str,
56        range: R,
57    ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
58        let tree = self.tree(table)?;
59
60        // Convert RangeBounds<Vec<u8>> to sled-compatible range
61        let iter = match (range.start_bound(), range.end_bound()) {
62            (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded) => tree.iter(),
63            (std::ops::Bound::Included(start), std::ops::Bound::Unbounded) => {
64                tree.range(start.as_slice()..)
65            }
66            (std::ops::Bound::Included(start), std::ops::Bound::Excluded(end)) => {
67                tree.range(start.as_slice()..end.as_slice())
68            }
69            (std::ops::Bound::Included(start), std::ops::Bound::Included(end)) => {
70                tree.range(start.as_slice()..=end.as_slice())
71            }
72            (std::ops::Bound::Unbounded, std::ops::Bound::Excluded(end)) => {
73                tree.range(..end.as_slice())
74            }
75            (std::ops::Bound::Unbounded, std::ops::Bound::Included(end)) => {
76                tree.range(..=end.as_slice())
77            }
78            (std::ops::Bound::Excluded(_), _) => {
79                // sled doesn't directly support excluded start bounds,
80                // use full iteration with manual filter
81                tree.iter()
82            }
83        };
84
85        let mut result = Vec::new();
86        for item in iter {
87            let (k, v) = item?;
88            let key_vec = k.to_vec();
89            // For excluded start bound, manually filter
90            if let std::ops::Bound::Excluded(start) = range.start_bound()
91                && key_vec <= *start
92            {
93                continue;
94            }
95            result.push((key_vec, v.to_vec()));
96        }
97        Ok(result)
98    }
99
100    fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
101        &self,
102        table: &str,
103        range: R,
104    ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
105        let tree = self.tree(table)?;
106
107        // Convert RangeBounds<Vec<u8>> to sled-compatible range
108        let mut iter = match (range.start_bound(), range.end_bound()) {
109            (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded) => tree.iter(),
110            (std::ops::Bound::Included(start), std::ops::Bound::Unbounded) => {
111                tree.range(start.as_slice()..)
112            }
113            (std::ops::Bound::Included(start), std::ops::Bound::Excluded(end)) => {
114                tree.range(start.as_slice()..end.as_slice())
115            }
116            (std::ops::Bound::Included(start), std::ops::Bound::Included(end)) => {
117                tree.range(start.as_slice()..=end.as_slice())
118            }
119            (std::ops::Bound::Unbounded, std::ops::Bound::Excluded(end)) => {
120                tree.range(..end.as_slice())
121            }
122            (std::ops::Bound::Unbounded, std::ops::Bound::Included(end)) => {
123                tree.range(..=end.as_slice())
124            }
125            (std::ops::Bound::Excluded(_), _) => tree.iter(),
126        };
127
128        if let Some(item) = iter.next() {
129            let (k, v) = item?;
130            let key_vec = k.to_vec();
131            // For excluded start bound, manually filter
132            if let std::ops::Bound::Excluded(start) = range.start_bound()
133                && key_vec <= *start
134            {
135                // Fallback to full iteration if the first item doesn't match
136                // (This is inefficient but consistent with scan's current logic)
137                for next_item in iter {
138                    let (nk, nv) = next_item?;
139                    let nkey_vec = nk.to_vec();
140                    if nkey_vec > *start {
141                        return Ok(Some((nkey_vec, nv.to_vec())));
142                    }
143                }
144                return Ok(None);
145            }
146            return Ok(Some((key_vec, v.to_vec())));
147        }
148        Ok(None)
149    }
150
151    fn flush(&self) -> DbxResult<()> {
152        self.db.flush()?;
153        Ok(())
154    }
155
156    fn count(&self, table: &str) -> DbxResult<usize> {
157        let tree = self.tree(table)?;
158        Ok(tree.len())
159    }
160
161    fn table_names(&self) -> DbxResult<Vec<String>> {
162        Ok(self
163            .db
164            .tree_names()
165            .into_iter()
166            .filter_map(|name| {
167                let s = String::from_utf8(name.to_vec()).ok()?;
168                // sled has a default tree named "__sled__default"
169                if s == "__sled__default" {
170                    None
171                } else {
172                    Some(s)
173                }
174            })
175            .collect())
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    fn temp_wos() -> WosBackend {
184        WosBackend::open_temporary().unwrap()
185    }
186
187    #[test]
188    fn insert_and_get() {
189        let wos = temp_wos();
190        wos.insert("users", b"key1", b"value1").unwrap();
191        let result = wos.get("users", b"key1").unwrap();
192        assert_eq!(result, Some(b"value1".to_vec()));
193    }
194
195    #[test]
196    fn get_nonexistent() {
197        let wos = temp_wos();
198        let result = wos.get("users", b"missing").unwrap();
199        assert_eq!(result, None);
200    }
201
202    #[test]
203    fn delete_existing() {
204        let wos = temp_wos();
205        wos.insert("users", b"key1", b"value1").unwrap();
206        assert!(wos.delete("users", b"key1").unwrap());
207        assert_eq!(wos.get("users", b"key1").unwrap(), None);
208    }
209
210    #[test]
211    fn delete_nonexistent() {
212        let wos = temp_wos();
213        assert!(!wos.delete("users", b"missing").unwrap());
214    }
215
216    #[test]
217    fn upsert_overwrites() {
218        let wos = temp_wos();
219        wos.insert("t", b"k", b"v1").unwrap();
220        wos.insert("t", b"k", b"v2").unwrap();
221        assert_eq!(wos.get("t", b"k").unwrap(), Some(b"v2".to_vec()));
222    }
223
224    #[test]
225    fn scan_all() {
226        let wos = temp_wos();
227        wos.insert("t", b"a", b"1").unwrap();
228        wos.insert("t", b"b", b"2").unwrap();
229        wos.insert("t", b"c", b"3").unwrap();
230
231        let all: Vec<(Vec<u8>, Vec<u8>)> = wos.scan("t", ..).unwrap();
232        assert_eq!(all.len(), 3);
233        assert_eq!(all[0].0, b"a");
234        assert_eq!(all[2].0, b"c");
235    }
236
237    #[test]
238    fn scan_range() {
239        let wos = temp_wos();
240        wos.insert("t", b"a", b"1").unwrap();
241        wos.insert("t", b"b", b"2").unwrap();
242        wos.insert("t", b"c", b"3").unwrap();
243        wos.insert("t", b"d", b"4").unwrap();
244
245        let range_result = wos.scan("t", b"b".to_vec()..b"d".to_vec()).unwrap();
246        assert_eq!(range_result.len(), 2);
247        assert_eq!(range_result[0].0, b"b");
248        assert_eq!(range_result[1].0, b"c");
249    }
250
251    #[test]
252    fn count() {
253        let wos = temp_wos();
254        assert_eq!(wos.count("t").unwrap(), 0);
255        wos.insert("t", b"a", b"1").unwrap();
256        wos.insert("t", b"b", b"2").unwrap();
257        assert_eq!(wos.count("t").unwrap(), 2);
258    }
259
260    #[test]
261    fn table_names() {
262        let wos = temp_wos();
263        wos.insert("users", b"a", b"1").unwrap();
264        wos.insert("orders", b"b", b"2").unwrap();
265        let mut names = wos.table_names().unwrap();
266        names.sort();
267        assert_eq!(names, vec!["orders".to_string(), "users".to_string()]);
268    }
269
270    #[test]
271    fn flush_persists() {
272        let wos = temp_wos();
273        wos.insert("t", b"key", b"val").unwrap();
274        wos.flush().unwrap();
275        // After flush, data should still be readable
276        assert_eq!(wos.get("t", b"key").unwrap(), Some(b"val".to_vec()));
277    }
278
279    #[test]
280    fn multiple_tables_isolation() {
281        let wos = temp_wos();
282        wos.insert("t1", b"k", b"v1").unwrap();
283        wos.insert("t2", b"k", b"v2").unwrap();
284        assert_eq!(wos.get("t1", b"k").unwrap(), Some(b"v1".to_vec()));
285        assert_eq!(wos.get("t2", b"k").unwrap(), Some(b"v2".to_vec()));
286    }
287}