live_entity/store/
in_mem.rs1use std::{collections::HashMap, error::Error, sync::Arc};
2
3use async_trait::async_trait;
4use tokio::sync::{broadcast::Sender, Mutex};
5use typemap_rev::{TypeMap, TypeMapKey, Entry};
6
7use crate::{Entity, Event, NotFoundError, Store, Singleton, SingletonEvent};
8
9#[derive(Clone)]
10pub struct InMemStore {
11 retain: usize,
12 stores: Arc<Mutex<TypeMap>>,
13 singleton_stores: Arc<Mutex<TypeMap>>
14}
15
16impl InMemStore {
17 pub fn new(retain: usize) -> Self {
18 Self {
19 retain,
20 stores: Arc::new(Mutex::new(TypeMap::new())),
21 singleton_stores: Arc::new(Mutex::new(TypeMap::new()))
22 }
23 }
24}
25
26#[derive(Clone)]
27struct EntityWrapper<E: Entity>(E);
28impl<E: Entity> TypeMapKey for EntityWrapper<E> {
29 type Value = (Sender<Event<E>>, HashMap<E::ID, Self>);
30}
31
32struct SingletonWrapper<S: Singleton>(S);
33impl<S: Singleton> TypeMapKey for SingletonWrapper<S> {
34 type Value = (Sender<SingletonEvent<S>>, Option<Self>);
35}
36
37#[async_trait]
38impl Store for InMemStore {
39 async fn create<E: Entity>(&self, entity: &E) -> Result<(), Box<dyn Error>> {
40 let mut stores = self.stores.lock().await;
41 let (channel, map) = stores
42 .entry::<EntityWrapper<E>>()
43 .or_insert((Sender::new(self.retain), HashMap::default()));
44 map.insert(entity.get_id().clone(), EntityWrapper(entity.clone()));
45 if channel.receiver_count() > 0 {
46 channel.send(Event::Create(entity.clone()))?;
47 }
48 Ok(())
49 }
50
51 async fn create_singleton<S: Singleton>(&self, entity: &S) -> Result<(), Box<dyn Error>> {
52 let mut sings = self.singleton_stores.lock().await;
53 let e = sings.entry::<SingletonWrapper<S>>();
54 let channel = match e {
55 Entry::Occupied(mut e) => {
56 let (channel, s) = e.get_mut();
57 s.replace(SingletonWrapper(entity.clone()));
58 channel.clone()
59 }
60 Entry::Vacant(v) => {
61 let channel = Sender::new(self.retain);
62 v.insert((channel.clone(), Some(SingletonWrapper(entity.clone()))));
63 channel
64 }
65 };
66 if channel.receiver_count() > 0 {
67 channel.send(SingletonEvent::Create(entity.clone()))?;
68 }
69 Ok(())
70 }
71
72 async fn update<E: Entity>(
73 &self,
74 id: &E::ID,
75 update: &E::Update,
76 ) -> Result<(), Box<dyn Error>> {
77 let mut stores = self.stores.lock().await;
78 let (channel, map) = stores
79 .get_mut::<EntityWrapper<E>>()
80 .ok_or(NotFoundError(id.clone()))?;
81 let current = map.get_mut(id).ok_or(NotFoundError(id.clone()))?;
82 current.0.update(update);
83 if channel.receiver_count() > 0 {
84 channel.send(Event::Update {
85 id: id.clone(),
86 update: update.clone(),
87 })?;
88 }
89 Ok(())
90 }
91
92 async fn update_singleton<S: Singleton>(&self, update: &S::Update) -> Result<(), Box<dyn Error>> {
93 let mut sings = self.singleton_stores.lock().await;
94 let (channel, current_opt) = sings.get_mut::<SingletonWrapper<S>>().ok_or(NotFoundError(S::ENTITY_ID))?;
95 let current = current_opt.as_mut().ok_or(NotFoundError(S::ENTITY_ID))?;
96 current.0.update(update);
97 if channel.receiver_count() > 0 {
98 channel.send(SingletonEvent::Update(update.clone()))?;
99 }
100 Ok(())
101 }
102
103 async fn delete_all<E: Entity>(&self) -> Result<(), Box<dyn Error>> {
104 let mut stores = self.stores.lock().await;
105 let entry = stores.remove::<EntityWrapper<E>>();
106 if let Some((channel, map)) = entry {
107 if channel.receiver_count() > 0 {
108 for id in map.keys() {
109 channel.send(Event::Delete(id.clone()))?;
110 }
111 }
112 }
113 Ok(())
114 }
115
116 async fn delete_by_id<E: Entity>(&self, id: &E::ID) -> Result<(), Box<dyn Error>> {
117 let mut stores = self.stores.lock().await;
118 let (channel, map) = stores
119 .get_mut::<EntityWrapper<E>>()
120 .ok_or(NotFoundError(id.clone()))?;
121 map.remove(id);
122 if channel.receiver_count() > 0 {
123 channel.send(Event::Delete(id.clone()))?;
124 }
125 Ok(())
126 }
127
128 async fn delete_singleton<S: Singleton>(&self) -> Result<(), Box<dyn Error>> {
129 let mut sings = self.singleton_stores.lock().await;
130 if let Entry::Occupied(mut e) = sings.entry::<SingletonWrapper<S>>() {
131 let (channel, _) = e.get_mut();
132 if channel.receiver_count() > 0 {
133 channel.send(SingletonEvent::Delete)?;
134 }
135 e.remove();
136 }
137 Ok(())
138 }
139
140 async fn get_all<E: Entity>(&self) -> Result<Vec<E>, Box<dyn Error>> {
141 let stores = self.stores.lock().await;
142 match stores.get::<EntityWrapper<E>>() {
143 Some((_, map)) => Ok(map.values().cloned().map(|w| w.0).collect()),
144 None => Ok(Vec::default()),
145 }
146 }
147
148 async fn get_by_id<E: Entity>(&self, id: &E::ID) -> Result<E, Box<dyn Error>> {
149 let stores = self.stores.lock().await;
150 let (_, map) = stores
151 .get::<EntityWrapper<E>>()
152 .ok_or(NotFoundError(id.clone()))?;
153 map.get(id)
154 .ok_or(NotFoundError(id.clone()))
155 .cloned()
156 .map(|w| w.0)
157 .map_err(|e| e.into())
158 }
159
160 async fn get_singleton<S: Singleton>(&self) -> Result<S, Box<dyn Error>> {
161 let sings = self.singleton_stores.lock().await;
162 let (_, opt_s) = sings.get::<SingletonWrapper<S>>().ok_or(NotFoundError(S::ENTITY_ID))?;
163 let s = opt_s.as_ref().ok_or(NotFoundError(S::ENTITY_ID))?;
164 Ok(s.0.clone())
165 }
166
167 async fn watch<E: Entity>(&self, channel: Sender<Event<E>>) -> Result<(), Box<dyn Error>> {
168 let mut ch = {
169 let mut stores = self.stores.lock().await;
170 let (channel, _) = stores
171 .entry::<EntityWrapper<E>>()
172 .or_insert((Sender::new(self.retain), HashMap::default()));
173 channel.subscribe()
174 };
175 while let Ok(e) = ch.recv().await {
176 channel.send(e)?;
177 }
178 Ok(())
179 }
180
181 async fn watch_singleton<S: Singleton>(self: Arc<Self>, channel: Sender<SingletonEvent<S>>, _: usize) -> Result<(), Box<dyn Error>> {
182 let mut ch = {
183 let mut sings = self.singleton_stores.lock().await;
184 let (channel, _) = sings.entry::<SingletonWrapper<S>>().or_insert((Sender::new(self.retain), None));
185 channel.subscribe()
186 };
187 while let Ok(e) = ch.recv().await {
188 channel.send(e)?;
189 }
190 Ok(())
191 }
192}