cozo_ce/storage/
mod.rs

/*
 * Copyright 2022, The Cozo Project Authors.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
 * If a copy of the MPL was not distributed with this file,
 * You can obtain one at https://mozilla.org/MPL/2.0/.
 */
8
9use itertools::Itertools;
10use miette::Result;
11
12use crate::data::tuple::Tuple;
13use crate::data::value::ValidityTs;
14use crate::decode_tuple_from_kv;
15
16pub(crate) mod mem;
17#[cfg(feature = "storage-new-rocksdb")]
18pub mod newrocks;
19#[cfg(feature = "storage-rocksdb")]
20pub(crate) mod rocks;
21#[cfg(feature = "storage-sled")]
22pub(crate) mod sled;
23#[cfg(feature = "storage-sqlite")]
24pub(crate) mod sqlite;
25pub(crate) mod temp;
26#[cfg(feature = "storage-tikv")]
27pub(crate) mod tikv;
28// pub(crate) mod re;
29
30/// Swappable storage trait for Cozo's storage engine
31pub trait Storage<'s>: Send + Sync + Clone {
32    /// The associated transaction type used by this engine
33    type Tx: StoreTx<'s>;
34
35    /// Returns a string that identifies the storage kind
36    fn storage_kind(&self) -> &'static str;
37
38    /// Create a transaction object. Write ops will only be called when `write == true`.
39    fn transact(&'s self, write: bool) -> Result<Self::Tx>;
40
41    /// Compact the key range. Can be a no-op if the storage engine does not
42    /// have the concept of compaction.
43    fn range_compact(&'s self, lower: &[u8], upper: &[u8]) -> Result<()>;
44
45    /// Put multiple key-value pairs into the database.
46    /// No duplicate data will be sent, and the order data come in is strictly ascending.
47    /// There will be no other access to the database while this function is running.
48    fn batch_put<'a>(
49        &'a self,
50        data: Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>,
51    ) -> Result<()>;
52}
53
54/// Trait for the associated transaction type of a storage engine.
55/// A transaction needs to guarantee MVCC semantics for all operations.
56pub trait StoreTx<'s>: Sync {
57    /// Get a key. If `for_update` is `true` (only possible in a write transaction),
58    /// then the database needs to guarantee that `commit()` can only succeed if
59    /// the key has not been modified outside the transaction.
60    fn get(&self, key: &[u8], for_update: bool) -> Result<Option<Vec<u8>>>;
61
62    /// Get multiple keys. If `for_update` is `true` (only possible in a write transaction),
63    /// then the database needs to guarantee that `commit()` can only succeed if
64    /// the keys have not been modified outside the transaction.
65    fn multi_get(&self, keys: &[Vec<u8>], for_update: bool) -> Result<Vec<Option<Vec<u8>>>> {
66        keys.iter().map(|k| self.get(k, for_update)).collect()
67    }
68
69    /// Put a key-value pair into the storage. In case of existing key,
70    /// the storage engine needs to overwrite the old value.
71    fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()>;
72
73    /// Should return true if the engine supports parallel put, false otherwise.
74    fn supports_par_put(&self) -> bool;
75
76    /// Put a key-value pair into the storage. In case of existing key,
77    /// the storage engine needs to overwrite the old value.
78    /// The difference between this one and `put` is the mutability of self.
79    /// It is OK to always panic if `supports_par_put` returns `false`.
80    fn par_put(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
81        panic!("par_put is not supported")
82    }
83
84    /// Delete a key-value pair from the storage.
85    fn del(&mut self, key: &[u8]) -> Result<()>;
86
87    /// Delete a key-value pair from the storage.
88    /// The difference between this one and `del` is the mutability of self.
89    /// It is OK to always panic if `supports_par_put` returns `false`.
90    fn par_del(&self, _key: &[u8]) -> Result<()> {
91        panic!("par_del is not supported")
92    }
93
94    /// Delete a range from persisted data only.
95    fn del_range_from_persisted(&mut self, lower: &[u8], upper: &[u8]) -> Result<()>;
96
97    /// Check if a key exists. If `for_update` is `true` (only possible in a write transaction),
98    /// then the database needs to guarantee that `commit()` can only succeed if
99    /// the key has not been modified outside the transaction.
100    fn exists(&self, key: &[u8], for_update: bool) -> Result<bool>;
101
102    /// Commit a transaction. Must return an `Err` if MVCC consistency cannot be guaranteed,
103    /// and discard all changes introduced by this transaction.
104    fn commit(&mut self) -> Result<()>;
105
106    /// Scan on a range. `lower` is inclusive whereas `upper` is exclusive.
107    /// The default implementation calls [`range_scan_owned`](Self::range_scan) and converts the results.
108    ///
109    /// The implementation must call [`decode_tuple_from_kv`](crate::decode_tuple_from_kv) to obtain
110    /// a decoded tuple in the loop of the iterator.
111    fn range_scan_tuple<'a>(
112        &'a self,
113        lower: &[u8],
114        upper: &[u8],
115    ) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>
116    where
117        's: 'a,
118    {
119        let it = self.range_scan(lower, upper);
120        Box::new(it.map_ok(|(k, v)| decode_tuple_from_kv(&k, &v, None)))
121    }
122
123    /// Scan on a range with a certain validity.
124    ///
125    /// `lower` is inclusive whereas `upper` is exclusive.
126    /// For tuples that differ only with respect to their validity, which must be at
127    /// the last slot of the key,
128    /// only the tuple that has validity equal to or earlier than (i.e. greater by the comparator)
129    /// `valid_at` should be considered for returning, and only those with an assertive validity
130    /// should be returned. Every other tuple should be skipped.
131    ///
132    /// Ideally, implementations should take advantage of seeking capabilities of the
133    /// underlying storage so that not every tuple within the `lower` and `upper` range
134    /// need to be looked at.
135    ///
136    /// For custom implementations, it is OK to return an iterator that always error out,
137    /// in which case the database with the engine does not support time travelling.
138    /// You should indicate this clearly in your error message.
139    fn range_skip_scan_tuple<'a>(
140        &'a self,
141        lower: &[u8],
142        upper: &[u8],
143        valid_at: ValidityTs,
144    ) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>;
145
146    /// Scan on a range and return the raw results.
147    /// `lower` is inclusive whereas `upper` is exclusive.
148    fn range_scan<'a>(
149        &'a self,
150        lower: &[u8],
151        upper: &[u8],
152    ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
153    where
154        's: 'a;
155
156    /// Return the number of rows in the range.
157    fn range_count<'a>(&'a self, lower: &[u8], upper: &[u8]) -> Result<usize>
158    where
159        's: 'a;
160
161    /// Scan for all rows. The rows are required to be in ascending order.
162    fn total_scan<'a>(&'a self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>
163    where
164        's: 'a;
165}