1use crate::{
2 event::{MEvent, MEventType},
3 item::Eventable,
4 subscription::{Publisher, Subscription},
5 utils::filter_query,
6};
7
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::{collections::HashMap, sync::Arc};
11use tokio::sync::{mpsc::Receiver, Mutex};
12
13pub struct RepoStruct<T: Eventable<T, PT>, PT: Clone> {
14 subs: Vec<Arc<Mutex<Subscription<T, PT>>>>,
15 state: HashMap<String, T>,
16}
17
18pub trait Repo<T, PT> {
19 fn watch(&mut self, query: PT) -> Receiver<Vec<T>>
20 where
21 Self: Sized;
22}
23
24impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> RepoStruct<T, PT> {
25 pub fn new() -> Self {
26 RepoStruct {
27 subs: vec![],
28 state: HashMap::new(),
29 }
30 }
31}
32
33impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> Default for RepoStruct<T, PT> {
34 fn default() -> Self {
35 RepoStruct {
36 subs: vec![],
37 state: HashMap::new(),
38 }
39 }
40}
41
42impl<T: Eventable<T, PT> + PartialEq, PT: Clone> RepoStruct<T, PT> {
43 pub async fn process(&mut self, event: MEvent) -> Result<(), serde_json::Error> {
44 let mut item = event.item_json();
45
46 let res = item.as_object_mut();
47
48 if res.is_none() {
49 return Err(serde::de::Error::custom("Invalid JSON"));
50 }
51
52 let mut json = res.unwrap().to_owned();
53
54 let hash = json.get("hash");
55
56 if hash.is_none() {
57 let computed_hash = md5::compute(event.item_json().to_string());
58 let hash_string = format!("{:x}", computed_hash);
59
60 json.insert("hash".to_string(), Value::String(hash_string));
61 }
62
63 let ent = serde_json::from_value::<T>(Value::Object(json))?;
64
65 match event.change_type() {
66 MEventType::SET => {
67 self.set(&ent);
68 }
69 MEventType::DEL => {
70 self.remove(ent.id());
71 }
72 };
73
74 let event_type = event.change_type();
75 for sub in self.subs.iter_mut() {
76 sub.lock().await.handle(&ent, event_type);
77 }
78
79 Ok(())
80 }
81
82 fn remove(&mut self, id: String) -> Option<T> {
83 self.state.remove(&id)
84 }
85
86 fn set(&mut self, item: &T) {
87 self.state.insert(item.id(), item.clone());
88 }
89}
90
91impl<T: Eventable<T, PT> + PartialEq + DeserializeOwned, PT: Clone> Repo<T, PT>
92 for RepoStruct<T, PT>
93{
94 fn watch(&mut self, query: PT) -> Receiver<Vec<T>> {
95 let initial = filter_query(&self.state, &query);
96
97 let (tx, rx) = tokio::sync::mpsc::channel::<Vec<T>>(1);
98
99 tx.try_send(initial.values().cloned().collect()).unwrap();
100
101 let sub = Subscription {
102 state: self.state.clone(),
103 tx,
104 query: Box::new(query),
105 };
106
107 self.subs.push(Arc::new(Mutex::new(sub)));
108
109 rx
110 }
111}