// keyvaluedb/lib.rs

//! Key-Value store abstraction.
2
3#![deny(clippy::all)]
4
5mod io_stats;
6
7pub use io_stats::{IoStats, Kind as IoStatsKind};
8use std::future::Future;
9use std::io;
10use std::pin::Pin;
11
/// Required length of prefixes.
pub const PREFIX_LEN: usize = 12;

/// Raw bytes stored as a database value.
pub type DBValue = Vec<u8>;
/// Raw bytes used as a database key.
pub type DBKey = Vec<u8>;
/// An owned key paired with its owned value.
pub type DBKeyValue = (DBKey, DBValue);
/// A borrowed database key.
pub type DBKeyRef<'a> = &'a DBKey;
/// A borrowed key paired with its borrowed value.
pub type DBKeyValueRef<'a> = (DBKeyRef<'a>, &'a DBValue);
21
22/// Write transaction. Batches a sequence of put/delete operations for efficiency.
23#[derive(Default, Debug, Clone, Eq, PartialEq)]
24pub struct DBTransaction {
25    /// Database operations.
26    pub ops: Vec<DBOp>,
27}
28
29/// Database operation.
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub enum DBOp {
32    Insert {
33        col: u32,
34        key: DBKey,
35        value: DBValue,
36    },
37    Delete {
38        col: u32,
39        key: DBKey,
40    },
41    DeletePrefix {
42        col: u32,
43        prefix: DBKey,
44    },
45}
46
47impl DBOp {
48    /// Returns the key associated with this operation.
49    pub fn key(&self) -> &DBKey {
50        match *self {
51            DBOp::Insert { ref key, .. } => key,
52            DBOp::Delete { ref key, .. } => key,
53            DBOp::DeletePrefix { ref prefix, .. } => prefix,
54        }
55    }
56    /// Returns the value associated with this operation.
57    pub fn value(&self) -> Option<&DBValue> {
58        match *self {
59            DBOp::Insert { ref value, .. } => Some(value),
60            DBOp::Delete { .. } => None,
61            DBOp::DeletePrefix { .. } => None,
62        }
63    }
64    /// Returns the column associated with this operation.
65    pub fn col(&self) -> u32 {
66        match *self {
67            DBOp::Insert { col, .. } => col,
68            DBOp::Delete { col, .. } => col,
69            DBOp::DeletePrefix { col, .. } => col,
70        }
71    }
72}
73
74impl DBTransaction {
75    /// Create new transaction.
76    pub fn new() -> DBTransaction {
77        DBTransaction::with_capacity(256)
78    }
79
80    /// Create new transaction with capacity.
81    pub fn with_capacity(cap: usize) -> DBTransaction {
82        DBTransaction {
83            ops: Vec::with_capacity(cap),
84        }
85    }
86
87    /// Insert a key-value pair in the transaction. Any existing value will be overwritten upon write.
88    pub fn put<K, V>(&mut self, col: u32, key: K, value: V)
89    where
90        K: AsRef<[u8]>,
91        V: AsRef<[u8]>,
92    {
93        self.ops.push(DBOp::Insert {
94            col,
95            key: DBKey::from(key.as_ref()),
96            value: DBValue::from(value.as_ref()),
97        })
98    }
99    pub fn put_owned(&mut self, col: u32, key: Vec<u8>, value: Vec<u8>) {
100        self.ops.push(DBOp::Insert { col, key, value })
101    }
102
103    /// Delete value by key.
104    pub fn delete<K>(&mut self, col: u32, key: K)
105    where
106        K: AsRef<[u8]>,
107    {
108        self.ops.push(DBOp::Delete {
109            col,
110            key: DBKey::from(key.as_ref()),
111        });
112    }
113    pub fn delete_owned(&mut self, col: u32, key: Vec<u8>) {
114        self.ops.push(DBOp::Delete { col, key });
115    }
116
117    /// Delete all values with the given key prefix.
118    /// Using an empty prefix here will remove all keys
119    /// (all keys start with the empty prefix).
120    pub fn delete_prefix<K>(&mut self, col: u32, prefix: K)
121    where
122        K: AsRef<[u8]>,
123    {
124        self.ops.push(DBOp::DeletePrefix {
125            col,
126            prefix: DBKey::from(prefix.as_ref()),
127        });
128    }
129    pub fn delete_prefix_owned(&mut self, col: u32, prefix: Vec<u8>) {
130        self.ops.push(DBOp::DeletePrefix { col, prefix });
131    }
132}
133
134/// Transaction Result, returns the transaction unchanged upon error
135#[derive(Debug)]
136pub struct DBTransactionError {
137    pub error: io::Error,
138    pub transaction: DBTransaction,
139}
140
141impl std::fmt::Display for DBTransactionError {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("TransactionError")
144            .field("error", &self.error)
145            .finish()
146    }
147}
148impl std::error::Error for DBTransactionError {}
149
150/// Generic key-value database.
151///
152/// The `KeyValueDB` deals with "column families", which can be thought of as distinct
153/// stores within a database. Keys written in one column family will not be accessible from
154/// any other. The number of column families must be specified at initialization, with a
155/// differing interface for each database.
156///
157/// The API laid out here, along with the `Sync` bound implies interior synchronization for
158/// implementation.
159/// Clone is here so we can pass an owned self to async functions, requiring interior locked mutability.
160pub trait KeyValueDB: Sync + Send + Clone + 'static {
161    /// Helper to create a new transaction.
162    fn transaction(&self) -> DBTransaction {
163        DBTransaction::new()
164    }
165
166    /// Get a value by key.
167    fn get<'a>(
168        &self,
169        col: u32,
170        key: &'a [u8],
171    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>>;
172
173    /// Remove a value by key, returning the old value
174    fn delete<'a>(
175        &self,
176        col: u32,
177        key: &'a [u8],
178    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>>;
179
180    /// Write a transaction of changes to the backing store.
181    fn write(
182        &self,
183        transaction: DBTransaction,
184    ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send>>;
185
186    /// Iterate over the data for a given column.
187    /// Return all key/value pairs, optionally where the key starts with the given prefix.
188    /// Iterator closure returns true for more items, false to stop iteration.
189    fn iter<'a, T: 'a, F: FnMut(DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
190        &self,
191        col: u32,
192        prefix: Option<&'a [u8]>,
193        f: F,
194    ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>>;
195
196    /// Iterate over the data for a given column.
197    /// Return all keys, optionally where the key starts with the given prefix.
198    /// Iterator closure returns true for more items, false to stop iteration.
199    fn iter_keys<'a, T: 'a, F: FnMut(DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
200        &self,
201        col: u32,
202        prefix: Option<&'a [u8]>,
203        f: F,
204    ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>>;
205
206    /// Attempt to replace this database with a new one located at the given path.
207    fn restore(&self, new_db: &str) -> io::Result<()>;
208
209    /// Query statistics.
210    ///
211    /// Not all keyvaluedb implementations are able or expected to implement this, so by
212    /// default, empty statistics is returned. Also, not all keyvaluedb implementation
213    /// can return every statistic or configured to do so (some statistics gathering
214    /// may impede the performance and might be off by default).
215    fn io_stats(&self, _kind: IoStatsKind) -> IoStats {
216        IoStats::empty()
217    }
218
219    /// The number of column families in the db.
220    fn num_columns(&self) -> io::Result<u32>;
221
222    /// The number of keys in a column (estimated).
223    fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send>>;
224
225    /// Check for the existence of a value by key.
226    fn has_key<'a>(
227        &self,
228        col: u32,
229        key: &'a [u8],
230    ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
231        let this = self.clone();
232        Box::pin(async move { Ok(this.get(col, key).await?.is_some()) })
233    }
234
235    /// Check for the existence of a value by prefix.
236    fn has_prefix<'a>(
237        &self,
238        col: u32,
239        prefix: &'a [u8],
240    ) -> Pin<Box<dyn Future<Output = io::Result<bool>> + Send + 'a>> {
241        let this = self.clone();
242        Box::pin(async move {
243            Ok(this
244                .iter_keys(col, Some(prefix), |_| Ok(Some(())))
245                .await?
246                .is_some())
247        })
248    }
249
250    /// Get the first value matching the given prefix.
251    fn first_with_prefix<'a>(
252        &self,
253        col: u32,
254        prefix: &'a [u8],
255    ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBKeyValue>>> + Send + 'a>> {
256        let this = self.clone();
257        Box::pin(async move {
258            let out = this
259                .iter(col, Some(prefix), |(k, v)| {
260                    Ok(Some((k.to_vec(), v.to_vec())))
261                })
262                .await?;
263            Ok(out)
264        })
265    }
266}
267
/// For a given start prefix (inclusive), returns the matching end prefix (exclusive).
///
/// This assumes keys are ordered lexicographically by their bytes. The end prefix is
/// obtained by dropping every trailing `0xff` byte and incrementing the last
/// remaining byte.
///
/// Since key length is not limited, there is no finite upper bound when the prefix
/// is empty or consists solely of `0xff` bytes (`[]`, `[255]`, `[255, 255]`, ...);
/// `None` is returned in those cases.
pub fn end_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the right-most byte that can still be incremented; bails out with
    // `None` when there is no such byte.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xff)?;

    let mut upper_bound = Vec::with_capacity(pivot + 1);
    upper_bound.extend_from_slice(&prefix[..pivot]);
    // Cannot overflow: the byte at `pivot` is known not to be 0xff.
    upper_bound.push(prefix[pivot] + 1);
    Some(upper_bound)
}
284
#[cfg(test)]
mod test {
    use super::end_prefix;

    /// Checks the exclusive end prefix computed for a table of start prefixes.
    #[test]
    fn end_prefix_test() {
        // (start prefix, expected exclusive end prefix)
        let cases: [(&[u8], Option<Vec<u8>>); 8] = [
            (&[5, 6, 7], Some(vec![5, 6, 8])),
            (&[5, 6, 255], Some(vec![5, 7])),
            // [5, 255] itself is not covered by this range because it sorts
            // before the start prefix.
            (&[5, 255, 255], Some(vec![6])),
            (&[255, 255, 255], None),
            (&[0x00, 0xff], Some(vec![0x01])),
            (&[0xff], None),
            (&[], None),
            (b"0", Some(b"1".to_vec())),
        ];
        for (start, expected) in cases.iter() {
            assert_eq!(end_prefix(start), *expected);
        }

        // Merely dropping one trailing 0xff would give an end that sorts before
        // the start, so that must not be the result.
        assert_ne!(end_prefix(&[5, 255, 255]), Some(vec![5, 255]));
    }
}