live_entity/store/
mod.rs

1use crate::{Entity, Event, SingletonEntity, Singleton, SingletonEntityUpdate, SingletonEvent};
2use async_trait::async_trait;
3use std::{error::Error, fmt::Debug, sync::Arc};
4use tokio::sync::broadcast::{Receiver, Sender};
5
6#[cfg(feature = "in-mem")]
7pub mod in_mem;
8
9#[async_trait]
10pub trait Store: Send + Sync + 'static {
11    async fn create<E: Entity>(&self, entity: &E) -> Result<(), Box<dyn Error>>;
12    async fn create_singleton<S: Singleton>(&self, entity: &S) -> Result<(), Box<dyn Error>> {
13        self.create(&SingletonEntity::new(entity.clone())).await
14    }
15    async fn update<E: Entity>(&self, id: &E::ID, update: &E::Update)
16        -> Result<(), Box<dyn Error>>;
17    async fn update_singleton<S: Singleton>(&self, update: &S::Update) -> Result<(), Box<dyn Error>> {
18        self.update::<SingletonEntity<S>>(&S::ENTITY_ID.to_owned(), &SingletonEntityUpdate(update.clone())).await
19    }
20    async fn delete_all<E: Entity>(&self) -> Result<(), Box<dyn Error>>;
21    async fn delete_by_id<E: Entity>(&self, id: &E::ID) -> Result<(), Box<dyn Error>>;
22    async fn delete_singleton<S: Singleton>(&self) -> Result<(), Box<dyn Error>> {
23        self.delete_by_id::<SingletonEntity<S>>(&S::ENTITY_ID.to_owned()).await
24    }
25    async fn get_all<E: Entity>(&self) -> Result<Vec<E>, Box<dyn Error>>;
26    async fn get_by_id<E: Entity>(&self, id: &E::ID) -> Result<E, Box<dyn Error>>;
27    async fn get_singleton<S: Singleton>(&self) -> Result<S, Box<dyn Error>> {
28        self.get_by_id::<SingletonEntity<S>>(&S::ENTITY_ID.to_owned()).await.map(|se| se.0)
29    }
30    async fn watch<E: Entity>(&self, channel: Sender<Event<E>>) -> Result<(), Box<dyn Error>>;
31    async fn watch_singleton<S: Singleton>(self: Arc<Self>, channel: Sender<SingletonEvent<S>>, capacity: usize) -> Result<(), Box<dyn Error>> {
32        let (tx, mut rx) = tokio::sync::broadcast::channel(capacity);
33        let clone = self.clone();
34        let job = tokio::spawn(async move { clone.watch::<SingletonEntity<S>>(tx).await.unwrap(); });
35        while let Ok(evt) = rx.recv().await {
36            channel.send(evt.into())?;
37        }
38        job.abort();
39        job.await?;
40        Ok(())
41    }
42
43    async fn sync<E: Entity>(&self, mut channel: Receiver<Event<E>>) -> Result<(), Box<dyn Error>> {
44        while let Ok(event) = channel.recv().await {
45            match event {
46                Event::Create(e) => self.create(&e).await?,
47                Event::Update { id, update } => self.update::<E>(&id, &update).await?,
48                Event::Delete(id) => self.delete_by_id::<E>(&id).await?,
49            }
50        }
51        Ok(())
52    }
53}
54
/// Error signalling that something was not found.
///
/// Wraps a caller-supplied value (presumably the id or key that was looked up) and
/// renders it with its `Debug` representation in the error message.
#[derive(Debug)]
pub struct NotFoundError<T: Debug>(pub T);

impl<T> std::fmt::Display for NotFoundError<T>
where
    T: Debug,
{
    /// Writes `Not found: ` followed by the `Debug` form of the wrapped value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(missing) = self;
        write!(f, "Not found: {:?}", missing)
    }
}

impl<T> Error for NotFoundError<T> where T: Debug {}