basteh_redb/
lib.rs

1use std::time::Duration;
2
3use basteh::{
4    dev::{OwnedValue, Provider, Value},
5    BastehError,
6};
7use inner::RedbInner;
8use message::{Message, Request, Response};
9
10mod delayqueue;
11mod flags;
12mod inner;
13mod message;
14mod value;
15
16/// Reexport of redb Database, to make sure we're using the same version
17pub use redb::Database;
18
19/// An implementation of [`ExpiryStore`](basteh::dev::ExpiryStore) using sled with tokio's blocking
20/// tasksZ
21///
22/// It stores expiration data as the value's suffix in sled, using byteorder, so to share data this actor
23/// creates with other programs outside of its scope, you need to remove the suffix of it exported as
24/// [`ExpiryFlags`](struct.ExpiryFlags.html), or directly use encode/decode methods provided.
25///
26/// ## Example
27/// ```no_run
28/// use basteh::Basteh;
29/// use basteh_redb::{RedbBackend, Database};
30///
31/// const THREADS_NUMBER: usize = 4;
32///
33/// # async fn your_main() {
34/// let db = Database::open("/tmp/test.db").expect("Couldn't open sled database");
35/// let provider = RedbBackend::from_db(db).start(THREADS_NUMBER);
36/// let storage = Basteh::build().provider(provider).finish();
37/// # }
38/// ```
39///
40#[derive(Clone)]
41pub struct RedbBackend<T = ()> {
42    inner: T,
43
44    perform_deletion: bool,
45    scan_db_on_start: bool,
46}
47
48impl RedbBackend<()> {
49    #[must_use = "Should be started by calling start method"]
50    pub fn from_db(db: redb::Database) -> RedbBackend<redb::Database> {
51        RedbBackend {
52            inner: db,
53            perform_deletion: false,
54            scan_db_on_start: false,
55        }
56    }
57}
58
59impl<T> RedbBackend<T> {
60    /// If set to true, it will perform real deletion when an item expires instead of soft deleting it,
61    /// it requires a seprate thread(in tokio threadpool) for expiration notification.
62    #[must_use = "Should be started by calling start method"]
63    pub fn perform_deletion(mut self, to: bool) -> Self {
64        self.perform_deletion = to;
65        self
66    }
67
68    /// If set to true, actor will scan the database on start to mark expired items.
69    #[must_use = "Should be started by calling start method"]
70    pub fn scan_db_on_start(mut self, to: bool) -> Self {
71        self.scan_db_on_start = to;
72        self
73    }
74}
75
76impl RedbBackend<redb::Database> {
77    pub fn start(self, thread_num: usize) -> RedbBackend<crossbeam_channel::Sender<Message>> {
78        let mut inner = RedbInner::from_db(self.inner);
79        let (tx, rx) = crossbeam_channel::bounded(4096);
80
81        if self.scan_db_on_start && self.perform_deletion {
82            inner.scan_db().ok();
83        }
84
85        if self.perform_deletion {
86            inner.spawn_expiry_thread();
87        }
88
89        for _ in 0..thread_num {
90            let mut inner = inner.clone();
91            let rx = rx.clone();
92            tokio::task::spawn_blocking(move || {
93                inner.listen(rx);
94            });
95        }
96
97        RedbBackend {
98            inner: tx,
99            perform_deletion: false,
100            scan_db_on_start: false,
101        }
102    }
103}
104
105impl RedbBackend<crossbeam_channel::Sender<Message>> {
106    async fn msg(&self, req: Request) -> basteh::Result<Response> {
107        let (tx, rx) = tokio::sync::oneshot::channel();
108
109        self.inner
110            .try_send(Message { req, tx })
111            .map_err(BastehError::custom)?;
112        rx.await.map_err(BastehError::custom)?
113    }
114}
115
116#[async_trait::async_trait]
117impl Provider for RedbBackend<crossbeam_channel::Sender<Message>> {
118    async fn keys(&self, scope: &str) -> basteh::Result<Box<dyn Iterator<Item = Vec<u8>>>> {
119        match self.msg(Request::Keys(scope.into())).await? {
120            Response::Iterator(r) => Ok(r),
121            _ => unreachable!(),
122        }
123    }
124
125    async fn set(&self, scope: &str, key: &[u8], value: Value<'_>) -> basteh::Result<()> {
126        match self
127            .msg(Request::Set(scope.into(), key.into(), value.into_owned()))
128            .await?
129        {
130            Response::Empty(r) => Ok(r),
131            _ => unreachable!(),
132        }
133    }
134
135    async fn get(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
136        match self.msg(Request::Get(scope.into(), key.into())).await? {
137            Response::Value(r) => Ok(r),
138            _ => unreachable!(),
139        }
140    }
141
142    async fn get_range(
143        &self,
144        scope: &str,
145        key: &[u8],
146        start: i64,
147        end: i64,
148    ) -> basteh::Result<Vec<OwnedValue>> {
149        match self
150            .msg(Request::GetRange(scope.into(), key.into(), start, end))
151            .await?
152        {
153            Response::ValueVec(r) => Ok(r),
154            _ => unreachable!(),
155        }
156    }
157
158    async fn push(&self, scope: &str, key: &[u8], value: Value<'_>) -> basteh::Result<()> {
159        match self
160            .msg(Request::Push(scope.into(), key.into(), value.into_owned()))
161            .await?
162        {
163            Response::Empty(r) => Ok(r),
164            _ => unreachable!(),
165        }
166    }
167
168    async fn push_multiple(
169        &self,
170        scope: &str,
171        key: &[u8],
172        value: Vec<Value<'_>>,
173    ) -> basteh::Result<()> {
174        match self
175            .msg(Request::PushMulti(
176                scope.into(),
177                key.into(),
178                value.into_iter().map(|v| v.into_owned()).collect(),
179            ))
180            .await?
181        {
182            Response::Empty(r) => Ok(r),
183            _ => unreachable!(),
184        }
185    }
186
187    async fn pop(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
188        match self.msg(Request::Pop(scope.into(), key.into())).await? {
189            Response::Value(r) => Ok(r),
190            _ => unreachable!(),
191        }
192    }
193
194    async fn mutate(
195        &self,
196        scope: &str,
197        key: &[u8],
198        mutations: basteh::dev::Mutation,
199    ) -> basteh::Result<i64> {
200        match self
201            .msg(Request::MutateNumber(scope.into(), key.into(), mutations))
202            .await?
203        {
204            Response::Number(r) => Ok(r),
205            _ => unreachable!(),
206        }
207    }
208
209    async fn remove(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
210        match self.msg(Request::Remove(scope.into(), key.into())).await? {
211            Response::Value(r) => Ok(r),
212            _ => unreachable!(),
213        }
214    }
215
216    async fn contains_key(&self, scope: &str, key: &[u8]) -> basteh::Result<bool> {
217        match self
218            .msg(Request::Contains(scope.into(), key.into()))
219            .await?
220        {
221            Response::Bool(r) => Ok(r),
222            _ => unreachable!(),
223        }
224    }
225
226    async fn persist(&self, scope: &str, key: &[u8]) -> basteh::Result<()> {
227        match self.msg(Request::Persist(scope.into(), key.into())).await? {
228            Response::Empty(r) => Ok(r),
229            _ => unreachable!(),
230        }
231    }
232
233    async fn expire(&self, scope: &str, key: &[u8], expire_in: Duration) -> basteh::Result<()> {
234        match self
235            .msg(Request::Expire(scope.into(), key.into(), expire_in))
236            .await?
237        {
238            Response::Empty(r) => Ok(r),
239            _ => unreachable!(),
240        }
241    }
242
243    async fn expiry(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<Duration>> {
244        match self.msg(Request::Expiry(scope.into(), key.into())).await? {
245            Response::Duration(r) => Ok(r),
246            _ => unreachable!(),
247        }
248    }
249
250    async fn extend(&self, scope: &str, key: &[u8], duration: Duration) -> basteh::Result<()> {
251        match self
252            .msg(Request::Extend(scope.into(), key.into(), duration))
253            .await?
254        {
255            Response::Empty(r) => Ok(r),
256            _ => unreachable!(),
257        }
258    }
259
260    async fn set_expiring(
261        &self,
262        scope: &str,
263        key: &[u8],
264        value: Value<'_>,
265        expire_in: Duration,
266    ) -> basteh::Result<()> {
267        match self
268            .msg(Request::SetExpiring(
269                scope.into(),
270                key.into(),
271                value.into_owned(),
272                expire_in,
273            ))
274            .await?
275        {
276            Response::Empty(r) => Ok(r),
277            _ => unreachable!(),
278        }
279    }
280
281    async fn get_expiring(
282        &self,
283        scope: &str,
284        key: &[u8],
285    ) -> basteh::Result<Option<(OwnedValue, Option<Duration>)>> {
286        match self
287            .msg(Request::GetExpiring(scope.into(), key.into()))
288            .await?
289        {
290            Response::ValueDuration(r) => Ok(r),
291            _ => unreachable!(),
292        }
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use std::path::Path;
299
300    use basteh::test_utils::*;
301
302    use crate::RedbBackend;
303
304    type ReDb = RedbBackend<redb::Database>;
305
306    fn open_database(path: &str) -> ReDb {
307        let p = Path::new(path);
308        if p.exists() {
309            std::fs::remove_file(p).ok();
310        }
311
312        RedbBackend::from_db(redb::Database::create(path).unwrap())
313    }
314
315    #[tokio::test]
316    async fn test_redb_store() {
317        test_store(open_database("/tmp/redb.store.db").start(1)).await;
318    }
319
320    #[tokio::test]
321    async fn test_redb_mutations() {
322        test_mutations(open_database("/tmp/redb.mutate.db").start(1)).await;
323    }
324
325    #[tokio::test]
326    async fn test_redb_expiry() {
327        test_expiry(open_database("/tmp/redb.expiry.db").start(1), 2).await;
328    }
329
330    #[tokio::test]
331    async fn test_redb_expiry_store() {
332        test_expiry_store(open_database("/tmp/redb.exp_store.db").start(1), 2).await;
333    }
334}