1use failure::ResultExt;
2use futures::stream::{self, StreamExt};
3use futures::TryStreamExt;
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread;
8use tokio::runtime::Runtime;
9use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
10pub use uuid::Uuid;
11
12mod document;
13mod error;
14pub mod serializer;
15mod status;
16mod storage;
17
18pub use document::Document;
19use error::{RedDbErrorKind, Result};
20use serde::{Deserialize, Serialize};
21use serializer::Serializer;
22use status::Status;
23pub use storage::FileStorage;
24use storage::Storage;
25
26type RedDbHM = HashMap<Uuid, Vec<u8>>;
27
28#[cfg(feature = "bin_ser")]
29pub type BinDb = RedDb<serializer::Bin, FileStorage<serializer::Bin>>;
30#[cfg(feature = "json_ser")]
31pub type JsonDb = RedDb<serializer::Json, FileStorage<serializer::Json>>;
32#[cfg(feature = "yaml_ser")]
33pub type YamlDb = RedDb<serializer::Yaml, FileStorage<serializer::Yaml>>;
34#[cfg(feature = "ron_ser")]
35pub type RonDb = RedDb<serializer::Ron, FileStorage<serializer::Ron>>;
36
37#[derive(Debug)]
38pub struct RedDb<SE, ST> {
39 storage: ST,
40 serializer: SE,
41 data: Arc<RwLock<RedDbHM>>,
42}
43
44impl<'a, SE, ST: 'static> RedDb<SE, ST>
45where
46 for<'de> SE: Serializer<'de> + Debug,
47 for<'de> ST: Storage + Debug + Send + Sync,
48{
49 pub fn new<T>(db_name: &'static str) -> Result<Self>
50 where
51 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
52 {
53 let mut rt = Runtime::new().unwrap();
54
55 let (data, storage) = thread::spawn(move || {
56 let storage = rt.block_on(async { ST::new(db_name).await.unwrap() });
57 let data = rt.block_on(async { storage.load::<T>().await.unwrap() });
58 (data, storage)
59 })
60 .join()
61 .map_err(|_| RedDbErrorKind::Datapersist)?;
62
63 Ok(Self {
64 storage,
65 data: Arc::new(RwLock::new(data)),
66 serializer: SE::default(),
67 })
68 }
69
70 async fn read(&'a self) -> Result<RwLockReadGuard<'a, RedDbHM>> {
71 let lock = self.data.read().await;
72 Ok(lock)
73 }
74
75 async fn write(&'a self) -> Result<RwLockWriteGuard<'a, RedDbHM>> {
76 let lock = self.data.write().await;
77 Ok(lock)
78 }
79
80 fn create_doc<T>(&self, id: &Uuid, value: T, status: Status) -> Document<T>
81 where
82 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
83 {
84 Document::new(*id, value, status)
85 }
86
87 async fn find_uuids<T>(&self, search: &T) -> Result<Vec<Uuid>>
88 where
89 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
90 {
91 let data = self
92 .read()
93 .await
94 .map_err(|_| RedDbErrorKind::PoisonedValue)
95 .unwrap();
96
97 let serialized = self.serialize(search)?;
98
99 let docs: Vec<Uuid> = data
100 .iter()
101 .filter(|(_id, value)| **value == serialized)
102 .map(|(_id, _value)| *_id)
103 .collect();
104
105 Ok(docs)
106 }
107
108 async fn insert_document<T>(&self, value: T) -> Result<Document<T>>
109 where
110 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
111 {
112 let mut data = self.write().await?;
113 let id = Uuid::new_v4();
114 let serialized = self.serialize(&value)?;
115 data.insert(id, serialized);
116 let result = self.create_doc(&id, value, Status::default());
117
118 Ok(result)
119 }
120
121 pub async fn insert_one<T>(&self, value: T) -> Result<Document<T>>
122 where
123 for<'de> T: Serialize + Deserialize<'de> + Debug + Clone + PartialEq + Send + Sync,
124 {
125 let doc = self.insert_document(value).await?;
126 self.storage
127 .persist(&[doc.to_owned()])
128 .await
129 .context(RedDbErrorKind::Datapersist)?;
130 Ok(doc)
131 }
132
133 pub async fn insert<T>(&self, values: Vec<T>) -> Result<Vec<Document<T>>>
134 where
135 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
136 {
137 let docs: Vec<Document<T>> = stream::iter(values)
138 .then(|data| self.insert_document(data))
139 .try_collect()
140 .await?;
141
142 self.storage
143 .persist(&docs)
144 .await
145 .context(RedDbErrorKind::Datapersist)?;
146
147 Ok(docs)
148 }
149
150 pub async fn find_one<T>(&self, id: &Uuid) -> Result<Document<T>>
151 where
152 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
153 {
154 let data = self
155 .read()
156 .await
157 .map_err(|_| RedDbErrorKind::PoisonedValue)?;
158
159 let data = data.get(&id).ok_or(RedDbErrorKind::NotFound { _id: *id })?;
160
161 let data = self.deserialize(&*data)?;
162 let doc = self.create_doc(id, data, Status::In);
163 Ok(doc)
164 }
165
166 pub async fn update_one<T>(&'a self, id: &Uuid, new_value: T) -> Result<bool>
167 where
168 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
169 {
170 let mut data = self
171 .write()
172 .await
173 .map_err(|_| RedDbErrorKind::PoisonedValue)?;
174
175 if data.contains_key(id) {
176 let data = data
177 .get_mut(&id)
178 .ok_or(RedDbErrorKind::NotFound { _id: *id })?;
179
180 *data = self.serialize(&new_value)?;
181 let doc = self.create_doc(id, new_value, Status::Up);
182
183 self.storage
184 .persist(&[doc])
185 .await
186 .context(RedDbErrorKind::Datapersist)?;
187
188 Ok(true)
189 } else {
190 Ok(false)
191 }
192 }
193
194 pub async fn remove_document<T>(&self, id: Uuid) -> Result<Document<T>>
195 where
196 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
197 {
198 let mut data = self.write().await?;
199 let value = data.remove(&id).unwrap();
200 let data = self.deserialize(&value)?;
201 let doc = self.create_doc(&id, data, Status::De);
202 Ok(doc)
203 }
204
205 pub async fn delete_one<T>(&self, id: &Uuid) -> Result<Document<T>>
206 where
207 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
208 {
209 let result = self.remove_document(*id).await?;
210 Ok(result)
211 }
212
213 pub async fn find_all<T>(&self) -> Result<Vec<Document<T>>>
214 where
215 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
216 {
217 let data = self
218 .read()
219 .await
220 .map_err(|_| RedDbErrorKind::PoisonedValue)?;
221
222 let docs: Vec<Document<T>> = data
223 .iter()
224 .map(|(id, data)| {
225 let data = self.deserialize(&*data).unwrap();
226 self.create_doc(id, data, Status::In)
227 })
228 .collect();
229
230 Ok(docs)
231 }
232
233 pub async fn find<T>(&self, search: &T) -> Result<Vec<Document<T>>>
234 where
235 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
236 {
237 let data = self
238 .read()
239 .await
240 .map_err(|_| RedDbErrorKind::PoisonedValue)?;
241
242 let serialized = self.serialize(search)?;
243
244 let docs: Vec<Document<T>> = data
245 .iter()
246 .filter(|(_id, data)| **data == serialized)
247 .map(|(_id, data)| {
248 let data = self.deserialize(&*data).unwrap();
249 self.create_doc(_id, data, Status::In)
250 })
251 .collect();
252
253 Ok(docs)
254 }
255
256 pub async fn update<T>(&self, search: &T, new_value: &T) -> Result<usize>
257 where
258 for<'de> T: Serialize + Deserialize<'de> + Clone + Debug + PartialEq + Send + Sync,
259 {
260 let mut data = self
261 .write()
262 .await
263 .map_err(|_| RedDbErrorKind::PoisonedValue)?;
264
265 let query = self.serialize(search)?;
266
267 let docs: Vec<Document<T>> = data
268 .iter_mut()
269 .filter(|(_id, data)| **data == query)
270 .map(|(_id, data)| {
271 *data = self.serialize(new_value).unwrap();
272 self.create_doc(_id, new_value.to_owned(), Status::Up)
273 })
274 .collect();
275
276 let result = docs.len();
277
278 self.storage
279 .persist(&docs)
280 .await
281 .context(RedDbErrorKind::Datapersist)?;
282
283 Ok(result)
284 }
285
286 pub async fn delete<T>(&self, search: &T) -> Result<usize>
287 where
288 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq + Send + Sync,
289 {
290 let uuids = self.find_uuids(search).await?;
291
292 let docs: Vec<Document<T>> = stream::iter(uuids)
293 .then(|_id| self.remove_document(_id))
294 .try_collect()
295 .await?;
296
297 self.storage
298 .persist(&docs)
299 .await
300 .context(RedDbErrorKind::Datapersist)?;
301
302 Ok(docs.len())
303 }
304
305 fn serialize<T>(&self, value: &T) -> Result<Vec<u8>>
306 where
307 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
308 {
309 Ok(self
310 .serializer
311 .serialize(value)
312 .context(RedDbErrorKind::Serialization)?)
313 }
314
315 fn deserialize<T>(&self, value: &[u8]) -> Result<T>
316 where
317 for<'de> T: Serialize + Deserialize<'de> + Debug + PartialEq,
318 {
319 Ok(self
320 .serializer
321 .deserialize(value)
322 .context(RedDbErrorKind::Deserialization)?)
323 }
324}
325
326#[cfg(test)]
327#[cfg_attr(not(feature = "ron_ser"), ignore)]
328mod tests {
329 use super::*;
330 use crate::RonDb;
331 use std::fs;
332
333 #[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
334 struct TestStruct {
335 foo: String,
336 }
337
338 #[tokio::test]
339 async fn insert_document() {
340 let db = RonDb::new::<TestStruct>(".test.db").unwrap();
341 let _id = &Uuid::new_v4();
342 let data = TestStruct {
343 foo: "test".to_owned(),
344 };
345 let doc: Document<TestStruct> = db.insert_document(data).await.unwrap();
346 let find: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
347 assert_eq!(find.data, doc.data);
348 }
349 #[tokio::test]
350 async fn find_uuids() {
351 let db = RonDb::new::<TestStruct>(".test.db").unwrap();
352 let doc: Document<TestStruct> = db
353 .insert_document(TestStruct {
354 foo: "test".to_owned(),
355 })
356 .await
357 .unwrap();
358
359 let doc2: Document<TestStruct> = db
360 .insert_document(TestStruct {
361 foo: "test2".to_owned(),
362 })
363 .await
364 .unwrap();
365
366 let doc3: Document<TestStruct> = db
367 .insert_document(TestStruct {
368 foo: "test".to_owned(),
369 })
370 .await
371 .unwrap();
372 let uuids: Vec<Uuid> = db
373 .find_uuids(&TestStruct {
374 foo: "test".to_owned(),
375 })
376 .await
377 .unwrap();
378
379 assert_eq!(uuids.contains(&doc._id), true);
380 assert_eq!(uuids.contains(&doc2._id), false);
381 assert_eq!(uuids.contains(&doc3._id), true);
382
383 fs::remove_file(".test.db.ron").unwrap();
384 }
385 #[tokio::test]
386 async fn insert_and_find_one() {
387 let db = RonDb::new::<TestStruct>(".insert_and_find_one.db").unwrap();
388 let doc: Document<TestStruct> = db
389 .insert_one(TestStruct {
390 foo: "test".to_owned(),
391 })
392 .await
393 .unwrap();
394
395 let find: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
396 assert_eq!(find._id, doc._id);
397 assert_eq!(find.data, doc.data);
398
399 fs::remove_file(".insert_and_find_one.db.ron").unwrap();
400 }
401 #[tokio::test]
402 async fn find() {
403 let db = RonDb::new::<TestStruct>(".find.db").unwrap();
404
405 let one = TestStruct {
406 foo: String::from("one"),
407 };
408
409 let two = TestStruct {
410 foo: String::from("two"),
411 };
412
413 let many = vec![one.clone(), one.clone(), two.clone()];
414 db.insert(many).await.unwrap();
415 let result = db.find(&one).await.unwrap();
416 assert_eq!(result.len(), 2);
417 fs::remove_file(".find.db.ron").unwrap();
418 }
419 #[tokio::test]
420 async fn update_one() {
421 let db = RonDb::new::<TestStruct>(".update_one.db").unwrap();
422 let original = TestStruct {
423 foo: "hi".to_owned(),
424 };
425
426 let updated = TestStruct {
427 foo: "bye".to_owned(),
428 };
429
430 let doc = db.insert_one(original.clone()).await.unwrap();
431 db.update_one(&doc._id, updated.clone()).await.unwrap();
432 let result: Document<TestStruct> = db.find_one(&doc._id).await.unwrap();
433 assert_eq!(result.data, updated);
434 fs::remove_file(".update_one.db.ron").unwrap();
435 }
436
437 #[tokio::test]
438 async fn update() {
439 let db = RonDb::new::<TestStruct>(".update.db").unwrap();
440 let one = TestStruct {
441 foo: String::from("one"),
442 };
443 let two = TestStruct {
444 foo: String::from("two"),
445 };
446
447 let many = vec![one.clone(), one.clone(), two.clone()];
448 db.insert(many).await.unwrap();
449 let updated = db.update(&one, &two).await.unwrap();
450 assert_eq!(updated, 2);
451 let result = db.find(&two).await.unwrap();
452 assert_eq!(result.len(), 3);
453 fs::remove_file(".update.db.ron").unwrap();
454 }
455
456 #[tokio::test]
457 async fn delete_and_find_one() {
458 let db = RonDb::new::<TestStruct>(".delete_one.db").unwrap();
459 let search = TestStruct {
460 foo: "test".to_owned(),
461 };
462
463 let doc = db.insert_one(search.clone()).await.unwrap();
464 let deleted = db.delete_one(&doc._id).await.unwrap();
465 assert_eq!(
466 deleted,
467 Document {
468 _id: doc._id,
469 data: doc.data,
470 _st: Status::De
471 }
472 );
473 fs::remove_file(".delete_one.db.ron").unwrap();
474 }
475
476 async fn delete() {
477 let db = RonDb::new::<TestStruct>(".delete.db").unwrap();
478 let one = TestStruct {
479 foo: "one".to_owned(),
480 };
481
482 let two = TestStruct {
483 foo: "two".to_owned(),
484 };
485
486 let many = vec![one.clone(), one.clone(), two.clone()];
487 db.insert(many).await.unwrap();
488 let deleted = db.delete(&one).await.unwrap();
489 assert_eq!(deleted, 2);
490
491 let not_deleted = db.delete(&one).await.unwrap();
492 assert_eq!(not_deleted, 0);
493 fs::remove_file(".delete.db.ron").unwrap();
494 }
495 #[tokio::test]
496 async fn serialie_deserialize() {
497 let db = RonDb::new::<TestStruct>(".serialize.db").unwrap();
498 let test = TestStruct {
499 foo: "one".to_owned(),
500 };
501 let byte_str = [40, 102, 111, 111, 58, 34, 111, 110, 101, 34, 41, 10];
502 let serialized = db.serializer.serialize(&test).unwrap();
503 assert_eq!(serialized, byte_str);
504 let deserialized: TestStruct = db.serializer.deserialize(&byte_str).unwrap();
505 assert_eq!(deserialized, test);
506 fs::remove_file(".serialize.db.ron").unwrap();
507 }
508}