simple_database/
database.rs

1use super::Error;
2
3use super::traits::{KeyValueStore, Indexable};
4
5use chrono::{Utc, DateTime};
6use uuid::Uuid;
7
8use std::path::PathBuf;
9use std::collections::BTreeMap;
10use std::cmp::Ordering;
11
12use serde::{Serialize, Deserialize};
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct F64 {inner: f64}
16
17impl F64 {
18    pub fn new(inner: f64) -> Self {
19        F64{inner}
20    }
21}
22
23impl std::hash::Hash for F64 {
24    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {state.write(&self.inner.to_le_bytes())}
25}
26impl PartialEq for F64 {
27    fn eq(&self, other: &Self) -> bool {matches!(self.inner.total_cmp(&other.inner), Ordering::Equal)}
28}
29impl Eq for F64 {}
30impl PartialOrd for F64 {
31    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {Some(self.cmp(other))}
32}
33impl Ord for F64 {
34    fn cmp(&self, other: &Self) -> Ordering {self.inner.total_cmp(&other.inner)}
35}
36impl From<f64> for F64 {
37    fn from(item: f64) -> F64 {F64::new(item)}
38}
39
40impl std::fmt::Display for F64 {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.inner)
43    }
44}
45
46#[derive(Serialize, Deserialize, Clone, PartialOrd, Ord)]
47pub enum Value {
48    I64(i64),
49    U64(u64),
50    Bytes(Vec<u8>),
51    F64(F64),
52    r#String(String),
53    Bool(bool),
54    Array(Vec<Value>)
55}
56
57impl Value {
58    pub fn as_i64(&self) -> Option<&i64> {if let Value::I64(val) = &self {Some(val)} else {None}}
59    pub fn as_u64(&self) -> Option<&u64> {if let Value::U64(val) = &self {Some(val)} else {None}}
60    pub fn as_bytes(&self) -> Option<&Vec<u8>> {if let Value::Bytes(val) = &self {Some(val)} else {None}}
61    pub fn as_f64(&self) -> Option<&F64> {if let Value::F64(val) = &self {Some(val)} else {None}}
62    pub fn as_string(&self) -> Option<&String> {if let Value::r#String(val) = &self {Some(val)} else {None}}
63    pub fn as_bool(&self) -> Option<&bool> {if let Value::Bool(val) = &self {Some(val)} else {None}}
64    pub fn as_array(&self) -> Option<&Vec<Value>> {if let Value::Array(val) = &self {Some(val)} else {None}}
65
66    pub fn contains(&self, other: &Value) -> Option<bool> {
67        self.as_array().map(|v| v.contains(other))
68    }
69
70    pub fn starts_with(&self, other: &Value) -> Option<bool> {
71        if let Some(other) = other.as_string() {
72            self.as_string().map(|s| s.starts_with(other))
73        } else {None}
74    }
75
76    fn cmp(&self, other: &Value, cmp_type: &CmpType) -> Option<bool> {
77        if *cmp_type == CmpType::E {
78            Some(self == other)
79        } else {
80            self.partial_cmp(other).map(|ordering| {
81                match cmp_type {
82                    CmpType::GT if (ordering as i8) > 0 => true,
83                    CmpType::GTE if (ordering as i8) >= 0 => true,
84                    CmpType::LT if (ordering as i8) < 0 => true,
85                    CmpType::LTE if (ordering as i8) <= 0 => true,
86                    _ => false
87                }
88            })
89        }
90    }
91}
92
93impl PartialEq for Value {
94    fn eq(&self, other: &Self) -> bool {
95        match self {
96            Value::I64(val) => other.as_i64().map(|oval| val.eq(oval)),
97            Value::U64(val) => other.as_u64().map(|oval| val.eq(oval)),
98            Value::Bytes(val) => other.as_bytes().map(|oval| val.eq(oval)),
99            Value::F64(val) => other.as_f64().map(|oval| val.eq(oval)),
100            Value::r#String(val) => other.as_string().map(|oval| val.eq(oval)),
101            Value::Bool(val) => other.as_bool().map(|oval| val.eq(oval)),
102            Value::Array(val) => other.as_array().map(|oval| val.eq(oval))
103        }.unwrap_or(false)
104    }
105}
106
107impl Eq for Value {}
108
109impl From<i64> for Value {fn from(v: i64) -> Self {Value::I64(v)}}
110impl From<u64> for Value {fn from(v: u64) -> Self {Value::U64(v)}}
111impl From<usize> for Value {fn from(v: usize) -> Self {Value::U64(v as u64)}}
112impl From<DateTime<Utc>> for Value {fn from(v: DateTime<Utc>) -> Self {Value::U64(v.timestamp() as u64)}}
113impl From<F64> for Value {fn from(v: F64) -> Self {Value::F64(v)}}
114impl From<String> for Value {fn from(v: String) -> Self {Value::r#String(v)}}
115impl From<&str> for Value {fn from(v: &str) -> Self {Value::r#String(v.to_string())}}
116impl From<bool> for Value {fn from(v: bool) -> Self {Value::Bool(v)}}
117impl From<Vec<u8>> for Value {fn from(v: Vec<u8>) -> Self {Value::Bytes(v)}}
118impl<V: Into<Value>> From<Vec<V>> for Value {fn from(v: Vec<V>) -> Self {Value::Array(v.into_iter().map(|v| v.into()).collect())}}
119
120impl std::fmt::Debug for Value {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        match self {
123            Value::I64(i) => write!(f, "{}", i),
124            Value::U64(u) => write!(f, "{}", u),
125            Value::Bytes(u) => write!(f, "{}", hex::encode(u)),
126            Value::F64(_f) => write!(f, "{}", _f),
127            Value::r#String(s) => write!(f, "{}", s),
128            Value::Bool(b) => write!(f, "{}", b),
129            Value::Array(vec) => write!(f, "{:#?}", vec)
130        }
131    }
132}
133
134
135#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
136pub enum CmpType { GT, GTE, E, LT, LTE }
137
138#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
139pub enum Filter {
140    Cmp(Value, CmpType),
141    Contains(Value),
142    StartsWith(Value),
143    All(Vec<Filter>),
144    Any(Vec<Filter>),
145    Not(Box<Filter>)
146
147}
148
149impl Filter {
150    pub fn filter(&self, item: &Value) -> Option<bool> {
151        match self {
152            Filter::Cmp(value, cmp_type) => item.cmp(value, cmp_type),
153            Filter::Contains(value) => item.contains(value),
154            Filter::StartsWith(value) => item.starts_with(value),
155            Filter::All(filters) => {
156                for filter in filters {
157                    if let Some(b) = filter.filter(item) {
158                        if !b {
159                            return Some(true);
160                        }
161                    } else {return None;}
162                }
163                Some(false)
164            },
165            Filter::Any(filters) => {
166                for filter in filters {
167                    if let Some(b) = filter.filter(item) {
168                        if b {
169                            return Some(true);
170                        }
171                    } else {return None;}
172                }
173                Some(false)
174               },
175            Filter::Not(filter) => filter.filter(item).map(|f| !f)
176        }
177    }
178
179    pub fn contains<V: Into<Value>>(val: V) -> Filter {
180        Filter::Contains(val.into())
181    }
182    pub fn range<V: Into<Value>>(start: V, end: V) -> Filter {
183        Filter::All(vec![Filter::Cmp(start.into(), CmpType::GTE), Filter::Cmp(end.into(), CmpType::LTE)])
184    }
185    pub fn new_not(filter: Filter) -> Filter {Filter::Not(Box::new(filter))}
186    pub fn cmp<V: Into<Value>>(cmp: CmpType, val: V) -> Filter {Filter::Cmp(val.into(), cmp)}
187    pub fn equal<V: Into<Value>>(val: V) -> Filter {Filter::Cmp(val.into(), CmpType::E)}
188    pub fn is_equal(&self) -> bool {
189        if let Filter::Cmp(_, t) = self {
190            *t == CmpType::E
191        } else {false}
192    }
193}
194
195pub type Index = BTreeMap<String, Value>;
196
197pub struct IndexBuilder {}
198impl IndexBuilder {
199    pub fn build<V: Into<Value>>(vec: Vec<(&str, V)>) -> Result<Index, Error> {
200        let index = Index::from_iter(vec.into_iter().map(|(k, v)| (k.to_string(), v.into())));
201        if index.contains_key("timestamp_stored") {
202            Err(Error::err("", "'timestamp_stored' is a reserved index"))
203        } else {Ok(index)}
204    }
205}
206
207#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
208pub struct Filters(pub BTreeMap<String, Filter>);
209
210impl Filters {
211    pub fn new(vec: Vec<(&str, Filter)>) -> Filters {
212        Filters(BTreeMap::from_iter(vec.into_iter().map(|(k, f)| (k.to_string(), f))))
213    }
214    pub fn add(&mut self, property: &str, ad_filter: Filter) {
215        if let Some(filter) = self.0.get_mut(property) {
216            if let Filter::All(ref mut filters) = filter {
217                filters.push(ad_filter);
218            } else {
219                *filter = Filter::All(vec![filter.clone(), ad_filter]);
220            }
221        } else {
222            self.0.insert(property.to_string(), ad_filter);
223        }
224    }
225    pub fn combine(&self, b_filters: &Filters, or: bool) -> Filters {
226        let mut filters = Vec::new();
227        for (a_name, a_filter) in &self.0 {
228            if let Some(b_filter) = b_filters.0.get(a_name) {
229                let vec = vec![b_filter.clone(), a_filter.clone()];
230                filters.push((a_name.to_string(), if or {Filter::Any(vec)} else {Filter::All(vec)}));
231            } else {
232                filters.push((a_name.to_string(), a_filter.clone()));
233            }
234        }
235        for (b_name, b_filter) in &b_filters.0 {
236            if !self.0.contains_key(b_name) {
237                filters.push((b_name.to_string(), b_filter.clone()));
238            }
239        }
240        Filters(BTreeMap::from_iter(filters))
241    }
242    pub fn filter(&self, index: &Index) -> bool {
243        self.0.iter().all(|(prop, filter)| {
244            if let Some(value) = index.get(prop) {
245                filter.filter(value).unwrap_or(false)
246            } else {false}
247        })
248    }
249}
250
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
252pub enum SortDirection {
253  Descending = -1,
254  Ascending = 1
255}
256
257#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
258pub struct SortOptions {
259    direction: SortDirection,
260    property: String,
261    limit: Option<usize>,
262    cursor_key: Option<Vec<u8>>
263}
264
265impl SortOptions {
266    pub fn new(property: &str) -> Self {
267        SortOptions{
268            direction: SortDirection::Ascending,
269            property: property.to_string(),
270            limit: None,
271            cursor_key: None
272        }
273    }
274
275    pub fn sort<T: Indexable>(&self, values: &mut [T]) -> Result<(), Error> {
276        if values.iter().any(|a| !a.index().contains_key(&self.property)) {
277            return Err(Error::err("", "Sort Property Not Found In All Values"));
278        }
279        values.sort_by(|a, b| {
280            let (f, s) = if self.direction == SortDirection::Ascending {(a, b)} else {(b, a)};
281            f.index().get(&self.property).unwrap()
282            .partial_cmp(
283                s.index().get(&self.property).unwrap()
284            ).unwrap_or(Ordering::Equal)
285        });
286        Ok(())
287    }
288}
289
290#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
291pub struct UuidKeyed<O: Indexable> {
292    inner: O,
293    uuid: Uuid
294}
295
296impl<O: Indexable> UuidKeyed<O> {
297    pub fn new(inner: O) -> Self {
298        UuidKeyed{inner, uuid: Uuid::new_v4()}
299    }
300    pub fn inner(self) -> O {self.inner}
301}
302
303impl<O: Indexable + Serialize + for<'a> Deserialize<'a>> Indexable for UuidKeyed<O> {
304    const PRIMARY_KEY: &'static str = "uuid";
305    const DEFAULT_SORT: &'static str = O::DEFAULT_SORT;
306    fn primary_key(&self) -> Vec<u8> {self.uuid.as_bytes().to_vec()}
307    fn secondary_keys(&self) -> Index {
308        let mut index = self.inner.secondary_keys();
309        index.insert(
310            O::PRIMARY_KEY.to_string(),
311            self.inner.primary_key().into()
312        );
313        index
314    }
315}
316
317#[derive(Clone)]
318pub struct Database {
319    store: Box<dyn KeyValueStore>,
320    location: PathBuf
321}
322
323pub const MAIN: &str = "___main___";
324pub const INDEX: &str = "___index___";
325pub const ALL: &str = "ALL";
326
327impl Database {
328    pub fn location(&self) -> PathBuf {self.location.clone()}
329    pub async fn new<KVS: KeyValueStore + 'static>(location: PathBuf) -> Result<Self, Error> {
330        Ok(Database{store: Box::new(KVS::new(location.clone()).await?), location})
331    }
332
333    pub async fn get_raw(&self, pk: &[u8]) -> Result<Option<Vec<u8>>, Error> {
334        Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
335            db.get(pk).await?
336        } else {None})
337    }
338
339    pub async fn get<I: Indexable + for<'a> Deserialize<'a>>(&self, pk: &[u8]) -> Result<Option<I>, Error> {
340        Ok(self.get_raw(pk).await?.map(|item| {
341            serde_json::from_slice::<I>(&item)
342        }).transpose()?)
343    }
344
345    pub async fn get_all<I: Indexable + for<'a> Deserialize<'a>>(&self) -> Result<Vec<I>, Error> {
346        Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
347            db.values().await?.into_iter().map(|item| Ok::<I, Error>(serde_json::from_slice::<I>(&item)?)).collect::<Result<Vec<I>, Error>>()?
348        } else {Vec::new()})
349    }
350
351    pub async fn keys(&self) -> Result<Vec<Vec<u8>>, Error> {
352        Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
353            db.keys().await?
354        } else {Vec::new()})
355    }
356
357    async fn add(partition: &dyn KeyValueStore, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
358        if let Some(values) = partition.get(key).await? {
359            let mut values: Vec<Vec<u8>> = serde_json::from_slice(&values)?;
360            values.push(value);
361            partition.set(key, &serde_json::to_vec(&values)?).await?;
362        } else {
363            partition.set(key, &serde_json::to_vec(&vec![value])?).await?;
364        }
365        Ok(())
366    }
367
368    async fn remove(partition: &dyn KeyValueStore, key: &[u8], value: &[u8]) -> Result<(), Error> {
369        if let Some(values) = partition.get(key).await? {
370            let mut values: Vec<Vec<u8>> = serde_json::from_slice(&values)?;
371            values.retain(|v| v != value);
372            if !values.is_empty() {
373                partition.set(key, &serde_json::to_vec(&values)?).await?;
374            } else {
375                partition.delete(key).await?;
376            }
377        }
378        Ok(())
379    }
380
381    pub async fn set<I: Indexable + Serialize>(&self, item: &I) -> Result<(), Error> {
382        let pk = item.primary_key();
383        self.delete(&pk).await?;
384        let db = &self.store;
385        let mut keys = item.secondary_keys();
386        if keys.contains_key(I::PRIMARY_KEY) || keys.contains_key("timestamp_stored") {
387            return Err(Error::err("", &format!("'{}' and 'timestamp_stored' are reserved indexes", I::PRIMARY_KEY)));
388        }
389        keys.insert(I::PRIMARY_KEY.to_string(), pk.clone().into());
390        keys.insert("timestamp_stored".to_string(), Utc::now().into());
391        db.partition(PathBuf::from(MAIN)).await?.set(&pk, &serde_json::to_vec(item)?).await?;
392        db.partition(PathBuf::from(INDEX)).await?.set(&pk, &serde_json::to_vec(&keys)?).await?;
393        for (key, value) in keys.iter() {
394            let partition = db.partition(PathBuf::from(&format!("__{}__", key))).await?;
395            let value = serde_json::to_vec(&value)?;
396            Self::add(&*partition, &value, pk.clone()).await?;
397            Self::add(&*partition, ALL.as_bytes(), pk.clone()).await?;
398        }
399        Ok(())
400    }
401
402    pub async fn delete(&self, pk: &[u8]) -> Result<(), Error> {
403        let db = &self.store;
404        db.partition(PathBuf::from(MAIN)).await?.delete(pk).await?;
405        let index_db = db.partition(PathBuf::from(INDEX)).await?;
406        if let Some(index) = index_db.get(pk).await? {
407            index_db.delete(pk).await?;
408            let keys: Index = serde_json::from_slice(&index)?;
409            for (key, value) in keys.iter() {
410                let partition = db.partition(PathBuf::from(&format!("__{}__", key))).await?;
411                let value = serde_json::to_vec(value)?;
412                Self::remove(&*partition, &value, pk).await?;
413                Self::remove(&*partition, ALL.as_bytes(), pk).await?;
414            }
415        }
416        Ok(())
417    }
418
419    pub async fn clear(&self) -> Result<(), Error> {
420        self.store.clear().await?;
421        Ok(())
422    }
423
424    pub async fn query<I: Indexable + for<'a> Deserialize<'a>>(
425        &self,
426        filters: &Filters,
427        sort_options: Option<SortOptions>
428    ) -> Result<(Vec<I>, Option<Vec<u8>>), Error> {
429        let sort_options = sort_options.unwrap_or(SortOptions::new(I::DEFAULT_SORT));
430        let none = || Ok((Vec::new(), None));
431        let db = &self.store;
432
433        if let Some(pk) = filters.0.iter().find_map(|(p, f)| {
434            if p == I::PRIMARY_KEY {
435                if let Filter::Cmp(Value::Bytes(value), CmpType::E) = f {
436                    return Some(value);
437                }
438            }
439            None
440        }) {
441            return Ok((self.get(pk).await?.map(|r| vec![r]).unwrap_or(vec![]), None));
442        }
443
444
445        let db_filters: Vec<String> = filters.0.iter().filter_map(|(p, f)|
446            Some(p.to_string()).filter(|_| f.is_equal())
447        ).collect();
448
449        let partition_name = PathBuf::from(format!("__{}__",
450            db_filters.first().unwrap_or(&sort_options.property)
451        ));
452        let partition = if let Some(p) = db.get_partition(&partition_name).await {p} else {return none();};
453        let index = if let Some(p) = db.get_partition(&PathBuf::from(INDEX)).await {p} else {return none();};
454
455        let all = if let Some(p) = partition.get(ALL.as_bytes()).await? {
456            serde_json::from_slice::<Vec<Vec<u8>>>(&p)?
457        } else {return none();};
458
459        let mut values: Vec<(Vec<u8>, Value)> = vec![];
460        for pk in all {
461            let keys = index.get(&pk).await?.ok_or(
462                Error::err("database.query", "Indexed value not found in index")
463            )?;
464            let keys = serde_json::from_slice::<Index>(&keys)?;
465            if filters.filter(&keys) {
466                let main = db.get_partition(&PathBuf::from(MAIN)).await.ok_or(
467                    Error::err("database.query", "Indexed value not found in main")
468                )?;
469                let sp = keys.get(&sort_options.property).ok_or(
470                    Error::err("database.query", "Sort property not found in matched value")
471                )?.clone();
472
473                if let Some(i) = main.get(&pk).await? {
474                    values.push((i, sp));
475                }
476            }
477        }
478
479        values.sort_by(|a, b| {
480            if sort_options.direction == SortDirection::Ascending {
481                a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal)
482            } else {
483                b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal)
484            }
485        });
486
487        let start = sort_options.cursor_key.and_then(|ck|
488            values.iter().position(|(i, _)| *i == ck).map(|p| p+1)
489        ).unwrap_or(0);
490        let end = sort_options.limit.map(|l| start+l).unwrap_or(values.len());
491        let cursor = if end != values.len() {Some(values[end].0.clone())} else {None};
492        if start == end {
493            Ok((Vec::new(), None))
494        } else {
495            Ok((values[start..end].iter().cloned().flat_map(|(item, _)|
496                serde_json::from_slice(&item).ok()
497            ).collect::<Vec<I>>(),
498            cursor))
499        }
500    }
501
502    pub async fn debug(&self) -> Result<String, Error> {
503        Ok(format!("{:#?}",
504            (
505                self.location(),
506                if let Some(index) = self.store.get_partition(&PathBuf::from(INDEX)).await {
507                    let mut index = index.values().await?.into_iter().map(|keys|
508                        Ok(serde_json::from_slice::<Index>(&keys)?)
509                    ).collect::<Result<Vec<Index>, Error>>()?;
510                    index.sort_by_key(|i| i.get("timestamp_stored").unwrap().clone());
511                    index
512                } else {Vec::new()}
513            )
514        ))
515    }
516}
517
518impl std::fmt::Debug for Database {
519    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520        f.debug_struct("Database")
521        .field("location", &self.location())
522        .finish()
523    }
524}