use std::collections::{HashMap, HashSet};
use maf_schemas::packet::{Bull, OneStoreUpdate, TxPacket};
use serde_json::Value;
use crate::{
app::meta::MetaKey,
store::{SelectKey, StoreId},
App,
};
#[derive(Debug, Default)]
pub struct ObserveStore {
targets: HashMap<ObserveDepdendency, HashSet<ObserveTarget>>,
}
#[non_exhaustive] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum ObserveDepdendency {
Store(StoreId),
Users,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum ObserveTarget {
Select(SelectKey),
Meta(MetaKey),
}
impl ObserveStore {
pub(crate) fn add_dependency(&mut self, dependency: ObserveDepdendency, target: ObserveTarget) {
self.targets.entry(dependency).or_default().insert(target);
}
pub(crate) fn get_dependents(
&self,
dependency: &ObserveDepdendency,
) -> Option<&HashSet<ObserveTarget>> {
self.targets.get(dependency)
}
}
impl App {
pub(crate) async fn trigger_update(
&self,
dependency: &ObserveDepdendency,
) -> anyhow::Result<()> {
let dependents = self.inner.observe.get_dependents(dependency);
let users = self.inner.state.users.read().await;
let store_dependency = match dependency {
ObserveDepdendency::Store(store_id) => Some(self.get_any_store(store_id).await?),
#[allow(unreachable_patterns)]
_ => None,
};
for target in dependents.into_iter().flatten() {
match target {
ObserveTarget::Meta(meta_key) => {
self.inner
.meta
.trigger_meta_update(self.clone(), meta_key)
.await?;
}
_ => (),
}
}
for (_user_id, user) in users.iter() {
let mut store_updates: Vec<OneStoreUpdate<Value>> = vec![];
if let Some(store) = &store_dependency {
store_updates.push(OneStoreUpdate {
store: &store.name,
data: Bull::Owned(self.serialize_store(user.clone(), store.clone()).await?),
});
}
for target in dependents.into_iter().flatten() {
match target {
ObserveTarget::Select(select_key) => {
let content = self
.compute_select_contents(&select_key, user.clone())
.await?;
store_updates.push(OneStoreUpdate {
store: &select_key.0,
data: Bull::Owned(content),
});
}
_ => (),
}
}
if !store_updates.is_empty() {
user.send(TxPacket::ManyStoreUpdate::<()>(store_updates))
.ok();
}
}
Ok(())
}
}