myko_rs/
repo.rs

1use crate::{
2    event::{MEvent, MEventType},
3    item::Eventable,
4    subscription::{Publisher, Subscription},
5    utils::filter_query,
6};
7
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::{collections::HashMap, sync::Arc};
11use tokio::sync::{mpsc::Receiver, Mutex};
12
13pub struct RepoStruct<T: Eventable<T, PT>, PT: Clone> {
14    subs: Vec<Arc<Mutex<Subscription<T, PT>>>>,
15    state: HashMap<String, T>,
16}
17
18pub trait Repo<T, PT> {
19    fn watch(&mut self, query: PT) -> Receiver<Vec<T>>
20    where
21        Self: Sized;
22}
23
24impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> RepoStruct<T, PT> {
25    pub fn new() -> Self {
26        RepoStruct {
27            subs: vec![],
28            state: HashMap::new(),
29        }
30    }
31}
32
33impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> Default for RepoStruct<T, PT> {
34    fn default() -> Self {
35        RepoStruct {
36            subs: vec![],
37            state: HashMap::new(),
38        }
39    }
40}
41
42impl<T: Eventable<T, PT> + PartialEq, PT: Clone> RepoStruct<T, PT> {
43    pub async fn process(&mut self, event: MEvent) -> Result<(), serde_json::Error> {
44        let mut item = event.item_json();
45
46        let res = item.as_object_mut();
47
48        if res.is_none() {
49            return Err(serde::de::Error::custom("Invalid JSON"));
50        }
51
52        let mut json = res.unwrap().to_owned();
53
54        let hash = json.get("hash");
55
56        if hash.is_none() {
57            let computed_hash = md5::compute(event.item_json().to_string());
58            let hash_string = format!("{:x}", computed_hash);
59
60            json.insert("hash".to_string(), Value::String(hash_string));
61        }
62
63        let ent = serde_json::from_value::<T>(Value::Object(json))?;
64
65        match event.change_type() {
66            MEventType::SET => {
67                self.set(&ent);
68            }
69            MEventType::DEL => {
70                self.remove(ent.id());
71            }
72        };
73
74        let event_type = event.change_type();
75        for sub in self.subs.iter_mut() {
76            sub.lock().await.handle(&ent, event_type);
77        }
78
79        Ok(())
80    }
81
82    fn remove(&mut self, id: String) -> Option<T> {
83        self.state.remove(&id)
84    }
85
86    fn set(&mut self, item: &T) {
87        self.state.insert(item.id(), item.clone());
88    }
89}
90
91impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> Repo<T, PT>
92    for RepoStruct<T, PT>
93{
94    fn watch(&mut self, query: PT) -> Receiver<Vec<T>> {
95        let initial = filter_query(&self.state, &query);
96
97        let (tx, rx) = tokio::sync::mpsc::channel::<Vec<T>>(1);
98
99        tx.try_send(initial.values().cloned().collect()).unwrap();
100
101        let sub = Subscription {
102            state: self.state.clone(),
103            tx,
104            query: Box::new(query),
105        };
106
107        self.subs.push(Arc::new(Mutex::new(sub)));
108
109        rx
110    }
111}