reddb/
lib.rs

1use failure::ResultExt;
2use futures::stream::{self, StreamExt};
3use futures::TryStreamExt;
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread;
8use tokio::runtime::Runtime;
9use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
10pub use uuid::Uuid;
11
12mod document;
13mod error;
14pub mod serializer;
15mod status;
16mod storage;
17
18pub use document::Document;
19use error::{RedDbErrorKind, Result};
20use serde::{Deserialize, Serialize};
21use serializer::Serializer;
22use status::Status;
23pub use storage::FileStorage;
24use storage::Storage;
25
26type RedDbHM = HashMap<Uuid, Vec<u8>>;
27
28#[cfg(feature = "bin_ser")]
29pub type BinDb = RedDb<serializer::Bin, FileStorage<serializer::Bin>>;
30#[cfg(feature = "json_ser")]
31pub type JsonDb = RedDb<serializer::Json, FileStorage<serializer::Json>>;
32#[cfg(feature = "yaml_ser")]
33pub type YamlDb = RedDb<serializer::Yaml, FileStorage<serializer::Yaml>>;
34#[cfg(feature = "ron_ser")]
35pub type RonDb = RedDb<serializer::Ron, FileStorage<serializer::Ron>>;
36
37#[derive(Debug)]
38pub struct RedDb<SE, ST> {
39    storage: ST,
40    serializer: SE,
41    data: Arc<RwLock<RedDbHM>>,
42}
43
44impl<'a, SE, ST: 'static> RedDb<SE, ST>
45where
46    for<'de> SE: Serializer<'de> + Debug,
47    for<'de> ST: Storage + Debug + Send + Sync,
48{
49    pub fn new<T>(db_name: &'static str) -> Result<Self>
50    where
51        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
52    {
53        let mut rt = Runtime::new().unwrap();
54
55        let (data, storage) = thread::spawn(move || {
56            let storage = rt.block_on(async { ST::new(db_name).await.unwrap() });
57            let data = rt.block_on(async { storage.load::<T>().await.unwrap() });
58            (data, storage)
59        })
60        .join()
61        .map_err(|_| RedDbErrorKind::Datapersist)?;
62
63        Ok(Self {
64            storage,
65            data: Arc::new(RwLock::new(data)),
66            serializer: SE::default(),
67        })
68    }
69
70    async fn read(&'a self) -> Result<RwLockReadGuard<'a, RedDbHM>> {
71        let lock = self.data.read().await;
72        Ok(lock)
73    }
74
75    async fn write(&'a self) -> Result<RwLockWriteGuard<'a, RedDbHM>> {
76        let lock = self.data.write().await;
77        Ok(lock)
78    }
79
80    fn create_doc<T>(&self, id: &Uuid, value: T, status: Status) -> Document<T>
81    where
82        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
83    {
84        Document::new(*id, value, status)
85    }
86
87    async fn find_uuids<T>(&self, search: &T) -> Result<Vec<Uuid>>
88    where
89        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
90    {
91        let data = self
92            .read()
93            .await
94            .map_err(|_| RedDbErrorKind::PoisonedValue)
95            .unwrap();
96
97        let serialized = self.serialize(search)?;
98
99        let docs: Vec<Uuid> = data
100            .iter()
101            .filter(|(_id, value)| **value == serialized)
102            .map(|(_id, _value)| *_id)
103            .collect();
104
105        Ok(docs)
106    }
107
108    async fn insert_document<T>(&self, value: T) -> Result<Document<T>>
109    where
110        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
111    {
112        let mut data = self.write().await?;
113        let id = Uuid::new_v4();
114        let serialized = self.serialize(&value)?;
115        data.insert(id, serialized);
116        let result = self.create_doc(&id, value, Status::default());
117
118        Ok(result)
119    }
120
121    pub async fn insert_one<T>(&self, value: T) -> Result<Document<T>>
122    where
123        for<'de> T: Serialize + Deserialize<'de> + Debug + Clone + PartialEq + Send + Sync,
124    {
125        let doc = self.insert_document(value).await?;
126        self.storage
127            .persist(&[doc.to_owned()])
128            .await
129            .context(RedDbErrorKind::Datapersist)?;
130        Ok(doc)
131    }
132
133    pub async fn insert<T>(&self, values: Vec<T>) -> Result<Vec<Document<T>>>
134    where
135        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
136    {
137        let docs: Vec<Document<T>> = stream::iter(values)
138            .then(|data| self.insert_document(data))
139            .try_collect()
140            .await?;
141
142        self.storage
143            .persist(&docs)
144            .await
145            .context(RedDbErrorKind::Datapersist)?;
146
147        Ok(docs)
148    }
149
150    pub async fn find_one<T>(&self, id: &Uuid) -> Result<Document<T>>
151    where
152        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
153    {
154        let data = self
155            .read()
156            .await
157            .map_err(|_| RedDbErrorKind::PoisonedValue)?;
158
159        let data = data.get(&id).ok_or(RedDbErrorKind::NotFound { _id: *id })?;
160
161        let data = self.deserialize(&*data)?;
162        let doc = self.create_doc(id, data, Status::In);
163        Ok(doc)
164    }
165
166    pub async fn update_one<T>(&'a self, id: &Uuid, new_value: T) -> Result<bool>
167    where
168        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
169    {
170        let mut data = self
171            .write()
172            .await
173            .map_err(|_| RedDbErrorKind::PoisonedValue)?;
174
175        if data.contains_key(id) {
176            let data = data
177                .get_mut(&id)
178                .ok_or(RedDbErrorKind::NotFound { _id: *id })?;
179
180            *data = self.serialize(&new_value)?;
181            let doc = self.create_doc(id, new_value, Status::Up);
182
183            self.storage
184                .persist(&[doc])
185                .await
186                .context(RedDbErrorKind::Datapersist)?;
187
188            Ok(true)
189        } else {
190            Ok(false)
191        }
192    }
193
194    pub async fn remove_document<T>(&self, id: Uuid) -> Result<Document<T>>
195    where
196        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
197    {
198        let mut data = self.write().await?;
199        let value = data.remove(&id).unwrap();
200        let data = self.deserialize(&value)?;
201        let doc = self.create_doc(&id, data, Status::De);
202        Ok(doc)
203    }
204
205    pub async fn delete_one<T>(&self, id: &Uuid) -> Result<Document<T>>
206    where
207        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
208    {
209        let result = self.remove_document(*id).await?;
210        Ok(result)
211    }
212
213    pub async fn find_all<T>(&self) -> Result<Vec<Document<T>>>
214    where
215        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
216    {
217        let data = self
218            .read()
219            .await
220            .map_err(|_| RedDbErrorKind::PoisonedValue)?;
221
222        let docs: Vec<Document<T>> = data
223            .iter()
224            .map(|(id, data)| {
225                let data = self.deserialize(&*data).unwrap();
226                self.create_doc(id, data, Status::In)
227            })
228            .collect();
229
230        Ok(docs)
231    }
232
233    pub async fn find<T>(&self, search: &T) -> Result<Vec<Document<T>>>
234    where
235        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
236    {
237        let data = self
238            .read()
239            .await
240            .map_err(|_| RedDbErrorKind::PoisonedValue)?;
241
242        let serialized = self.serialize(search)?;
243
244        let docs: Vec<Document<T>> = data
245            .iter()
246            .filter(|(_id, data)| **data == serialized)
247            .map(|(_id, data)| {
248                let data = self.deserialize(&*data).unwrap();
249                self.create_doc(_id, data, Status::In)
250            })
251            .collect();
252
253        Ok(docs)
254    }
255
256    pub async fn update<T>(&self, search: &T, new_value: &T) -> Result<usize>
257    where
258        for<'de> T: Serialize + Deserialize<'de> + Clone + Debug + PartialEq + Send + Sync,
259    {
260        let mut data = self
261            .write()
262            .await
263            .map_err(|_| RedDbErrorKind::PoisonedValue)?;
264
265        let query = self.serialize(search)?;
266
267        let docs: Vec<Document<T>> = data
268            .iter_mut()
269            .filter(|(_id, data)| **data == query)
270            .map(|(_id, data)| {
271                *data = self.serialize(new_value).unwrap();
272                self.create_doc(_id, new_value.to_owned(), Status::Up)
273            })
274            .collect();
275
276        let result = docs.len();
277
278        self.storage
279            .persist(&docs)
280            .await
281            .context(RedDbErrorKind::Datapersist)?;
282
283        Ok(result)
284    }
285
286    pub async fn delete<T>(&self, search: &T) -> Result<usize>
287    where
288        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
289    {
290        let uuids = self.find_uuids(search).await?;
291
292        let docs: Vec<Document<T>> = stream::iter(uuids)
293            .then(|_id| self.remove_document(_id))
294            .try_collect()
295            .await?;
296
297        self.storage
298            .persist(&docs)
299            .await
300            .context(RedDbErrorKind::Datapersist)?;
301
302        Ok(docs.len())
303    }
304
305    fn serialize<T>(&self, value: &T) -> Result<Vec<u8>>
306    where
307        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
308    {
309        Ok(self
310            .serializer
311            .serialize(value)
312            .context(RedDbErrorKind::Serialization)?)
313    }
314
315    fn deserialize<T>(&self, value: &[u8]) -> Result<T>
316    where
317        for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
318    {
319        Ok(self
320            .serializer
321            .deserialize(value)
322            .context(RedDbErrorKind::Deserialization)?)
323    }
324}
325
326#[cfg(test)]
327#[cfg_attr(not(feature = "ron_ser"), ignore)]
328mod tests {
329    use super::*;
330    use crate::RonDb;
331    use std::fs;
332
333    #[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
334    struct TestStruct {
335        foo: String,
336    }
337
338    #[tokio::test]
339    async fn insert_document() {
340        let db = RonDb::new::<TestStruct>(".test.db").unwrap();
341        let _id = &Uuid::new_v4();
342        let data = TestStruct {
343            foo: "test".to_owned(),
344        };
345        let doc: Document<TestStruct> = db.insert_document(data).await.unwrap();
346        let find: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
347        assert_eq!(find.data, doc.data);
348    }
349    #[tokio::test]
350    async fn find_uuids() {
351        let db = RonDb::new::<TestStruct>(".test.db").unwrap();
352        let doc: Document<TestStruct> = db
353            .insert_document(TestStruct {
354                foo: "test".to_owned(),
355            })
356            .await
357            .unwrap();
358
359        let doc2: Document<TestStruct> = db
360            .insert_document(TestStruct {
361                foo: "test2".to_owned(),
362            })
363            .await
364            .unwrap();
365
366        let doc3: Document<TestStruct> = db
367            .insert_document(TestStruct {
368                foo: "test".to_owned(),
369            })
370            .await
371            .unwrap();
372        let uuids: Vec<Uuid> = db
373            .find_uuids(&TestStruct {
374                foo: "test".to_owned(),
375            })
376            .await
377            .unwrap();
378
379        assert_eq!(uuids.contains(&doc._id), true);
380        assert_eq!(uuids.contains(&doc2._id), false);
381        assert_eq!(uuids.contains(&doc3._id), true);
382
383        fs::remove_file(".test.db.ron").unwrap();
384    }
385    #[tokio::test]
386    async fn insert_and_find_one() {
387        let db = RonDb::new::<TestStruct>(".insert_and_find_one.db").unwrap();
388        let doc: Document<TestStruct> = db
389            .insert_one(TestStruct {
390                foo: "test".to_owned(),
391            })
392            .await
393            .unwrap();
394
395        let find: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
396        assert_eq!(find._id, doc._id);
397        assert_eq!(find.data, doc.data);
398
399        fs::remove_file(".insert_and_find_one.db.ron").unwrap();
400    }
401    #[tokio::test]
402    async fn find() {
403        let db = RonDb::new::<TestStruct>(".find.db").unwrap();
404
405        let one = TestStruct {
406            foo: String::from("one"),
407        };
408
409        let two = TestStruct {
410            foo: String::from("two"),
411        };
412
413        let many = vec![one.clone(), one.clone(), two.clone()];
414        db.insert(many).await.unwrap();
415        let result = db.find(&one).await.unwrap();
416        assert_eq!(result.len(), 2);
417        fs::remove_file(".find.db.ron").unwrap();
418    }
419    #[tokio::test]
420    async fn update_one() {
421        let db = RonDb::new::<TestStruct>(".update_one.db").unwrap();
422        let original = TestStruct {
423            foo: "hi".to_owned(),
424        };
425
426        let updated = TestStruct {
427            foo: "bye".to_owned(),
428        };
429
430        let doc = db.insert_one(original.clone()).await.unwrap();
431        db.update_one(&doc._id, updated.clone()).await.unwrap();
432        let result: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
433        assert_eq!(result.data, updated);
434        fs::remove_file(".update_one.db.ron").unwrap();
435    }
436
437    #[tokio::test]
438    async fn update() {
439        let db = RonDb::new::<TestStruct>(".update.db").unwrap();
440        let one = TestStruct {
441            foo: String::from("one"),
442        };
443        let two = TestStruct {
444            foo: String::from("two"),
445        };
446
447        let many = vec![one.clone(), one.clone(), two.clone()];
448        db.insert(many).await.unwrap();
449        let updated = db.update(&one, &two).await.unwrap();
450        assert_eq!(updated, 2);
451        let result = db.find(&two).await.unwrap();
452        assert_eq!(result.len(), 3);
453        fs::remove_file(".update.db.ron").unwrap();
454    }
455
456    #[tokio::test]
457    async fn delete_and_find_one() {
458        let db = RonDb::new::<TestStruct>(".delete_one.db").unwrap();
459        let search = TestStruct {
460            foo: "test".to_owned(),
461        };
462
463        let doc = db.insert_one(search.clone()).await.unwrap();
464        let deleted = db.delete_one(&doc._id).await.unwrap();
465        assert_eq!(
466            deleted,
467            Document {
468                _id: doc._id,
469                data: doc.data,
470                _st: Status::De
471            }
472        );
473        fs::remove_file(".delete_one.db.ron").unwrap();
474    }
475
476    async fn delete() {
477        let db = RonDb::new::<TestStruct>(".delete.db").unwrap();
478        let one = TestStruct {
479            foo: "one".to_owned(),
480        };
481
482        let two = TestStruct {
483            foo: "two".to_owned(),
484        };
485
486        let many = vec![one.clone(), one.clone(), two.clone()];
487        db.insert(many).await.unwrap();
488        let deleted = db.delete(&one).await.unwrap();
489        assert_eq!(deleted, 2);
490
491        let not_deleted = db.delete(&one).await.unwrap();
492        assert_eq!(not_deleted, 0);
493        fs::remove_file(".delete.db.ron").unwrap();
494    }
495    #[tokio::test]
496    async fn serialie_deserialize() {
497        let db = RonDb::new::<TestStruct>(".serialize.db").unwrap();
498        let test = TestStruct {
499            foo: "one".to_owned(),
500        };
501        let byte_str = [40, 102, 111, 111, 58, 34, 111, 110, 101, 34, 41, 10];
502        let serialized = db.serializer.serialize(&test).unwrap();
503        assert_eq!(serialized, byte_str);
504        let deserialized: TestStruct = db.serializer.deserialize(&byte_str).unwrap();
505        assert_eq!(deserialized, test);
506        fs::remove_file(".serialize.db.ron").unwrap();
507    }
508}