use std::collections::HashMap;
use std::sync::RwLock;
use super::state::MvState;
use super::types::StreamingMvDef;
pub struct MvRegistry {
defs: RwLock<HashMap<(u64, String), StreamingMvDef>>,
states: RwLock<HashMap<(u64, String), std::sync::Arc<MvState>>>,
}
impl MvRegistry {
pub fn new() -> Self {
Self {
defs: RwLock::new(HashMap::new()),
states: RwLock::new(HashMap::new()),
}
}
pub fn register(&self, def: StreamingMvDef) {
let key = (def.tenant_id, def.name.clone());
let state = std::sync::Arc::new(MvState::new(
def.name.clone(),
def.group_by_columns.clone(),
def.aggregates.clone(),
));
let mut defs = self.defs.write().unwrap_or_else(|p| p.into_inner());
defs.insert(key.clone(), def);
let mut states = self.states.write().unwrap_or_else(|p| p.into_inner());
states.insert(key, state);
}
pub fn unregister(&self, tenant_id: u64, name: &str) -> bool {
let key = (tenant_id, name.to_string());
let mut defs = self.defs.write().unwrap_or_else(|p| p.into_inner());
let existed = defs.remove(&key).is_some();
let mut states = self.states.write().unwrap_or_else(|p| p.into_inner());
states.remove(&key);
existed
}
pub fn get_def(&self, tenant_id: u64, name: &str) -> Option<StreamingMvDef> {
let key = (tenant_id, name.to_string());
let defs = self.defs.read().unwrap_or_else(|p| p.into_inner());
defs.get(&key).cloned()
}
pub fn get_state(&self, tenant_id: u64, name: &str) -> Option<std::sync::Arc<MvState>> {
let key = (tenant_id, name.to_string());
let states = self.states.read().unwrap_or_else(|p| p.into_inner());
states.get(&key).cloned()
}
pub fn find_by_source(
&self,
tenant_id: u64,
stream_name: &str,
) -> Vec<std::sync::Arc<MvState>> {
let defs = self.defs.read().unwrap_or_else(|p| p.into_inner());
let states = self.states.read().unwrap_or_else(|p| p.into_inner());
defs.iter()
.filter(|((tid, _), def)| *tid == tenant_id && def.source_stream == stream_name)
.filter_map(|(key, _)| states.get(key).cloned())
.collect()
}
pub fn clear_and_reload(
&self,
catalog: &crate::control::security::catalog::types::SystemCatalog,
) -> crate::Result<()> {
let fresh = catalog.load_all_streaming_mvs()?;
let mut defs = self.defs.write().unwrap_or_else(|p| p.into_inner());
let mut states = self.states.write().unwrap_or_else(|p| p.into_inner());
defs.clear();
states.clear();
for mv in fresh {
let key = (mv.tenant_id, mv.name.clone());
let state = std::sync::Arc::new(crate::event::streaming_mv::state::MvState::new(
mv.name.clone(),
mv.group_by_columns.clone(),
mv.aggregates.clone(),
));
defs.insert(key.clone(), mv);
states.insert(key, state);
}
Ok(())
}
pub fn list_all(&self) -> Vec<StreamingMvDef> {
let defs = self.defs.read().unwrap_or_else(|p| p.into_inner());
defs.values().cloned().collect()
}
pub fn list_for_tenant(&self, tenant_id: u64) -> Vec<StreamingMvDef> {
let defs = self.defs.read().unwrap_or_else(|p| p.into_inner());
defs.values()
.filter(|d| d.tenant_id == tenant_id)
.cloned()
.collect()
}
pub fn load_from_catalog(
&self,
catalog: &crate::control::security::catalog::types::SystemCatalog,
) {
let mvs = match catalog.load_all_streaming_mvs() {
Ok(m) => m,
Err(e) => {
tracing::warn!(error = %e, "failed to load streaming MVs from catalog");
return;
}
};
for mv in mvs {
self.register(mv);
}
}
}
impl Default for MvRegistry {
fn default() -> Self {
Self::new()
}
}