1#[macro_use]
2extern crate tracing;
3
4#[cfg(feature = "fjall")]
5pub mod fjall_adapter;
6#[cfg(feature = "lmdb")]
7pub mod lmdb_adapter;
8#[cfg(feature = "sqlite")]
9pub mod sqlite_adapter;
10
11pub mod open;
12
13#[cfg(test)]
14pub mod test;
15
16use core::ops::{Bound, RangeBounds};
17
18use std::borrow::Cow;
19use std::cell::Cell;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use err_derive::Error;
24
25pub use open::*;
26
27pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
28
29#[derive(Clone)]
30pub struct Db(pub(crate) Arc<dyn IDb>);
31
32pub struct Transaction<'a> {
33 tx: &'a mut dyn ITx,
34 on_commit: OnCommit,
35}
36
37#[derive(Clone)]
38pub struct Tree(Arc<dyn IDb>, usize);
39
40pub type Value = Vec<u8>;
41pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>;
42pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>;
43
44#[derive(Debug, Error)]
47#[error(display = "{}", _0)]
48pub struct Error(pub Cow<'static, str>);
49
50impl From<std::io::Error> for Error {
51 fn from(e: std::io::Error) -> Error {
52 Error(format!("IO: {}", e).into())
53 }
54}
55
56pub type Result<T> = std::result::Result<T, Error>;
57
58#[derive(Debug, Error)]
59#[error(display = "{}", _0)]
60pub struct TxOpError(pub(crate) Error);
61pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
62
63#[derive(Debug)]
64pub enum TxError<E> {
65 Abort(E),
66 Db(Error),
67}
68pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
69
70impl<E> From<TxOpError> for TxError<E> {
71 fn from(e: TxOpError) -> TxError<E> {
72 TxError::Db(e.0)
73 }
74}
75
76pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
77 match res {
78 Ok(v) => Ok(Ok(v)),
79 Err(TxError::Abort(e)) => Ok(Err(e)),
80 Err(TxError::Db(e)) => Err(TxOpError(e)),
81 }
82}
83
84impl Db {
87 pub fn engine(&self) -> String {
88 self.0.engine()
89 }
90
91 pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
92 let tree_id = self.0.open_tree(name.as_ref())?;
93 Ok(Tree(self.0.clone(), tree_id))
94 }
95
96 pub fn list_trees(&self) -> Result<Vec<String>> {
97 self.0.list_trees()
98 }
99
100 pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
101 where
102 F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
103 {
104 let f = TxFn {
105 function: fun,
106 result: Cell::new(None),
107 };
108 let tx_res = self.0.transaction(&f);
109 let fn_res = f.result.into_inner();
110
111 match (tx_res, fn_res) {
112 (Ok(on_commit), Some(Ok(value))) => {
113 on_commit.into_iter().for_each(|f| f());
117 Ok(value)
118 }
119 (Err(TxError::Abort(())), Some(Err(TxError::Abort(e)))) => {
120 Err(TxError::Abort(e))
123 }
124 (Err(TxError::Db(_tx_e)), Some(Err(TxError::Db(fn_e)))) => {
125 Err(TxError::Db(fn_e))
129 }
130 (Err(TxError::Db(tx_e)), None) => {
131 Err(TxError::Db(tx_e))
134 }
135 (Err(TxError::Db(tx_e)), Some(Ok(_))) => {
136 Err(TxError::Db(tx_e))
139 }
140 (tx_res, fn_res) => {
141 panic!(
142 "unexpected error case: tx_res={:?}, fn_res={:?}",
143 tx_res.map(|_| "..."),
144 fn_res.map(|x| x.map(|_| "...").map_err(|_| "..."))
145 );
146 }
147 }
148 }
149
150 pub fn snapshot(&self, path: &PathBuf) -> Result<()> {
151 self.0.snapshot(path)
152 }
153
154 pub fn import(&self, other: &Db) -> Result<()> {
155 let existing_trees = self.list_trees()?;
156 if !existing_trees.is_empty() {
157 return Err(Error(
158 format!(
159 "destination database already contains data: {:?}",
160 existing_trees
161 )
162 .into(),
163 ));
164 }
165
166 let tree_names = other.list_trees()?;
167 for name in tree_names {
168 let tree = self.open_tree(&name)?;
169 if !tree.is_empty()? {
170 return Err(Error(format!("tree {} already contains data", name).into()));
171 }
172
173 let ex_tree = other.open_tree(&name)?;
174
175 let tx_res = self.transaction(|tx| {
176 let mut i = 0;
177 for item in ex_tree.iter().map_err(TxError::Abort)? {
178 let (k, v) = item.map_err(TxError::Abort)?;
179 tx.insert(&tree, k, v)?;
180 i += 1;
181 if i % 1000 == 0 {
182 println!("{}: imported {}", name, i);
183 }
184 }
185 Ok(i)
186 });
187 let total = match tx_res {
188 Err(TxError::Db(e)) => return Err(e),
189 Err(TxError::Abort(e)) => return Err(e),
190 Ok(x) => x,
191 };
192
193 println!("{}: finished importing, {} items", name, total);
194 }
195 Ok(())
196 }
197}
198
199#[allow(clippy::len_without_is_empty)]
200impl Tree {
201 #[inline]
202 pub fn db(&self) -> Db {
203 Db(self.0.clone())
204 }
205
206 #[inline]
207 pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
208 self.0.get(self.1, key.as_ref())
209 }
210 #[inline]
211 pub fn approximate_len(&self) -> Result<usize> {
212 self.0.approximate_len(self.1)
213 }
214 #[inline]
215 pub fn is_empty(&self) -> Result<bool> {
216 self.0.is_empty(self.1)
217 }
218
219 #[inline]
220 pub fn first(&self) -> Result<Option<(Value, Value)>> {
221 self.iter()?.next().transpose()
222 }
223 #[inline]
224 pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> {
225 self.range((Bound::Excluded(from), Bound::Unbounded))?
226 .next()
227 .transpose()
228 }
229
230 #[inline]
232 pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
233 self.0.insert(self.1, key.as_ref(), value.as_ref())
234 }
235 #[inline]
237 pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<()> {
238 self.0.remove(self.1, key.as_ref())
239 }
240 #[inline]
242 pub fn clear(&self) -> Result<()> {
243 self.0.clear(self.1)
244 }
245
246 #[inline]
247 pub fn iter(&self) -> Result<ValueIter<'_>> {
248 self.0.iter(self.1)
249 }
250 #[inline]
251 pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
252 self.0.iter_rev(self.1)
253 }
254
255 #[inline]
256 pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
257 where
258 K: AsRef<[u8]>,
259 R: RangeBounds<K>,
260 {
261 let sb = range.start_bound();
262 let eb = range.end_bound();
263 self.0.range(self.1, get_bound(sb), get_bound(eb))
264 }
265 #[inline]
266 pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
267 where
268 K: AsRef<[u8]>,
269 R: RangeBounds<K>,
270 {
271 let sb = range.start_bound();
272 let eb = range.end_bound();
273 self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
274 }
275}
276
277#[allow(clippy::len_without_is_empty)]
278impl<'a> Transaction<'a> {
279 #[inline]
280 pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
281 self.tx.get(tree.1, key.as_ref())
282 }
283 #[inline]
284 pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
285 self.tx.len(tree.1)
286 }
287
288 #[inline]
290 pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
291 &mut self,
292 tree: &Tree,
293 key: T,
294 value: U,
295 ) -> TxOpResult<()> {
296 self.tx.insert(tree.1, key.as_ref(), value.as_ref())
297 }
298 #[inline]
300 pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<()> {
301 self.tx.remove(tree.1, key.as_ref())
302 }
303 #[inline]
305 pub fn clear(&mut self, tree: &Tree) -> TxOpResult<()> {
306 self.tx.clear(tree.1)
307 }
308
309 #[inline]
310 pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
311 self.tx.iter(tree.1)
312 }
313 #[inline]
314 pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
315 self.tx.iter_rev(tree.1)
316 }
317
318 #[inline]
319 pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
320 where
321 K: AsRef<[u8]>,
322 R: RangeBounds<K>,
323 {
324 let sb = range.start_bound();
325 let eb = range.end_bound();
326 self.tx.range(tree.1, get_bound(sb), get_bound(eb))
327 }
328 #[inline]
329 pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
330 where
331 K: AsRef<[u8]>,
332 R: RangeBounds<K>,
333 {
334 let sb = range.start_bound();
335 let eb = range.end_bound();
336 self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
337 }
338
339 #[inline]
340 pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
341 self.on_commit.push(Box::new(f));
342 }
343}
344
345pub(crate) trait IDb: Send + Sync {
348 fn engine(&self) -> String;
349 fn open_tree(&self, name: &str) -> Result<usize>;
350 fn list_trees(&self) -> Result<Vec<String>>;
351 fn snapshot(&self, path: &PathBuf) -> Result<()>;
352
353 fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
354 fn approximate_len(&self, tree: usize) -> Result<usize>;
355 fn is_empty(&self, tree: usize) -> Result<bool>;
356
357 fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
358 fn remove(&self, tree: usize, key: &[u8]) -> Result<()>;
359 fn clear(&self, tree: usize) -> Result<()>;
360
361 fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
362 fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
363
364 fn range<'r>(
365 &self,
366 tree: usize,
367 low: Bound<&'r [u8]>,
368 high: Bound<&'r [u8]>,
369 ) -> Result<ValueIter<'_>>;
370 fn range_rev<'r>(
371 &self,
372 tree: usize,
373 low: Bound<&'r [u8]>,
374 high: Bound<&'r [u8]>,
375 ) -> Result<ValueIter<'_>>;
376
377 fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
378}
379
380pub(crate) trait ITx {
381 fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
382 fn len(&self, tree: usize) -> TxOpResult<usize>;
383
384 fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()>;
385 fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()>;
386 fn clear(&mut self, tree: usize) -> TxOpResult<()>;
387
388 fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
389 fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
390
391 fn range<'r>(
392 &self,
393 tree: usize,
394 low: Bound<&'r [u8]>,
395 high: Bound<&'r [u8]>,
396 ) -> TxOpResult<TxValueIter<'_>>;
397 fn range_rev<'r>(
398 &self,
399 tree: usize,
400 low: Bound<&'r [u8]>,
401 high: Bound<&'r [u8]>,
402 ) -> TxOpResult<TxValueIter<'_>>;
403}
404
405pub(crate) trait ITxFn {
406 fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
407}
408
409pub(crate) enum TxFnResult {
410 Ok(OnCommit),
411 Abort,
412 DbErr,
413}
414
415struct TxFn<F, R, E>
416where
417 F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
418{
419 function: F,
420 result: Cell<Option<TxResult<R, E>>>,
421}
422
423impl<F, R, E> ITxFn for TxFn<F, R, E>
424where
425 F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
426{
427 fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
428 let mut tx = Transaction {
429 tx,
430 on_commit: vec![],
431 };
432 let res = (self.function)(&mut tx);
433 let res2 = match &res {
434 Ok(_) => TxFnResult::Ok(tx.on_commit),
435 Err(TxError::Abort(_)) => TxFnResult::Abort,
436 Err(TxError::Db(_)) => TxFnResult::DbErr,
437 };
438 self.result.set(Some(res));
439 res2
440 }
441}
442
443fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
446 match b {
447 Bound::Included(v) => Bound::Included(v.as_ref()),
448 Bound::Excluded(v) => Bound::Excluded(v.as_ref()),
449 Bound::Unbounded => Bound::Unbounded,
450 }
451}