live_entity/store/
in_mem.rs

1use std::{collections::HashMap, error::Error, sync::Arc};
2
3use async_trait::async_trait;
4use tokio::sync::{broadcast::Sender, Mutex};
5use typemap_rev::{TypeMap, TypeMapKey, Entry};
6
7use crate::{Entity, Event, NotFoundError, Store, Singleton, SingletonEvent};
8
9#[derive(Clone)]
10pub struct InMemStore {
11    retain: usize,
12    stores: Arc<Mutex<TypeMap>>,
13    singleton_stores: Arc<Mutex<TypeMap>>
14}
15
16impl InMemStore {
17    pub fn new(retain: usize) -> Self {
18        Self {
19            retain,
20            stores: Arc::new(Mutex::new(TypeMap::new())),
21            singleton_stores: Arc::new(Mutex::new(TypeMap::new()))
22        }
23    }
24}
25
26#[derive(Clone)]
27struct EntityWrapper<E: Entity>(E);
28impl<E: Entity> TypeMapKey for EntityWrapper<E> {
29    type Value = (Sender<Event<E>>, HashMap<E::ID, Self>);
30}
31
32struct SingletonWrapper<S: Singleton>(S);
33impl<S: Singleton> TypeMapKey for SingletonWrapper<S> {
34    type Value = (Sender<SingletonEvent<S>>, Option<Self>);
35}
36
37#[async_trait]
38impl Store for InMemStore {
39    async fn create<E: Entity>(&self, entity: &E) -> Result<(), Box<dyn Error>> {
40        let mut stores = self.stores.lock().await;
41        let (channel, map) = stores
42            .entry::<EntityWrapper<E>>()
43            .or_insert((Sender::new(self.retain), HashMap::default()));
44        map.insert(entity.get_id().clone(), EntityWrapper(entity.clone()));
45        if channel.receiver_count() > 0 {
46            channel.send(Event::Create(entity.clone()))?;
47        }
48        Ok(())
49    }
50
51    async fn create_singleton<S: Singleton>(&self, entity: &S) -> Result<(), Box<dyn Error>> {
52        let mut sings = self.singleton_stores.lock().await;
53        let e = sings.entry::<SingletonWrapper<S>>();
54        let channel = match e {
55            Entry::Occupied(mut e) => {
56                let (channel, s) = e.get_mut();
57                s.replace(SingletonWrapper(entity.clone()));
58                channel.clone()
59            }
60            Entry::Vacant(v) => {
61                let channel = Sender::new(self.retain);
62                v.insert((channel.clone(), Some(SingletonWrapper(entity.clone()))));
63                channel
64            }
65        };
66        if channel.receiver_count() > 0 {
67            channel.send(SingletonEvent::Create(entity.clone()))?;
68        }
69        Ok(())
70    }
71
72    async fn update<E: Entity>(
73        &self,
74        id: &E::ID,
75        update: &E::Update,
76    ) -> Result<(), Box<dyn Error>> {
77        let mut stores = self.stores.lock().await;
78        let (channel, map) = stores
79            .get_mut::<EntityWrapper<E>>()
80            .ok_or(NotFoundError(id.clone()))?;
81        let current = map.get_mut(id).ok_or(NotFoundError(id.clone()))?;
82        current.0.update(update);
83        if channel.receiver_count() > 0 {
84            channel.send(Event::Update {
85                id: id.clone(),
86                update: update.clone(),
87            })?;
88        }
89        Ok(())
90    }
91
92    async fn update_singleton<S: Singleton>(&self, update: &S::Update) -> Result<(), Box<dyn Error>> {
93        let mut sings = self.singleton_stores.lock().await;
94        let (channel, current_opt) = sings.get_mut::<SingletonWrapper<S>>().ok_or(NotFoundError(S::ENTITY_ID))?;
95        let current = current_opt.as_mut().ok_or(NotFoundError(S::ENTITY_ID))?;
96        current.0.update(update);
97        if channel.receiver_count() > 0 {
98            channel.send(SingletonEvent::Update(update.clone()))?;
99        }
100        Ok(())
101    }
102
103    async fn delete_all<E: Entity>(&self) -> Result<(), Box<dyn Error>> {
104        let mut stores = self.stores.lock().await;
105        let entry = stores.remove::<EntityWrapper<E>>();
106        if let Some((channel, map)) = entry {
107            if channel.receiver_count() > 0 {
108                for id in map.keys() {
109                    channel.send(Event::Delete(id.clone()))?;
110                }
111            }
112        }
113        Ok(())
114    }
115
116    async fn delete_by_id<E: Entity>(&self, id: &E::ID) -> Result<(), Box<dyn Error>> {
117        let mut stores = self.stores.lock().await;
118        let (channel, map) = stores
119            .get_mut::<EntityWrapper<E>>()
120            .ok_or(NotFoundError(id.clone()))?;
121        map.remove(id);
122        if channel.receiver_count() > 0 {
123            channel.send(Event::Delete(id.clone()))?;
124        }
125        Ok(())
126    }
127
128    async fn delete_singleton<S: Singleton>(&self) -> Result<(), Box<dyn Error>> {
129        let mut sings = self.singleton_stores.lock().await;
130        if let Entry::Occupied(mut e) = sings.entry::<SingletonWrapper<S>>() {
131            let (channel, _) = e.get_mut();
132            if channel.receiver_count() > 0 {
133                channel.send(SingletonEvent::Delete)?;
134            }
135            e.remove();
136        }
137        Ok(())
138    }
139
140    async fn get_all<E: Entity>(&self) -> Result<Vec<E>, Box<dyn Error>> {
141        let stores = self.stores.lock().await;
142        match stores.get::<EntityWrapper<E>>() {
143            Some((_, map)) => Ok(map.values().cloned().map(|w| w.0).collect()),
144            None => Ok(Vec::default()),
145        }
146    }
147
148    async fn get_by_id<E: Entity>(&self, id: &E::ID) -> Result<E, Box<dyn Error>> {
149        let stores = self.stores.lock().await;
150        let (_, map) = stores
151            .get::<EntityWrapper<E>>()
152            .ok_or(NotFoundError(id.clone()))?;
153        map.get(id)
154            .ok_or(NotFoundError(id.clone()))
155            .cloned()
156            .map(|w| w.0)
157            .map_err(|e| e.into())
158    }
159
160    async fn get_singleton<S: Singleton>(&self) -> Result<S, Box<dyn Error>> {
161        let sings = self.singleton_stores.lock().await;
162        let (_, opt_s) = sings.get::<SingletonWrapper<S>>().ok_or(NotFoundError(S::ENTITY_ID))?;
163        let s = opt_s.as_ref().ok_or(NotFoundError(S::ENTITY_ID))?;
164        Ok(s.0.clone())
165    }
166
167    async fn watch<E: Entity>(&self, channel: Sender<Event<E>>) -> Result<(), Box<dyn Error>> {
168        let mut ch = {
169            let mut stores = self.stores.lock().await;
170            let (channel, _) = stores
171                .entry::<EntityWrapper<E>>()
172                .or_insert((Sender::new(self.retain), HashMap::default()));
173            channel.subscribe()
174        };
175        while let Ok(e) = ch.recv().await {
176            channel.send(e)?;
177        }
178        Ok(())
179    }
180
181    async fn watch_singleton<S: Singleton>(self: Arc<Self>, channel: Sender<SingletonEvent<S>>, _: usize) -> Result<(), Box<dyn Error>> {
182        let mut ch = {
183            let mut sings = self.singleton_stores.lock().await;
184            let (channel, _) = sings.entry::<SingletonWrapper<S>>().or_insert((Sender::new(self.retain), None));
185            channel.subscribe()
186        };
187        while let Ok(e) = ch.recv().await {
188            channel.send(e)?;
189        }
190        Ok(())
191    }
192}