1use std::time::Duration;
2
3use basteh::{
4 dev::{OwnedValue, Provider, Value},
5 BastehError,
6};
7use inner::RedbInner;
8use message::{Message, Request, Response};
9
10mod delayqueue;
11mod flags;
12mod inner;
13mod message;
14mod value;
15
16pub use redb::Database;
18
/// redb-backed storage backend.
///
/// The type parameter `T` is whatever the backend currently wraps: the
/// default `()` only serves as the entry point for `from_db`, a
/// `redb::Database` is held before `start` is called, and a
/// `crossbeam_channel::Sender<Message>` is held once the workers are running.
#[derive(Clone)]
pub struct RedbBackend<T = ()> {
    /// The wrapped state (database before `start`, request sender after it).
    inner: T,

    /// When `true`, `start` spawns the expiry thread.
    perform_deletion: bool,
    /// When `true` together with `perform_deletion`, `start` scans the
    /// database before spawning the expiry thread.
    scan_db_on_start: bool,
}
47
48impl RedbBackend<()> {
49 #[must_use = "Should be started by calling start method"]
50 pub fn from_db(db: redb::Database) -> RedbBackend<redb::Database> {
51 RedbBackend {
52 inner: db,
53 perform_deletion: false,
54 scan_db_on_start: false,
55 }
56 }
57}
58
59impl<T> RedbBackend<T> {
60 #[must_use = "Should be started by calling start method"]
63 pub fn perform_deletion(mut self, to: bool) -> Self {
64 self.perform_deletion = to;
65 self
66 }
67
68 #[must_use = "Should be started by calling start method"]
70 pub fn scan_db_on_start(mut self, to: bool) -> Self {
71 self.scan_db_on_start = to;
72 self
73 }
74}
75
76impl RedbBackend<redb::Database> {
77 pub fn start(self, thread_num: usize) -> RedbBackend<crossbeam_channel::Sender<Message>> {
78 let mut inner = RedbInner::from_db(self.inner);
79 let (tx, rx) = crossbeam_channel::bounded(4096);
80
81 if self.scan_db_on_start && self.perform_deletion {
82 inner.scan_db().ok();
83 }
84
85 if self.perform_deletion {
86 inner.spawn_expiry_thread();
87 }
88
89 for _ in 0..thread_num {
90 let mut inner = inner.clone();
91 let rx = rx.clone();
92 tokio::task::spawn_blocking(move || {
93 inner.listen(rx);
94 });
95 }
96
97 RedbBackend {
98 inner: tx,
99 perform_deletion: false,
100 scan_db_on_start: false,
101 }
102 }
103}
104
105impl RedbBackend<crossbeam_channel::Sender<Message>> {
106 async fn msg(&self, req: Request) -> basteh::Result<Response> {
107 let (tx, rx) = tokio::sync::oneshot::channel();
108
109 self.inner
110 .try_send(Message { req, tx })
111 .map_err(BastehError::custom)?;
112 rx.await.map_err(BastehError::custom)?
113 }
114}
115
116#[async_trait::async_trait]
117impl Provider for RedbBackend<crossbeam_channel::Sender<Message>> {
118 async fn keys(&self, scope: &str) -> basteh::Result<Box<dyn Iterator<Item = Vec<u8>>>> {
119 match self.msg(Request::Keys(scope.into())).await? {
120 Response::Iterator(r) => Ok(r),
121 _ => unreachable!(),
122 }
123 }
124
125 async fn set(&self, scope: &str, key: &[u8], value: Value<'_>) -> basteh::Result<()> {
126 match self
127 .msg(Request::Set(scope.into(), key.into(), value.into_owned()))
128 .await?
129 {
130 Response::Empty(r) => Ok(r),
131 _ => unreachable!(),
132 }
133 }
134
135 async fn get(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
136 match self.msg(Request::Get(scope.into(), key.into())).await? {
137 Response::Value(r) => Ok(r),
138 _ => unreachable!(),
139 }
140 }
141
142 async fn get_range(
143 &self,
144 scope: &str,
145 key: &[u8],
146 start: i64,
147 end: i64,
148 ) -> basteh::Result<Vec<OwnedValue>> {
149 match self
150 .msg(Request::GetRange(scope.into(), key.into(), start, end))
151 .await?
152 {
153 Response::ValueVec(r) => Ok(r),
154 _ => unreachable!(),
155 }
156 }
157
158 async fn push(&self, scope: &str, key: &[u8], value: Value<'_>) -> basteh::Result<()> {
159 match self
160 .msg(Request::Push(scope.into(), key.into(), value.into_owned()))
161 .await?
162 {
163 Response::Empty(r) => Ok(r),
164 _ => unreachable!(),
165 }
166 }
167
168 async fn push_multiple(
169 &self,
170 scope: &str,
171 key: &[u8],
172 value: Vec<Value<'_>>,
173 ) -> basteh::Result<()> {
174 match self
175 .msg(Request::PushMulti(
176 scope.into(),
177 key.into(),
178 value.into_iter().map(|v| v.into_owned()).collect(),
179 ))
180 .await?
181 {
182 Response::Empty(r) => Ok(r),
183 _ => unreachable!(),
184 }
185 }
186
187 async fn pop(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
188 match self.msg(Request::Pop(scope.into(), key.into())).await? {
189 Response::Value(r) => Ok(r),
190 _ => unreachable!(),
191 }
192 }
193
194 async fn mutate(
195 &self,
196 scope: &str,
197 key: &[u8],
198 mutations: basteh::dev::Mutation,
199 ) -> basteh::Result<i64> {
200 match self
201 .msg(Request::MutateNumber(scope.into(), key.into(), mutations))
202 .await?
203 {
204 Response::Number(r) => Ok(r),
205 _ => unreachable!(),
206 }
207 }
208
209 async fn remove(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<OwnedValue>> {
210 match self.msg(Request::Remove(scope.into(), key.into())).await? {
211 Response::Value(r) => Ok(r),
212 _ => unreachable!(),
213 }
214 }
215
216 async fn contains_key(&self, scope: &str, key: &[u8]) -> basteh::Result<bool> {
217 match self
218 .msg(Request::Contains(scope.into(), key.into()))
219 .await?
220 {
221 Response::Bool(r) => Ok(r),
222 _ => unreachable!(),
223 }
224 }
225
226 async fn persist(&self, scope: &str, key: &[u8]) -> basteh::Result<()> {
227 match self.msg(Request::Persist(scope.into(), key.into())).await? {
228 Response::Empty(r) => Ok(r),
229 _ => unreachable!(),
230 }
231 }
232
233 async fn expire(&self, scope: &str, key: &[u8], expire_in: Duration) -> basteh::Result<()> {
234 match self
235 .msg(Request::Expire(scope.into(), key.into(), expire_in))
236 .await?
237 {
238 Response::Empty(r) => Ok(r),
239 _ => unreachable!(),
240 }
241 }
242
243 async fn expiry(&self, scope: &str, key: &[u8]) -> basteh::Result<Option<Duration>> {
244 match self.msg(Request::Expiry(scope.into(), key.into())).await? {
245 Response::Duration(r) => Ok(r),
246 _ => unreachable!(),
247 }
248 }
249
250 async fn extend(&self, scope: &str, key: &[u8], duration: Duration) -> basteh::Result<()> {
251 match self
252 .msg(Request::Extend(scope.into(), key.into(), duration))
253 .await?
254 {
255 Response::Empty(r) => Ok(r),
256 _ => unreachable!(),
257 }
258 }
259
260 async fn set_expiring(
261 &self,
262 scope: &str,
263 key: &[u8],
264 value: Value<'_>,
265 expire_in: Duration,
266 ) -> basteh::Result<()> {
267 match self
268 .msg(Request::SetExpiring(
269 scope.into(),
270 key.into(),
271 value.into_owned(),
272 expire_in,
273 ))
274 .await?
275 {
276 Response::Empty(r) => Ok(r),
277 _ => unreachable!(),
278 }
279 }
280
281 async fn get_expiring(
282 &self,
283 scope: &str,
284 key: &[u8],
285 ) -> basteh::Result<Option<(OwnedValue, Option<Duration>)>> {
286 match self
287 .msg(Request::GetExpiring(scope.into(), key.into()))
288 .await?
289 {
290 Response::ValueDuration(r) => Ok(r),
291 _ => unreachable!(),
292 }
293 }
294}
295
#[cfg(test)]
mod tests {
    use basteh::test_utils::*;

    use crate::RedbBackend;

    type ReDb = RedbBackend<redb::Database>;

    /// Creates a fresh database file called `file_name` inside the platform's
    /// temporary directory and wraps it in a backend that is not started yet.
    ///
    /// A file left behind by a previous run is removed first. A missing file
    /// is fine, but any other removal failure aborts the test instead of
    /// silently reusing stale data.
    fn open_database(file_name: &str) -> ReDb {
        // `temp_dir()` keeps the tests portable instead of hard-coding `/tmp`.
        let path = std::env::temp_dir().join(file_name);

        match std::fs::remove_file(&path) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => panic!(
                "failed to remove stale test database {}: {}",
                path.display(),
                err
            ),
        }

        let db = redb::Database::create(&path).expect("failed to create test database");
        RedbBackend::from_db(db)
    }

    #[tokio::test]
    async fn test_redb_store() {
        test_store(open_database("redb.store.db").start(1)).await;
    }

    #[tokio::test]
    async fn test_redb_mutations() {
        test_mutations(open_database("redb.mutate.db").start(1)).await;
    }

    #[tokio::test]
    async fn test_redb_expiry() {
        test_expiry(open_database("redb.expiry.db").start(1), 2).await;
    }

    #[tokio::test]
    async fn test_redb_expiry_store() {
        test_expiry_store(open_database("redb.exp_store.db").start(1), 2).await;
    }
}