use std::{path::Path, sync::Arc};
use anyhow::Context;
use tokio::task::JoinSet;
use crate::{
config::{self, AutoschematicConfig},
connector::{Connector, FilterResponse, VirtToPhyResponse},
connector_cache::ConnectorCache,
keystore::KeyStore,
report::PlanReport,
template::template_config,
util::split_prefix_addr,
};
/// Builds a [`PlanReport`] for the resource at `virt_addr` under `prefix`, using a single
/// connector.
///
/// Steps, in order:
/// 1. Ask the connector to map the virtual address to a physical one. A `Deferred` answer
///    short-circuits: its outputs are recorded in `missing_outputs` and the report is
///    returned without any planning.
/// 2. If a physical address is known, fetch the current resource definition via
///    `Connector::get`.
/// 3. If `prefix/virt_addr` is a regular file, read it as the desired state. UTF-8 content
///    is passed through `template_config`; if that reports missing outputs they are
///    recorded and the report is returned early. Non-UTF-8 content is passed to the
///    connector as raw bytes.
/// 4. Call `Connector::plan` with the physical address (falling back to the virtual one),
///    the current state and the desired state, and store the result in `connector_ops`.
///
/// # Returns
/// Every successful path yields `Ok(Some(report))`; this function never produces `Ok(None)`.
///
/// # Errors
/// Propagates failures from the connector (`addr_virt_to_phy`, `get`, `plan`), from reading
/// the desired-state file, and from `template_config`. Connector and file errors carry
/// context naming the connector shortname and the address or path involved.
pub async fn plan_connector(
    connector_def: config::Connector,
    connector: Arc<dyn Connector>,
    prefix: &Path,
    virt_addr: &Path,
) -> Result<Option<PlanReport>, anyhow::Error> {
    let mut plan_report = PlanReport {
        prefix: prefix.into(),
        connector_def: Some(connector_def.clone()),
        virt_addr: virt_addr.into(),
        ..Default::default()
    };

    let phy_addr = match connector.addr_virt_to_phy(virt_addr).await? {
        VirtToPhyResponse::NotPresent => None,
        VirtToPhyResponse::Deferred(read_outputs) => {
            // The physical address depends on outputs that are not available yet;
            // report them instead of planning.
            plan_report.missing_outputs.extend(read_outputs);
            return Ok(Some(plan_report));
        }
        VirtToPhyResponse::Present(phy_addr) | VirtToPhyResponse::Null(phy_addr) => Some(phy_addr),
    };

    // Current state is only fetched when a physical address is known.
    let current = match &phy_addr {
        Some(phy_addr) => connector
            .get(phy_addr)
            .await
            .with_context(|| format!("{}::get({})", connector_def.shortname, phy_addr.display()))?
            .map(|get_resource_output| get_resource_output.resource_definition),
        None => None,
    };

    let path = prefix.join(virt_addr);

    // Non-blocking equivalent of `Path::is_file`: follows symlinks and treats any
    // metadata error as "not a file".
    let is_file = tokio::fs::metadata(&path).await.is_ok_and(|meta| meta.is_file());

    let desired = if is_file {
        let desired_bytes = tokio::fs::read(&path)
            .await
            .with_context(|| format!("reading {}", path.display()))?;

        match std::str::from_utf8(&desired_bytes) {
            Ok(desired) => {
                let template_result = template_config(prefix, desired)?;
                if !template_result.missing.is_empty() {
                    // The template refers to outputs that do not exist yet; report
                    // them instead of planning.
                    plan_report.missing_outputs.extend(template_result.missing);
                    return Ok(Some(plan_report));
                }
                Some(template_result.body.into())
            }
            // Not valid UTF-8: skip templating and hand the raw bytes to the connector.
            Err(_) => Some(desired_bytes),
        }
    } else {
        // No file on disk: plan with no desired state.
        None
    };

    let plan_addr = phy_addr.unwrap_or_else(|| virt_addr.into());
    plan_report.connector_ops = connector
        .plan(&plan_addr, current, desired)
        .await
        .with_context(|| format!("{}::plan({}, _, _)", connector_def.shortname, virt_addr.display()))?;

    Ok(Some(plan_report))
}
/// Produces a [`PlanReport`] for the file at `path`, trying every connector configured for
/// the prefix that contains it.
///
/// `path` is split into a prefix and a virtual address with `split_prefix_addr`. If the
/// split fails, or the prefix has no entry in `autoschematic_config.prefixes`, the result
/// is `Ok(None)`.
///
/// One task is spawned per connector of the prefix (restricted to the connector whose
/// shortname equals `connector_filter`, when that is `Some`). Each task obtains its
/// connector from `connector_cache`, forwards the connector's inbox messages to stderr,
/// and calls [`plan_connector`] only when `filter_cached` reports
/// `FilterResponse::Resource` for the address.
///
/// # Returns
/// The first `Some` report produced by any task, in completion order, or `Ok(None)` if no
/// task produced one.
///
/// # Errors
/// Returns the first task join failure or task error encountered while draining results.
pub async fn plan(
    autoschematic_config: &AutoschematicConfig,
    connector_cache: Arc<ConnectorCache>,
    keystore: Option<Arc<dyn KeyStore>>,
    connector_filter: &Option<String>,
    path: &Path,
) -> Result<Option<PlanReport>, anyhow::Error> {
    // Owned, shareable copy of the config so each spawned task can hold a handle to it.
    let shared_config = Arc::new(autoschematic_config.clone());

    let Some((prefix, virt_addr)) = split_prefix_addr(&shared_config, path) else {
        return Ok(None);
    };

    let Some(prefix_def) = shared_config
        .prefixes
        .get(prefix.to_str().unwrap_or_default())
        .cloned()
    else {
        return Ok(None);
    };

    let mut tasks: JoinSet<anyhow::Result<Option<PlanReport>>> = JoinSet::new();

    for connector_def in prefix_def.connectors {
        // Skip connectors that do not match the requested shortname, if one was given.
        let filtered_out = connector_filter
            .as_ref()
            .is_some_and(|wanted| connector_def.shortname != *wanted);
        if filtered_out {
            continue;
        }

        let task_config = shared_config.clone();
        let task_cache = connector_cache.clone();
        let task_keystore = keystore.clone();
        let task_prefix = prefix.clone();
        let task_virt_addr = virt_addr.clone();

        tasks.spawn(async move {
            let Some(prefix_name) = task_prefix.to_str() else {
                return Ok(None);
            };

            let (connector, mut inbox) = task_cache
                .get_or_spawn_connector(&task_config, prefix_name, &connector_def, task_keystore, true)
                .await?;

            // Detached task: echo connector output to stderr until the inbox reports an error.
            let _stdout_pump = tokio::spawn(async move {
                while let Ok(message) = inbox.recv().await {
                    if let Some(stdout) = message {
                        eprintln!("{stdout}");
                    }
                }
            });

            let response = task_cache
                .filter_cached(&connector_def.shortname, &task_prefix, &task_virt_addr)
                .await?;
            if response != FilterResponse::Resource {
                return Ok(None);
            }

            plan_connector(connector_def, connector, &task_prefix, &task_virt_addr).await
        });
    }

    // Drain tasks as they finish; the first report wins and the first error aborts.
    while let Some(joined) = tasks.join_next().await {
        let outcome = joined??;
        if outcome.is_some() {
            return Ok(outcome);
        }
    }

    Ok(None)
}