use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use crate::{
config::{self, AutoschematicConfig},
connector::{
Connector, ConnectorInbox, FilterResponse,
handle::{ConnectorHandle, ConnectorHandleStatus},
spawn::spawn_connector,
},
connector_util::check_connector_host_version_match,
error::AutoschematicError,
keystore::KeyStore,
util::parse_env_file,
};
use anyhow::Context;
use dashmap::DashMap;
use serde::Serialize;
use tokio::task::JoinSet;
#[derive(Debug, Clone, Serialize)]
pub enum InitStatus {
Offline,
Spawning,
Initializing,
Error(String),
Running,
}
#[derive(Debug, Serialize)]
pub struct TopResponse {
handle_status: ConnectorHandleStatus,
init_status: InitStatus,
}
#[derive(Clone, PartialEq, Eq, Hash, Serialize)]
pub struct ConnectorCacheKey {
pub prefix: PathBuf,
pub shortname: String,
}
pub type ConnectorCacheValue = (Arc<dyn ConnectorHandle>, ConnectorInbox);
#[derive(Default)]
pub struct ConnectorCache {
cache: Arc<DashMap<ConnectorCacheKey, ConnectorCacheValue>>,
init_status: Arc<DashMap<ConnectorCacheKey, InitStatus>>,
filter_cache: Arc<DashMap<ConnectorCacheKey, HashMap<PathBuf, FilterResponse>>>,
}
impl ConnectorCache {
pub async fn top(&self) -> HashMap<ConnectorCacheKey, TopResponse> {
let mut res = HashMap::new();
let keys: Vec<ConnectorCacheKey> = self.cache.iter().map(|kv| kv.key().clone()).collect();
for key in keys {
if let Some(connector) = self.cache.get(&key).map(|kv| kv.0.clone()) {
let init_status = match self.init_status.get(&key) {
Some(init_status) => init_status.value().clone(),
None => InitStatus::Initializing,
};
res.insert(
key,
TopResponse {
handle_status: connector.status().await,
init_status,
},
);
}
}
res
}
pub async fn get_connector(&self, name: &str, prefix: &Path) -> Option<(Arc<dyn Connector>, ConnectorInbox)> {
let key = ConnectorCacheKey {
shortname: name.into(),
prefix: prefix.into(),
};
if let Some(entry) = self.cache.get(&key) {
let (connector, inbox) = &*entry;
Some((connector.clone(), inbox.resubscribe()))
} else {
None
}
}
pub async fn get_or_spawn_connector(
&self,
config: &AutoschematicConfig,
prefix: &str,
connector_def: &config::Connector,
keystore: Option<Arc<dyn KeyStore>>,
do_init: bool,
) -> Result<(Arc<dyn Connector>, ConnectorInbox), AutoschematicError> {
let key = ConnectorCacheKey {
shortname: connector_def.shortname.clone(),
prefix: prefix.into(),
};
let Some(prefix_def) = config.prefixes.get(prefix) else {
return Err(anyhow::anyhow!(format!("No such prefix {}", prefix)).into());
};
let spec = &connector_def.spec;
let mut env = HashMap::new();
if let Some(ref env_file) = prefix_def.env_file {
for (k, v) in parse_env_file(&std::fs::read_to_string(env_file).context(format!("Reading env file {}", env_file))?)
{
env.insert(k, v);
}
}
for (k, v) in &prefix_def.env {
env.insert(k.into(), v.into());
}
if let Some(ref env_file) = connector_def.env_file {
for (k, v) in parse_env_file(&std::fs::read_to_string(env_file).context(format!("Reading env file {}", env_file))?)
{
env.insert(k, v);
}
}
for (k, v) in &connector_def.env {
env.insert(k.into(), v.into());
}
if let Some((connector, inbox)) = self.cache.get(&key).map(|entry| {
let (connector, inbox) = &*entry;
(connector.clone(), inbox.resubscribe())
}) {
let need_init = match self.init_status.entry(key.clone()) {
dashmap::Entry::Occupied(status_ref) => match status_ref.get() {
InitStatus::Offline => do_init,
InitStatus::Spawning => do_init,
InitStatus::Initializing => false,
InitStatus::Error(_) => do_init,
InitStatus::Running => false,
},
dashmap::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(InitStatus::Offline);
do_init
}
};
if need_init {
self.init_status.insert(key.clone(), InitStatus::Initializing);
if let Err(e) = connector.init().await {
tracing::error!(
"In prefix {}: failed to init connector {}: {:#?}",
prefix,
connector_def.shortname,
e
);
self.init_status.insert(key.clone(), InitStatus::Error(format!("{:#?}", e)));
} else {
self.init_status.insert(key.clone(), InitStatus::Running);
}
}
Ok((connector.clone(), inbox.resubscribe()))
} else {
self.init_status.insert(key.clone(), InitStatus::Spawning);
let (connector, inbox) = spawn_connector(&connector_def.shortname, spec, &PathBuf::from(prefix), &env, keystore)
.await
.context("spawn_connector()")?;
check_connector_host_version_match(&connector_def.shortname, &connector).await?;
if do_init {
self.init_status.insert(key.clone(), InitStatus::Initializing);
if let Err(e) = connector.init().await {
tracing::error!(
"In prefix {}: failed to init connector {}: {:#?}",
prefix,
connector_def.shortname,
e
);
self.init_status.insert(key.clone(), InitStatus::Error(format!("{:#?}", e)));
} else {
self.init_status.insert(key.clone(), InitStatus::Running);
}
}
let connector_arc = Arc::new(connector);
self.cache.insert(key, (connector_arc.clone(), inbox.resubscribe()));
Ok((connector_arc, inbox))
}
}
pub async fn init_connector(&self, name: &str, prefix: &Path) -> Option<anyhow::Result<()>> {
let key = ConnectorCacheKey {
shortname: name.into(),
prefix: prefix.into(),
};
if let Some(entry) = self.cache.get(&key) {
let (connector, _inbox) = &*entry;
self.clear_filter_cache(name, prefix).await;
Some(connector.init().await)
} else {
None
}
}
pub async fn filter_cached(&self, name: &str, prefix: &Path, addr: &Path) -> anyhow::Result<FilterResponse> {
let key = ConnectorCacheKey {
shortname: name.into(),
prefix: prefix.into(),
};
if let Some(value) = self.filter_cache.get(&key).and_then(|cache| cache.get(addr).copied()) {
Ok(value)
} else if let Some(connector) = self.cache.get(&key).map(|entry| entry.0.clone()) {
let res = connector.filter(addr).await?;
self.filter_cache.entry(key.clone()).or_default().insert(addr.into(), res);
Ok(res)
} else {
Ok(FilterResponse::none())
}
}
pub async fn filter_all_cached(
&self,
autoschematic_config: &AutoschematicConfig,
addr: &Path,
) -> anyhow::Result<FilterResponse> {
for (prefix_name, prefix_def) in &autoschematic_config.prefixes {
for connector_def in &prefix_def.connectors {
match self
.filter_cached(&connector_def.shortname, &PathBuf::from(prefix_name), addr)
.await?
{
FilterResponse::None => continue,
resp => return Ok(resp),
}
}
}
Ok(FilterResponse::None)
}
pub async fn clear_filter_cache(&self, name: &str, prefix: &Path) {
let key = ConnectorCacheKey {
shortname: name.into(),
prefix: prefix.into(),
};
self.filter_cache.remove(&key);
}
pub async fn clear(&self) {
let keys: Vec<ConnectorCacheKey> = self.cache.iter().map(|kv| kv.key().clone()).collect();
let mut joinset = JoinSet::new();
for key in keys {
if let Some(kv) = self.cache.get(&key) {
let connector = kv.0.clone();
joinset.spawn(async move { connector.kill().await });
}
}
joinset.join_all().await;
self.cache.clear();
self.filter_cache.clear();
}
}