use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use crate::{
config::Spec,
connector::{
Connector, ConnectorInbox, FilterOutput,
parse::{connector_shortname, parse_connector_name},
spawn::spawn_connector,
},
error::AutoschematicError,
keystore::KeyStore,
};
use anyhow::Context;
use dashmap::DashMap;
type HashKey = (String, PathBuf);
#[derive(Default)]
pub struct ConnectorCache {
cache: Arc<DashMap<HashKey, (Arc<Box<dyn Connector>>, ConnectorInbox)>>,
filter_cache: Arc<DashMap<HashKey, HashMap<PathBuf, FilterOutput>>>,
}
impl ConnectorCache {
pub async fn get_connector(&self, name: &str, prefix: &Path) -> Option<(Arc<Box<dyn Connector>>, ConnectorInbox)> {
let key = (name.into(), prefix.into());
if let Some(entry) = self.cache.get(&key) {
let (connector, inbox) = &*entry;
Some((connector.clone(), inbox.resubscribe()))
} else {
None
}
}
pub async fn get_or_spawn_connector(
&self,
name: &str,
spec: &Spec,
prefix: &Path,
env: &HashMap<String, String>,
keystore: Option<&Box<dyn KeyStore>>,
) -> Result<(Arc<Box<dyn Connector>>, ConnectorInbox), AutoschematicError> {
let key = (name.into(), prefix.into());
if !self.cache.contains_key(&key) {
let (connector, inbox) = spawn_connector(name, spec, prefix, env, keystore)
.await
.context("spawn_connector()")?;
if let Err(e) = connector.init().await {
tracing::error!("In prefix {}: failed to init connector {}: {:#?}", prefix.display(), name, e);
};
let connector_arc = Arc::new(connector);
self.cache.insert(key.clone(), (connector_arc.clone(), inbox.resubscribe()));
Ok((connector_arc, inbox))
} else {
let Some(entry) = self.cache.get(&key) else {
return Err(anyhow::anyhow!("Failed to get connector from cache: name {}, prefix {:?}", name, prefix).into());
};
let (connector, inbox) = &*entry;
Ok((connector.clone(), inbox.resubscribe()))
}
}
pub async fn init_connector(&self, name: &str, prefix: &Path) -> Option<anyhow::Result<()>> {
let connector_key = (name.into(), prefix.into());
if let Some(entry) = self.cache.get(&connector_key) {
let (connector, _inbox) = &*entry;
self.clear_filter_cache(name, prefix).await;
Some(connector.init().await)
} else {
None
}
}
pub async fn filter(&self, name: &str, prefix: &Path, addr: &Path) -> anyhow::Result<FilterOutput> {
let connector_key = (name.into(), prefix.into());
let mut connector_filter_cache = { self.filter_cache.entry(connector_key.clone()).or_insert_with(HashMap::new) };
if let Some(value) = connector_filter_cache.get(addr) {
Ok(*value)
} else if let Some(entry) = self.cache.get(&connector_key) {
let (connector, _inbox) = &*entry;
let res = connector.filter(addr).await?;
connector_filter_cache.insert(addr.into(), res);
Ok(res)
} else {
Ok(FilterOutput::None)
}
}
pub async fn clear_filter_cache(&self, name: &str, prefix: &Path) {
let connector_key = (name.into(), prefix.into());
self.filter_cache.remove(&connector_key);
}
pub async fn clear(&self) {
self.cache.clear();
self.filter_cache.clear();
}
}