garage_db/
lib.rs

1#[macro_use]
2extern crate tracing;
3
4#[cfg(feature = "fjall")]
5pub mod fjall_adapter;
6#[cfg(feature = "lmdb")]
7pub mod lmdb_adapter;
8#[cfg(feature = "sqlite")]
9pub mod sqlite_adapter;
10
11pub mod open;
12
13#[cfg(test)]
14pub mod test;
15
16use core::ops::{Bound, RangeBounds};
17
18use std::borrow::Cow;
19use std::cell::Cell;
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use err_derive::Error;
24
25pub use open::*;
26
/// List of callbacks registered during a transaction; `Db::transaction` runs
/// them, in order, once the backend has reported the transaction as successful.
27pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
28
/// Handle to an open database.
///
/// The backend is held behind an `Arc`, so cloning a `Db` only clones the
/// pointer and every clone refers to the same backend.
29#[derive(Clone)]
30pub struct Db(pub(crate) Arc<dyn IDb>);
31
/// Handle given to the closure passed to `Db::transaction`; every operation
/// on it is forwarded to the backend transaction it wraps.
32pub struct Transaction<'a> {
	// Backend transaction that all reads and writes are forwarded to.
33	tx: &'a mut dyn ITx,
	// Callbacks accumulated through `Transaction::on_commit`.
34	on_commit: OnCommit,
35}
36
/// Handle to a single tree of a database.
///
/// Holds the shared backend together with the numeric tree identifier that the
/// backend's `open_tree` returned for this tree.
37#[derive(Clone)]
38pub struct Tree(Arc<dyn IDb>, usize);
39
/// Owned byte string, used for both keys and values.
40pub type Value = Vec<u8>;
/// Boxed iterator over `(key, value)` pairs read outside of a transaction;
/// each item may individually fail with a database [`Error`].
41pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>;
/// Boxed iterator over `(key, value)` pairs read inside a transaction;
/// each item may individually fail with a [`TxOpError`].
42pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>;
43
44// ----
45
/// Database error, represented as a message that is either a static string or
/// an owned one. Its `Display` output is the message itself.
46#[derive(Debug, Error)]
47#[error(display = "{}", _0)]
48pub struct Error(pub Cow<'static, str>);
49
50impl From<std::io::Error> for Error {
51	fn from(e: std::io::Error) -> Error {
52		Error(format!("IO: {}", e).into())
53	}
54}
55
/// Result type of database operations performed outside of a transaction.
///
/// Note that this alias shadows the prelude `Result` inside this module, which
/// is why the standard type is spelled `std::result::Result` elsewhere.
56pub type Result<T> = std::result::Result<T, Error>;
57
/// Error returned by a single operation performed inside a transaction.
///
/// It wraps a database [`Error`] and converts into `TxError::Db` through `?`,
/// so it can be propagated directly from a transaction closure.
58#[derive(Debug, Error)]
59#[error(display = "{}", _0)]
60pub struct TxOpError(pub(crate) Error);
/// Result type of a single operation performed inside a transaction.
61pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
62
/// Reason why a whole transaction did not produce a value.
63#[derive(Debug)]
64pub enum TxError<E> {
	/// The transaction closure returned this user-defined error.
65	Abort(E),
	/// A database error occurred.
66	Db(Error),
67}
/// Result type of a whole transaction: a value `R`, a user abort `E`, or a
/// database error.
68pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
69
70impl<E> From<TxOpError> for TxError<E> {
71	fn from(e: TxOpError) -> TxError<E> {
72		TxError::Db(e.0)
73	}
74}
75
76pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
77	match res {
78		Ok(v) => Ok(Ok(v)),
79		Err(TxError::Abort(e)) => Ok(Err(e)),
80		Err(TxError::Db(e)) => Err(TxOpError(e)),
81	}
82}
83
84// ----
85
86impl Db {
87	pub fn engine(&self) -> String {
88		self.0.engine()
89	}
90
91	pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
92		let tree_id = self.0.open_tree(name.as_ref())?;
93		Ok(Tree(self.0.clone(), tree_id))
94	}
95
96	pub fn list_trees(&self) -> Result<Vec<String>> {
97		self.0.list_trees()
98	}
99
100	pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
101	where
102		F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
103	{
104		let f = TxFn {
105			function: fun,
106			result: Cell::new(None),
107		};
108		let tx_res = self.0.transaction(&f);
109		let fn_res = f.result.into_inner();
110
111		match (tx_res, fn_res) {
112			(Ok(on_commit), Some(Ok(value))) => {
113				// Transaction succeeded
114				// TxFn stored the value to return to the user in fn_res
115				// tx_res contains the on_commit list of callbacks, run them now
116				on_commit.into_iter().for_each(|f| f());
117				Ok(value)
118			}
119			(Err(TxError::Abort(())), Some(Err(TxError::Abort(e)))) => {
120				// Transaction was aborted by user code
121				// The abort error value is stored in fn_res
122				Err(TxError::Abort(e))
123			}
124			(Err(TxError::Db(_tx_e)), Some(Err(TxError::Db(fn_e)))) => {
125				// Transaction encountered a DB error in user code
126				// The error value encountered is the one in fn_res,
127				// tx_res contains only a dummy error message
128				Err(TxError::Db(fn_e))
129			}
130			(Err(TxError::Db(tx_e)), None) => {
131				// Transaction encounterred a DB error when initializing the transaction,
132				// before user code was called
133				Err(TxError::Db(tx_e))
134			}
135			(Err(TxError::Db(tx_e)), Some(Ok(_))) => {
136				// Transaction encounterred a DB error when commiting the transaction,
137				// after user code was called
138				Err(TxError::Db(tx_e))
139			}
140			(tx_res, fn_res) => {
141				panic!(
142					"unexpected error case: tx_res={:?}, fn_res={:?}",
143					tx_res.map(|_| "..."),
144					fn_res.map(|x| x.map(|_| "...").map_err(|_| "..."))
145				);
146			}
147		}
148	}
149
150	pub fn snapshot(&self, path: &PathBuf) -> Result<()> {
151		self.0.snapshot(path)
152	}
153
154	pub fn import(&self, other: &Db) -> Result<()> {
155		let existing_trees = self.list_trees()?;
156		if !existing_trees.is_empty() {
157			return Err(Error(
158				format!(
159					"destination database already contains data: {:?}",
160					existing_trees
161				)
162				.into(),
163			));
164		}
165
166		let tree_names = other.list_trees()?;
167		for name in tree_names {
168			let tree = self.open_tree(&name)?;
169			if !tree.is_empty()? {
170				return Err(Error(format!("tree {} already contains data", name).into()));
171			}
172
173			let ex_tree = other.open_tree(&name)?;
174
175			let tx_res = self.transaction(|tx| {
176				let mut i = 0;
177				for item in ex_tree.iter().map_err(TxError::Abort)? {
178					let (k, v) = item.map_err(TxError::Abort)?;
179					tx.insert(&tree, k, v)?;
180					i += 1;
181					if i % 1000 == 0 {
182						println!("{}: imported {}", name, i);
183					}
184				}
185				Ok(i)
186			});
187			let total = match tx_res {
188				Err(TxError::Db(e)) => return Err(e),
189				Err(TxError::Abort(e)) => return Err(e),
190				Ok(x) => x,
191			};
192
193			println!("{}: finished importing, {} items", name, total);
194		}
195		Ok(())
196	}
197}
198
199#[allow(clippy::len_without_is_empty)]
200impl Tree {
201	#[inline]
202	pub fn db(&self) -> Db {
203		Db(self.0.clone())
204	}
205
206	#[inline]
207	pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
208		self.0.get(self.1, key.as_ref())
209	}
210	#[inline]
211	pub fn approximate_len(&self) -> Result<usize> {
212		self.0.approximate_len(self.1)
213	}
214	#[inline]
215	pub fn is_empty(&self) -> Result<bool> {
216		self.0.is_empty(self.1)
217	}
218
219	#[inline]
220	pub fn first(&self) -> Result<Option<(Value, Value)>> {
221		self.iter()?.next().transpose()
222	}
223	#[inline]
224	pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> {
225		self.range((Bound::Excluded(from), Bound::Unbounded))?
226			.next()
227			.transpose()
228	}
229
230	/// Returns the old value if there was one
231	#[inline]
232	pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
233		self.0.insert(self.1, key.as_ref(), value.as_ref())
234	}
235	/// Returns the old value if there was one
236	#[inline]
237	pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<()> {
238		self.0.remove(self.1, key.as_ref())
239	}
240	/// Clears all values from the tree
241	#[inline]
242	pub fn clear(&self) -> Result<()> {
243		self.0.clear(self.1)
244	}
245
246	#[inline]
247	pub fn iter(&self) -> Result<ValueIter<'_>> {
248		self.0.iter(self.1)
249	}
250	#[inline]
251	pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
252		self.0.iter_rev(self.1)
253	}
254
255	#[inline]
256	pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
257	where
258		K: AsRef<[u8]>,
259		R: RangeBounds<K>,
260	{
261		let sb = range.start_bound();
262		let eb = range.end_bound();
263		self.0.range(self.1, get_bound(sb), get_bound(eb))
264	}
265	#[inline]
266	pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
267	where
268		K: AsRef<[u8]>,
269		R: RangeBounds<K>,
270	{
271		let sb = range.start_bound();
272		let eb = range.end_bound();
273		self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
274	}
275}
276
277#[allow(clippy::len_without_is_empty)]
278impl<'a> Transaction<'a> {
279	#[inline]
280	pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
281		self.tx.get(tree.1, key.as_ref())
282	}
283	#[inline]
284	pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
285		self.tx.len(tree.1)
286	}
287
288	/// Returns the old value if there was one
289	#[inline]
290	pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
291		&mut self,
292		tree: &Tree,
293		key: T,
294		value: U,
295	) -> TxOpResult<()> {
296		self.tx.insert(tree.1, key.as_ref(), value.as_ref())
297	}
298	/// Returns the old value if there was one
299	#[inline]
300	pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<()> {
301		self.tx.remove(tree.1, key.as_ref())
302	}
303	/// Clears all values in a tree
304	#[inline]
305	pub fn clear(&mut self, tree: &Tree) -> TxOpResult<()> {
306		self.tx.clear(tree.1)
307	}
308
309	#[inline]
310	pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
311		self.tx.iter(tree.1)
312	}
313	#[inline]
314	pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
315		self.tx.iter_rev(tree.1)
316	}
317
318	#[inline]
319	pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
320	where
321		K: AsRef<[u8]>,
322		R: RangeBounds<K>,
323	{
324		let sb = range.start_bound();
325		let eb = range.end_bound();
326		self.tx.range(tree.1, get_bound(sb), get_bound(eb))
327	}
328	#[inline]
329	pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
330	where
331		K: AsRef<[u8]>,
332		R: RangeBounds<K>,
333	{
334		let sb = range.start_bound();
335		let eb = range.end_bound();
336		self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
337	}
338
339	#[inline]
340	pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
341		self.on_commit.push(Box::new(f));
342	}
343}
344
345// ---- Internal interfaces
346
/// Object-safe interface that [`Db`] and [`Tree`] forward to.
///
/// Trees are addressed by the `usize` identifier returned from `open_tree`.
/// NOTE(review): presumably implemented by each of the feature-gated adapter
/// modules; confirm there.
347pub(crate) trait IDb: Send + Sync {
	// ---- Database-level operations
348	fn engine(&self) -> String;
	// Returns the identifier to pass as `tree` to the methods below.
349	fn open_tree(&self, name: &str) -> Result<usize>;
350	fn list_trees(&self) -> Result<Vec<String>>;
351	fn snapshot(&self, path: &PathBuf) -> Result<()>;
352
	// ---- Reads on a single tree
353	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
354	fn approximate_len(&self, tree: usize) -> Result<usize>;
355	fn is_empty(&self, tree: usize) -> Result<bool>;
356
	// ---- Writes on a single tree
357	fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
358	fn remove(&self, tree: usize, key: &[u8]) -> Result<()>;
359	fn clear(&self, tree: usize) -> Result<()>;
360
	// ---- Iteration; the returned iterators borrow from `self`
361	fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
362	fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
363
	// The bounds only need to live for the duration of the call ('r); the
	// returned iterator is tied to `self`, not to the bounds.
364	fn range<'r>(
365		&self,
366		tree: usize,
367		low: Bound<&'r [u8]>,
368		high: Bound<&'r [u8]>,
369	) -> Result<ValueIter<'_>>;
370	fn range_rev<'r>(
371		&self,
372		tree: usize,
373		low: Bound<&'r [u8]>,
374		high: Bound<&'r [u8]>,
375	) -> Result<ValueIter<'_>>;
376
	// Runs `f` in a transaction. On success the on-commit callbacks collected
	// by `f` are returned. The error side carries no user payload: an abort is
	// `Abort(())`, and the real values are recovered by `Db::transaction` from
	// the result stored by `f` itself.
377	fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
378}
379
/// Object-safe interface of a running transaction, which [`Transaction`]
/// forwards to. Trees are addressed by their `usize` identifier.
380pub(crate) trait ITx {
	// ---- Reads
381	fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
382	fn len(&self, tree: usize) -> TxOpResult<usize>;
383
	// ---- Writes; these need exclusive access to the transaction
384	fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()>;
385	fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()>;
386	fn clear(&mut self, tree: usize) -> TxOpResult<()>;
387
	// ---- Iteration; the returned iterators borrow from the transaction
388	fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
389	fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
390
	// The bounds only need to live for the duration of the call ('r); the
	// returned iterator is tied to `self`, not to the bounds.
391	fn range<'r>(
392		&self,
393		tree: usize,
394		low: Bound<&'r [u8]>,
395		high: Bound<&'r [u8]>,
396	) -> TxOpResult<TxValueIter<'_>>;
397	fn range_rev<'r>(
398		&self,
399		tree: usize,
400		low: Bound<&'r [u8]>,
401		high: Bound<&'r [u8]>,
402	) -> TxOpResult<TxValueIter<'_>>;
403}
404
/// Object-safe view of a transaction closure, so that a backend can call user
/// code without knowing its result and error types.
405pub(crate) trait ITxFn {
	// Runs the closure on `tx` and reports only which kind of outcome it had.
406	fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
407}
408
/// Kind of outcome of a transaction closure, as reported by `ITxFn::try_on`.
///
/// The user's value or error is not carried here; `TxFn` keeps it in its
/// `result` cell.
409pub(crate) enum TxFnResult {
	// The closure succeeded; these are the callbacks it registered.
410	Ok(OnCommit),
	// The closure returned `TxError::Abort`.
411	Abort,
	// The closure returned `TxError::Db`.
412	DbErr,
413}
414
/// Pairs a typed transaction closure with a slot for its latest result, so the
/// closure can be driven through the untyped [`ITxFn`] interface.
415struct TxFn<F, R, E>
416where
417	F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
418{
	// The user closure.
419	function: F,
	// Result of the most recent call to `function`; `None` until it has run.
	// A `Cell` is used because `try_on` only receives `&self`.
420	result: Cell<Option<TxResult<R, E>>>,
421}
422
423impl<F, R, E> ITxFn for TxFn<F, R, E>
424where
425	F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
426{
427	fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
428		let mut tx = Transaction {
429			tx,
430			on_commit: vec![],
431		};
432		let res = (self.function)(&mut tx);
433		let res2 = match &res {
434			Ok(_) => TxFnResult::Ok(tx.on_commit),
435			Err(TxError::Abort(_)) => TxFnResult::Abort,
436			Err(TxError::Db(_)) => TxFnResult::DbErr,
437		};
438		self.result.set(Some(res));
439		res2
440	}
441}
442
443// ----
444
/// Converts a bound on a borrowed key into the same kind of bound on the
/// key's bytes. `Unbounded` stays `Unbounded`.
fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
	b.map(|key| key.as_ref())
}