use std::collections::HashMap;
use crate::store::api::{KeyValueStore, StateStore};
use crate::store::kv::KeyValueBytesStore;
/// Name-indexed collection of type-erased state stores.
///
/// Stores are held as `Box<dyn StateStore>`; the accessor methods on this type
/// recover a concrete store type by downcasting.
#[derive(Default)]
pub(crate) struct StoreRegistry {
/// Registered stores, keyed by store name.
stores: HashMap<String, Box<dyn StateStore>>,
}
impl StoreRegistry {
    /// Registers `store` under the name returned by its `name()` method.
    ///
    /// If a store is already registered under that name it is replaced, and
    /// the previous store is dropped.
    pub fn insert(&mut self, store: Box<dyn StateStore>) {
        self.stores.insert(store.name().to_string(), store);
    }

    /// Looks up `name` and downcasts the registered store to the concrete type `T`.
    ///
    /// Returns `None` when nothing is registered under `name`, or when the
    /// registered store is not a `T`. Shared by all typed accessors so the
    /// lookup-and-downcast sequence lives in exactly one place.
    fn typed_store_mut<T: 'static>(&mut self, name: &str) -> Option<&mut T> {
        self.stores.get_mut(name)?.as_any_mut().downcast_mut::<T>()
    }

    /// Returns the store registered under `name` as a `KeyValueStore<K, V>`.
    ///
    /// Returns `None` when no store has that name or when the store is not a
    /// `KeyValueBytesStore<K, V>` with exactly these key and value types.
    pub fn get_kv<K: Send + Sync + 'static, V: Send + 'static>(
        &mut self,
        name: &str,
    ) -> Option<&mut dyn KeyValueStore<K, V>> {
        let concrete = self.typed_store_mut::<KeyValueBytesStore<K, V>>(name)?;
        Some(concrete as &mut dyn KeyValueStore<K, V>)
    }

    /// Returns the names of all registered stores.
    ///
    /// The names come straight from the underlying `HashMap`'s keys, so their
    /// order is unspecified.
    pub fn names(&self) -> Vec<String> {
        self.stores.keys().cloned().collect()
    }

    /// Returns the store registered under `name` as a `WindowStore<K, V>`.
    ///
    /// Returns `None` when no store has that name or when the store is not a
    /// `WindowBytesStore<K, V>` with exactly these key and value types.
    pub fn get_window<K: Send + Sync + 'static, V: Send + 'static>(
        &mut self,
        name: &str,
    ) -> Option<&mut dyn crate::store::window::WindowStore<K, V>> {
        let concrete =
            self.typed_store_mut::<crate::store::window::WindowBytesStore<K, V>>(name)?;
        Some(concrete as &mut dyn crate::store::window::WindowStore<K, V>)
    }

    /// Returns the store registered under `name` as a `JoinWindowStore<K, V>`.
    ///
    /// Returns `None` when no store has that name or when the store is not a
    /// `JoinWindowBytesStore<K, V>` with exactly these key and value types.
    pub fn get_join_window<K: Send + Sync + 'static, V: Send + 'static>(
        &mut self,
        name: &str,
    ) -> Option<&mut dyn crate::store::join_window::JoinWindowStore<K, V>> {
        let concrete = self
            .typed_store_mut::<crate::store::join_window::JoinWindowBytesStore<K, V>>(name)?;
        Some(concrete as &mut dyn crate::store::join_window::JoinWindowStore<K, V>)
    }

    /// Returns the store registered under `name` as a `SessionStore<K, V>`.
    ///
    /// Returns `None` when no store has that name or when the store is not a
    /// `SessionBytesStore<K, V>` with exactly these key and value types.
    pub fn get_session<K: Send + Sync + 'static, V: Send + 'static>(
        &mut self,
        name: &str,
    ) -> Option<&mut dyn crate::store::session::SessionStore<K, V>> {
        let concrete =
            self.typed_store_mut::<crate::store::session::SessionBytesStore<K, V>>(name)?;
        Some(concrete as &mut dyn crate::store::session::SessionStore<K, V>)
    }

    /// Returns the store registered under `name` as a `SuppressStore<K, V>`.
    ///
    /// Returns `None` when no store has that name or when the store is not a
    /// `SuppressBytesStore<K, V>` with exactly these key and value types.
    pub(crate) fn get_suppress<K: Send + Sync + 'static, V: Send + 'static>(
        &mut self,
        name: &str,
    ) -> Option<&mut dyn crate::store::suppress_store::SuppressStore<K, V>> {
        let concrete = self
            .typed_store_mut::<crate::store::suppress_store::SuppressBytesStore<K, V>>(name)?;
        Some(concrete as &mut dyn crate::store::suppress_store::SuppressStore<K, V>)
    }

    /// Returns the store registered under `name` as a concrete
    /// `SubscriptionBytesStore`.
    ///
    /// Returns `None` when no store has that name or when the store is of a
    /// different concrete type.
    pub(crate) fn get_fk_subscription(
        &mut self,
        name: &str,
    ) -> Option<&mut crate::store::fk_subscription::SubscriptionBytesStore> {
        self.typed_store_mut::<crate::store::fk_subscription::SubscriptionBytesStore>(name)
    }

    /// Returns the type-erased store registered under `name`, if any.
    pub fn get_mut(&mut self, name: &str) -> Option<&mut dyn StateStore> {
        self.stores.get_mut(name).map(std::convert::AsMut::as_mut)
    }

    /// Returns the `IqQueryable` view of the store registered under `name`.
    ///
    /// Returns `None` when no store has that name or when the store's
    /// `as_iq()` yields `None`.
    pub(crate) fn iq_get(&self, name: &str) -> Option<&dyn crate::store::iq::IqQueryable> {
        self.stores.get(name).and_then(|s| s.as_iq())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, StringSerde};
    use crate::store::kv::KeyValueBytesStore;
    use assert2::check;

    /// A registered key-value store can be fetched back with matching type
    /// parameters, while a mismatched key type or an unknown name yields `None`.
    #[tokio::test]
    async fn register_and_downcast_typed_store() {
        // Build the store first, then hand ownership to the registry.
        let counts = KeyValueBytesStore::<String, i64>::in_memory(
            "counts".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "c-changelog".into(),
        );
        let mut registry = StoreRegistry::default();
        registry.insert(Box::new(counts));

        // Round-trip a value through the correctly typed handle.
        let typed = registry.get_kv::<String, i64>("counts").unwrap();
        typed.put("x".into(), 5).await;
        let fetched = typed.get(&"x".to_string()).await;
        check!(fetched == Some(5));

        // Wrong key type: the downcast must fail.
        let wrong_key_type = registry.get_kv::<i64, i64>("counts");
        check!(wrong_key_type.is_none());

        // Unknown name: the lookup must fail.
        let unknown_name = registry.get_kv::<String, i64>("missing");
        check!(unknown_name.is_none());
    }
}