1use super::Error;
2
3use super::traits::{KeyValueStore, Indexable};
4
5use chrono::{Utc, DateTime};
6use uuid::Uuid;
7
8use std::path::PathBuf;
9use std::collections::BTreeMap;
10use std::cmp::Ordering;
11
12use serde::{Serialize, Deserialize};
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct F64 {inner: f64}
16
17impl F64 {
18 pub fn new(inner: f64) -> Self {
19 F64{inner}
20 }
21}
22
23impl std::hash::Hash for F64 {
24 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {state.write(&self.inner.to_le_bytes())}
25}
26impl PartialEq for F64 {
27 fn eq(&self, other: &Self) -> bool {matches!(self.inner.total_cmp(&other.inner), Ordering::Equal)}
28}
29impl Eq for F64 {}
30impl PartialOrd for F64 {
31 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {Some(self.cmp(other))}
32}
33impl Ord for F64 {
34 fn cmp(&self, other: &Self) -> Ordering {self.inner.total_cmp(&other.inner)}
35}
36impl From<f64> for F64 {
37 fn from(item: f64) -> F64 {F64::new(item)}
38}
39
40impl std::fmt::Display for F64 {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.inner)
43 }
44}
45
46#[derive(Serialize, Deserialize, Clone, PartialOrd, Ord)]
47pub enum Value {
48 I64(i64),
49 U64(u64),
50 Bytes(Vec<u8>),
51 F64(F64),
52 r#String(String),
53 Bool(bool),
54 Array(Vec<Value>)
55}
56
57impl Value {
58 pub fn as_i64(&self) -> Option<&i64> {if let Value::I64(val) = &self {Some(val)} else {None}}
59 pub fn as_u64(&self) -> Option<&u64> {if let Value::U64(val) = &self {Some(val)} else {None}}
60 pub fn as_bytes(&self) -> Option<&Vec<u8>> {if let Value::Bytes(val) = &self {Some(val)} else {None}}
61 pub fn as_f64(&self) -> Option<&F64> {if let Value::F64(val) = &self {Some(val)} else {None}}
62 pub fn as_string(&self) -> Option<&String> {if let Value::r#String(val) = &self {Some(val)} else {None}}
63 pub fn as_bool(&self) -> Option<&bool> {if let Value::Bool(val) = &self {Some(val)} else {None}}
64 pub fn as_array(&self) -> Option<&Vec<Value>> {if let Value::Array(val) = &self {Some(val)} else {None}}
65
66 pub fn contains(&self, other: &Value) -> Option<bool> {
67 self.as_array().map(|v| v.contains(other))
68 }
69
70 pub fn starts_with(&self, other: &Value) -> Option<bool> {
71 if let Some(other) = other.as_string() {
72 self.as_string().map(|s| s.starts_with(other))
73 } else {None}
74 }
75
76 fn cmp(&self, other: &Value, cmp_type: &CmpType) -> Option<bool> {
77 if *cmp_type == CmpType::E {
78 Some(self == other)
79 } else {
80 self.partial_cmp(other).map(|ordering| {
81 match cmp_type {
82 CmpType::GT if (ordering as i8) > 0 => true,
83 CmpType::GTE if (ordering as i8) >= 0 => true,
84 CmpType::LT if (ordering as i8) < 0 => true,
85 CmpType::LTE if (ordering as i8) <= 0 => true,
86 _ => false
87 }
88 })
89 }
90 }
91}
92
93impl PartialEq for Value {
94 fn eq(&self, other: &Self) -> bool {
95 match self {
96 Value::I64(val) => other.as_i64().map(|oval| val.eq(oval)),
97 Value::U64(val) => other.as_u64().map(|oval| val.eq(oval)),
98 Value::Bytes(val) => other.as_bytes().map(|oval| val.eq(oval)),
99 Value::F64(val) => other.as_f64().map(|oval| val.eq(oval)),
100 Value::r#String(val) => other.as_string().map(|oval| val.eq(oval)),
101 Value::Bool(val) => other.as_bool().map(|oval| val.eq(oval)),
102 Value::Array(val) => other.as_array().map(|oval| val.eq(oval))
103 }.unwrap_or(false)
104 }
105}
106
107impl Eq for Value {}
108
109impl From<i64> for Value {fn from(v: i64) -> Self {Value::I64(v)}}
110impl From<u64> for Value {fn from(v: u64) -> Self {Value::U64(v)}}
111impl From<usize> for Value {fn from(v: usize) -> Self {Value::U64(v as u64)}}
112impl From<DateTime<Utc>> for Value {fn from(v: DateTime<Utc>) -> Self {Value::U64(v.timestamp() as u64)}}
113impl From<F64> for Value {fn from(v: F64) -> Self {Value::F64(v)}}
114impl From<String> for Value {fn from(v: String) -> Self {Value::r#String(v)}}
115impl From<&str> for Value {fn from(v: &str) -> Self {Value::r#String(v.to_string())}}
116impl From<bool> for Value {fn from(v: bool) -> Self {Value::Bool(v)}}
117impl From<Vec<u8>> for Value {fn from(v: Vec<u8>) -> Self {Value::Bytes(v)}}
118impl<V: Into<Value>> From<Vec<V>> for Value {fn from(v: Vec<V>) -> Self {Value::Array(v.into_iter().map(|v| v.into()).collect())}}
119
120impl std::fmt::Debug for Value {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 match self {
123 Value::I64(i) => write!(f, "{}", i),
124 Value::U64(u) => write!(f, "{}", u),
125 Value::Bytes(u) => write!(f, "{}", hex::encode(u)),
126 Value::F64(_f) => write!(f, "{}", _f),
127 Value::r#String(s) => write!(f, "{}", s),
128 Value::Bool(b) => write!(f, "{}", b),
129 Value::Array(vec) => write!(f, "{:#?}", vec)
130 }
131 }
132}
133
134
135#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
136pub enum CmpType { GT, GTE, E, LT, LTE }
137
138#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
139pub enum Filter {
140 Cmp(Value, CmpType),
141 Contains(Value),
142 StartsWith(Value),
143 All(Vec<Filter>),
144 Any(Vec<Filter>),
145 Not(Box<Filter>)
146
147}
148
149impl Filter {
150 pub fn filter(&self, item: &Value) -> Option<bool> {
151 match self {
152 Filter::Cmp(value, cmp_type) => item.cmp(value, cmp_type),
153 Filter::Contains(value) => item.contains(value),
154 Filter::StartsWith(value) => item.starts_with(value),
155 Filter::All(filters) => {
156 for filter in filters {
157 if let Some(b) = filter.filter(item) {
158 if !b {
159 return Some(true);
160 }
161 } else {return None;}
162 }
163 Some(false)
164 },
165 Filter::Any(filters) => {
166 for filter in filters {
167 if let Some(b) = filter.filter(item) {
168 if b {
169 return Some(true);
170 }
171 } else {return None;}
172 }
173 Some(false)
174 },
175 Filter::Not(filter) => filter.filter(item).map(|f| !f)
176 }
177 }
178
179 pub fn contains<V: Into<Value>>(val: V) -> Filter {
180 Filter::Contains(val.into())
181 }
182 pub fn range<V: Into<Value>>(start: V, end: V) -> Filter {
183 Filter::All(vec![Filter::Cmp(start.into(), CmpType::GTE), Filter::Cmp(end.into(), CmpType::LTE)])
184 }
185 pub fn new_not(filter: Filter) -> Filter {Filter::Not(Box::new(filter))}
186 pub fn cmp<V: Into<Value>>(cmp: CmpType, val: V) -> Filter {Filter::Cmp(val.into(), cmp)}
187 pub fn equal<V: Into<Value>>(val: V) -> Filter {Filter::Cmp(val.into(), CmpType::E)}
188 pub fn is_equal(&self) -> bool {
189 if let Filter::Cmp(_, t) = self {
190 *t == CmpType::E
191 } else {false}
192 }
193}
194
195pub type Index = BTreeMap<String, Value>;
196
197pub struct IndexBuilder {}
198impl IndexBuilder {
199 pub fn build<V: Into<Value>>(vec: Vec<(&str, V)>) -> Result<Index, Error> {
200 let index = Index::from_iter(vec.into_iter().map(|(k, v)| (k.to_string(), v.into())));
201 if index.contains_key("timestamp_stored") {
202 Err(Error::err("", "'timestamp_stored' is a reserved index"))
203 } else {Ok(index)}
204 }
205}
206
207#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
208pub struct Filters(pub BTreeMap<String, Filter>);
209
210impl Filters {
211 pub fn new(vec: Vec<(&str, Filter)>) -> Filters {
212 Filters(BTreeMap::from_iter(vec.into_iter().map(|(k, f)| (k.to_string(), f))))
213 }
214 pub fn add(&mut self, property: &str, ad_filter: Filter) {
215 if let Some(filter) = self.0.get_mut(property) {
216 if let Filter::All(ref mut filters) = filter {
217 filters.push(ad_filter);
218 } else {
219 *filter = Filter::All(vec![filter.clone(), ad_filter]);
220 }
221 } else {
222 self.0.insert(property.to_string(), ad_filter);
223 }
224 }
225 pub fn combine(&self, b_filters: &Filters, or: bool) -> Filters {
226 let mut filters = Vec::new();
227 for (a_name, a_filter) in &self.0 {
228 if let Some(b_filter) = b_filters.0.get(a_name) {
229 let vec = vec![b_filter.clone(), a_filter.clone()];
230 filters.push((a_name.to_string(), if or {Filter::Any(vec)} else {Filter::All(vec)}));
231 } else {
232 filters.push((a_name.to_string(), a_filter.clone()));
233 }
234 }
235 for (b_name, b_filter) in &b_filters.0 {
236 if !self.0.contains_key(b_name) {
237 filters.push((b_name.to_string(), b_filter.clone()));
238 }
239 }
240 Filters(BTreeMap::from_iter(filters))
241 }
242 pub fn filter(&self, index: &Index) -> bool {
243 self.0.iter().all(|(prop, filter)| {
244 if let Some(value) = index.get(prop) {
245 filter.filter(value).unwrap_or(false)
246 } else {false}
247 })
248 }
249}
250
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
252pub enum SortDirection {
253 Descending = -1,
254 Ascending = 1
255}
256
257#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
258pub struct SortOptions {
259 direction: SortDirection,
260 property: String,
261 limit: Option<usize>,
262 cursor_key: Option<Vec<u8>>
263}
264
265impl SortOptions {
266 pub fn new(property: &str) -> Self {
267 SortOptions{
268 direction: SortDirection::Ascending,
269 property: property.to_string(),
270 limit: None,
271 cursor_key: None
272 }
273 }
274
275 pub fn sort<T: Indexable>(&self, values: &mut [T]) -> Result<(), Error> {
276 if values.iter().any(|a| !a.index().contains_key(&self.property)) {
277 return Err(Error::err("", "Sort Property Not Found In All Values"));
278 }
279 values.sort_by(|a, b| {
280 let (f, s) = if self.direction == SortDirection::Ascending {(a, b)} else {(b, a)};
281 f.index().get(&self.property).unwrap()
282 .partial_cmp(
283 s.index().get(&self.property).unwrap()
284 ).unwrap_or(Ordering::Equal)
285 });
286 Ok(())
287 }
288}
289
290#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
291pub struct UuidKeyed<O: Indexable> {
292 inner: O,
293 uuid: Uuid
294}
295
296impl<O: Indexable> UuidKeyed<O> {
297 pub fn new(inner: O) -> Self {
298 UuidKeyed{inner, uuid: Uuid::new_v4()}
299 }
300 pub fn inner(self) -> O {self.inner}
301}
302
303impl<O: Indexable + Serialize + for<'a> Deserialize<'a>> Indexable for UuidKeyed<O> {
304 const PRIMARY_KEY: &'static str = "uuid";
305 const DEFAULT_SORT: &'static str = O::DEFAULT_SORT;
306 fn primary_key(&self) -> Vec<u8> {self.uuid.as_bytes().to_vec()}
307 fn secondary_keys(&self) -> Index {
308 let mut index = self.inner.secondary_keys();
309 index.insert(
310 O::PRIMARY_KEY.to_string(),
311 self.inner.primary_key().into()
312 );
313 index
314 }
315}
316
317#[derive(Clone)]
318pub struct Database {
319 store: Box<dyn KeyValueStore>,
320 location: PathBuf
321}
322
323pub const MAIN: &str = "___main___";
324pub const INDEX: &str = "___index___";
325pub const ALL: &str = "ALL";
326
327impl Database {
328 pub fn location(&self) -> PathBuf {self.location.clone()}
329 pub async fn new<KVS: KeyValueStore + 'static>(location: PathBuf) -> Result<Self, Error> {
330 Ok(Database{store: Box::new(KVS::new(location.clone()).await?), location})
331 }
332
333 pub async fn get_raw(&self, pk: &[u8]) -> Result<Option<Vec<u8>>, Error> {
334 Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
335 db.get(pk).await?
336 } else {None})
337 }
338
339 pub async fn get<I: Indexable + for<'a> Deserialize<'a>>(&self, pk: &[u8]) -> Result<Option<I>, Error> {
340 Ok(self.get_raw(pk).await?.map(|item| {
341 serde_json::from_slice::<I>(&item)
342 }).transpose()?)
343 }
344
345 pub async fn get_all<I: Indexable + for<'a> Deserialize<'a>>(&self) -> Result<Vec<I>, Error> {
346 Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
347 db.values().await?.into_iter().map(|item| Ok::<I, Error>(serde_json::from_slice::<I>(&item)?)).collect::<Result<Vec<I>, Error>>()?
348 } else {Vec::new()})
349 }
350
351 pub async fn keys(&self) -> Result<Vec<Vec<u8>>, Error> {
352 Ok(if let Some(db) = self.store.get_partition(&PathBuf::from(MAIN)).await {
353 db.keys().await?
354 } else {Vec::new()})
355 }
356
357 async fn add(partition: &dyn KeyValueStore, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
358 if let Some(values) = partition.get(key).await? {
359 let mut values: Vec<Vec<u8>> = serde_json::from_slice(&values)?;
360 values.push(value);
361 partition.set(key, &serde_json::to_vec(&values)?).await?;
362 } else {
363 partition.set(key, &serde_json::to_vec(&vec![value])?).await?;
364 }
365 Ok(())
366 }
367
368 async fn remove(partition: &dyn KeyValueStore, key: &[u8], value: &[u8]) -> Result<(), Error> {
369 if let Some(values) = partition.get(key).await? {
370 let mut values: Vec<Vec<u8>> = serde_json::from_slice(&values)?;
371 values.retain(|v| v != value);
372 if !values.is_empty() {
373 partition.set(key, &serde_json::to_vec(&values)?).await?;
374 } else {
375 partition.delete(key).await?;
376 }
377 }
378 Ok(())
379 }
380
381 pub async fn set<I: Indexable + Serialize>(&self, item: &I) -> Result<(), Error> {
382 let pk = item.primary_key();
383 self.delete(&pk).await?;
384 let db = &self.store;
385 let mut keys = item.secondary_keys();
386 if keys.contains_key(I::PRIMARY_KEY) || keys.contains_key("timestamp_stored") {
387 return Err(Error::err("", &format!("'{}' and 'timestamp_stored' are reserved indexes", I::PRIMARY_KEY)));
388 }
389 keys.insert(I::PRIMARY_KEY.to_string(), pk.clone().into());
390 keys.insert("timestamp_stored".to_string(), Utc::now().into());
391 db.partition(PathBuf::from(MAIN)).await?.set(&pk, &serde_json::to_vec(item)?).await?;
392 db.partition(PathBuf::from(INDEX)).await?.set(&pk, &serde_json::to_vec(&keys)?).await?;
393 for (key, value) in keys.iter() {
394 let partition = db.partition(PathBuf::from(&format!("__{}__", key))).await?;
395 let value = serde_json::to_vec(&value)?;
396 Self::add(&*partition, &value, pk.clone()).await?;
397 Self::add(&*partition, ALL.as_bytes(), pk.clone()).await?;
398 }
399 Ok(())
400 }
401
402 pub async fn delete(&self, pk: &[u8]) -> Result<(), Error> {
403 let db = &self.store;
404 db.partition(PathBuf::from(MAIN)).await?.delete(pk).await?;
405 let index_db = db.partition(PathBuf::from(INDEX)).await?;
406 if let Some(index) = index_db.get(pk).await? {
407 index_db.delete(pk).await?;
408 let keys: Index = serde_json::from_slice(&index)?;
409 for (key, value) in keys.iter() {
410 let partition = db.partition(PathBuf::from(&format!("__{}__", key))).await?;
411 let value = serde_json::to_vec(value)?;
412 Self::remove(&*partition, &value, pk).await?;
413 Self::remove(&*partition, ALL.as_bytes(), pk).await?;
414 }
415 }
416 Ok(())
417 }
418
419 pub async fn clear(&self) -> Result<(), Error> {
420 self.store.clear().await?;
421 Ok(())
422 }
423
424 pub async fn query<I: Indexable + for<'a> Deserialize<'a>>(
425 &self,
426 filters: &Filters,
427 sort_options: Option<SortOptions>
428 ) -> Result<(Vec<I>, Option<Vec<u8>>), Error> {
429 let sort_options = sort_options.unwrap_or(SortOptions::new(I::DEFAULT_SORT));
430 let none = || Ok((Vec::new(), None));
431 let db = &self.store;
432
433 if let Some(pk) = filters.0.iter().find_map(|(p, f)| {
434 if p == I::PRIMARY_KEY {
435 if let Filter::Cmp(Value::Bytes(value), CmpType::E) = f {
436 return Some(value);
437 }
438 }
439 None
440 }) {
441 return Ok((self.get(pk).await?.map(|r| vec![r]).unwrap_or(vec![]), None));
442 }
443
444
445 let db_filters: Vec<String> = filters.0.iter().filter_map(|(p, f)|
446 Some(p.to_string()).filter(|_| f.is_equal())
447 ).collect();
448
449 let partition_name = PathBuf::from(format!("__{}__",
450 db_filters.first().unwrap_or(&sort_options.property)
451 ));
452 let partition = if let Some(p) = db.get_partition(&partition_name).await {p} else {return none();};
453 let index = if let Some(p) = db.get_partition(&PathBuf::from(INDEX)).await {p} else {return none();};
454
455 let all = if let Some(p) = partition.get(ALL.as_bytes()).await? {
456 serde_json::from_slice::<Vec<Vec<u8>>>(&p)?
457 } else {return none();};
458
459 let mut values: Vec<(Vec<u8>, Value)> = vec![];
460 for pk in all {
461 let keys = index.get(&pk).await?.ok_or(
462 Error::err("database.query", "Indexed value not found in index")
463 )?;
464 let keys = serde_json::from_slice::<Index>(&keys)?;
465 if filters.filter(&keys) {
466 let main = db.get_partition(&PathBuf::from(MAIN)).await.ok_or(
467 Error::err("database.query", "Indexed value not found in main")
468 )?;
469 let sp = keys.get(&sort_options.property).ok_or(
470 Error::err("database.query", "Sort property not found in matched value")
471 )?.clone();
472
473 if let Some(i) = main.get(&pk).await? {
474 values.push((i, sp));
475 }
476 }
477 }
478
479 values.sort_by(|a, b| {
480 if sort_options.direction == SortDirection::Ascending {
481 a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal)
482 } else {
483 b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal)
484 }
485 });
486
487 let start = sort_options.cursor_key.and_then(|ck|
488 values.iter().position(|(i, _)| *i == ck).map(|p| p+1)
489 ).unwrap_or(0);
490 let end = sort_options.limit.map(|l| start+l).unwrap_or(values.len());
491 let cursor = if end != values.len() {Some(values[end].0.clone())} else {None};
492 if start == end {
493 Ok((Vec::new(), None))
494 } else {
495 Ok((values[start..end].iter().cloned().flat_map(|(item, _)|
496 serde_json::from_slice(&item).ok()
497 ).collect::<Vec<I>>(),
498 cursor))
499 }
500 }
501
502 pub async fn debug(&self) -> Result<String, Error> {
503 Ok(format!("{:#?}",
504 (
505 self.location(),
506 if let Some(index) = self.store.get_partition(&PathBuf::from(INDEX)).await {
507 let mut index = index.values().await?.into_iter().map(|keys|
508 Ok(serde_json::from_slice::<Index>(&keys)?)
509 ).collect::<Result<Vec<Index>, Error>>()?;
510 index.sort_by_key(|i| i.get("timestamp_stored").unwrap().clone());
511 index
512 } else {Vec::new()}
513 )
514 ))
515 }
516}
517
518impl std::fmt::Debug for Database {
519 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520 f.debug_struct("Database")
521 .field("location", &self.location())
522 .finish()
523 }
524}