dbx_core/storage/native_wos/
backend.rs1use super::table_store::TableStore;
4use crate::engine::DirtyBufferMode;
5use crate::error::DbxResult;
6use crate::storage::StorageBackend;
7use dashmap::DashMap;
8use std::ops::RangeBounds;
9use std::path::{Path, PathBuf};
10use std::sync::Mutex;
11
12pub struct NativeWosBackend {
15 base_path: PathBuf,
16 tables: DashMap<String, Mutex<TableStore>>,
17 dirty_buffer_mode: DirtyBufferMode,
18 _temp_dir: Option<tempfile::TempDir>,
20}
21
22impl NativeWosBackend {
23 pub fn open(base_path: &Path) -> DbxResult<Self> {
24 Self::open_with_mode(base_path, DirtyBufferMode::default())
25 }
26
27 pub fn open_with_mode(base_path: &Path, mode: DirtyBufferMode) -> DbxResult<Self> {
29 std::fs::create_dir_all(base_path)?;
30 Ok(Self {
31 base_path: base_path.to_path_buf(),
32 tables: DashMap::new(),
33 dirty_buffer_mode: mode,
34 _temp_dir: None,
35 })
36 }
37
38 pub fn open_temporary() -> DbxResult<Self> {
40 Self::open_temporary_with_mode(DirtyBufferMode::default())
41 }
42
43 pub fn open_temporary_with_mode(mode: DirtyBufferMode) -> DbxResult<Self> {
45 let dir = tempfile::tempdir()?;
46 let path = dir.path().to_path_buf();
47 Ok(Self {
48 base_path: path,
49 tables: DashMap::new(),
50 dirty_buffer_mode: mode,
51 _temp_dir: Some(dir),
52 })
53 }
54
55 fn get_or_open(&self, table: &str) -> DbxResult<()> {
56 if !self.tables.contains_key(table) {
57 let safe_name = table.replace(['/', '\\', ':', '*', '?', '"', '<', '>', '|'], "_");
58 let path = self.base_path.join(format!("{safe_name}.wos"));
59 if let Some(parent) = path.parent() {
60 std::fs::create_dir_all(parent)?;
61 }
62 let store = TableStore::open_with_mode(&path, self.dirty_buffer_mode)?;
63 self.tables.insert(table.to_string(), Mutex::new(store));
64 }
65 Ok(())
66 }
67}
68
69impl StorageBackend for NativeWosBackend {
70 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
71 self.get_or_open(table)?;
72 self.tables
73 .get(table)
74 .unwrap()
75 .lock()
76 .unwrap()
77 .insert(key, value)
78 }
79
80 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
81 self.get_or_open(table)?;
82 self.tables.get(table).unwrap().lock().unwrap().get(key)
83 }
84
85 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
86 self.get_or_open(table)?;
87 self.tables.get(table).unwrap().lock().unwrap().delete(key)
88 }
89
90 fn scan<R: RangeBounds<Vec<u8>> + Clone>(
91 &self,
92 table: &str,
93 range: R,
94 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
95 self.get_or_open(table)?;
96 self.tables.get(table).unwrap().lock().unwrap().scan(range)
97 }
98
99 fn scan_one<R: RangeBounds<Vec<u8>> + Clone>(
100 &self,
101 table: &str,
102 range: R,
103 ) -> DbxResult<Option<(Vec<u8>, Vec<u8>)>> {
104 self.get_or_open(table)?;
105 self.tables
106 .get(table)
107 .unwrap()
108 .lock()
109 .unwrap()
110 .scan_one(range)
111 }
112
113 fn flush(&self) -> DbxResult<()> {
114 for entry in self.tables.iter() {
115 entry.value().lock().unwrap().flush()?;
116 }
117 Ok(())
118 }
119
120 fn count(&self, table: &str) -> DbxResult<usize> {
121 self.get_or_open(table)?;
122 self.tables.get(table).unwrap().lock().unwrap().count()
123 }
124
125 fn table_names(&self) -> DbxResult<Vec<String>> {
126 Ok(self.tables.iter().map(|e| e.key().clone()).collect())
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a backend on top of a fresh temporary directory.
    fn temp_backend() -> NativeWosBackend {
        NativeWosBackend::open_temporary().unwrap()
    }

    /// A value written under a key can be read back.
    #[test]
    fn insert_and_get() {
        let backend = temp_backend();
        backend.insert("users", b"key1", b"value1").unwrap();

        let stored = backend.get("users", b"key1").unwrap();
        assert_eq!(stored, Some(b"value1".to_vec()));
    }

    /// Reading a key that was never written yields `None`.
    #[test]
    fn get_nonexistent() {
        let backend = temp_backend();

        let stored = backend.get("users", b"missing").unwrap();
        assert_eq!(stored, None);
    }

    /// Deleting a present key reports `true` and removes the value.
    #[test]
    fn delete_existing() {
        let backend = temp_backend();
        backend.insert("users", b"key1", b"value1").unwrap();

        let removed = backend.delete("users", b"key1").unwrap();
        assert!(removed);

        let stored = backend.get("users", b"key1").unwrap();
        assert_eq!(stored, None);
    }

    /// Deleting an absent key reports `false`.
    #[test]
    fn delete_nonexistent() {
        let backend = temp_backend();

        let removed = backend.delete("users", b"missing").unwrap();
        assert!(!removed);
    }

    /// A second insert under the same key replaces the first value.
    #[test]
    fn upsert_overwrites() {
        let backend = temp_backend();
        for value in [b"v1", b"v2"] {
            backend.insert("t", b"k", value).unwrap();
        }

        let stored = backend.get("t", b"k").unwrap();
        assert_eq!(stored, Some(b"v2".to_vec()));
    }

    /// An unbounded scan returns every entry in key order.
    #[test]
    fn scan_all() {
        let backend = temp_backend();
        for (key, value) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")] {
            backend.insert("t", key, value).unwrap();
        }

        let rows = backend.scan("t", ..).unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].0, b"a");
        assert_eq!(rows[2].0, b"c");
    }

    /// A half-open range scan includes the start key and excludes the end key.
    #[test]
    fn scan_range() {
        let backend = temp_backend();
        for (key, value) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3"), (b"d", b"4")] {
            backend.insert("t", key, value).unwrap();
        }

        let rows = backend.scan("t", b"b".to_vec()..b"d".to_vec()).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].0, b"b");
        assert_eq!(rows[1].0, b"c");
    }

    /// `count` is zero for an untouched table and tracks inserted keys.
    #[test]
    fn count() {
        let backend = temp_backend();
        assert_eq!(backend.count("t").unwrap(), 0);

        for (key, value) in [(b"a", b"1"), (b"b", b"2")] {
            backend.insert("t", key, value).unwrap();
        }
        assert_eq!(backend.count("t").unwrap(), 2);
    }

    /// Every table that was written to is listed by `table_names`.
    #[test]
    fn table_names() {
        let backend = temp_backend();
        backend.insert("users", b"a", b"1").unwrap();
        backend.insert("orders", b"b", b"2").unwrap();

        let mut listed = backend.table_names().unwrap();
        listed.sort();
        let expected = vec!["orders".to_string(), "users".to_string()];
        assert_eq!(listed, expected);
    }

    /// The same key in two tables holds independent values.
    #[test]
    fn multiple_tables_isolation() {
        let backend = temp_backend();
        backend.insert("t1", b"k", b"v1").unwrap();
        backend.insert("t2", b"k", b"v2").unwrap();

        let first = backend.get("t1", b"k").unwrap();
        let second = backend.get("t2", b"k").unwrap();
        assert_eq!(first, Some(b"v1".to_vec()));
        assert_eq!(second, Some(b"v2".to_vec()));
    }
}