use std::sync::Arc;
use anyhow::bail;
use crate::{
config::AutoschematicConfig,
connector::{Connector, FilterResponse, OutputMapFile, VirtToPhyResponse},
connector_cache::ConnectorCache,
keystore::KeyStore,
report::{ApplyReport, PlanReport},
};
/// Executes every op in `plan.connector_ops` through `connector` and gathers
/// the results into an [`ApplyReport`].
///
/// For each op, the plan's virtual address is resolved with
/// `addr_virt_to_phy` immediately before execution:
///
/// * `NotPresent` — the op is executed against the virtual address itself.
/// * `Present` / `Null` — the op is executed against the returned physical
///   address.
/// * `Deferred` — the apply is aborted with an error.
///
/// When an op returns a non-empty set of outputs, they are handed to
/// `OutputMapFile::apply_output_map`. Depending on its result and on a fresh
/// address resolution, output-map files are linked or deleted, and every path
/// reported by those helpers is appended to `wrote_files`.
///
/// # Returns
///
/// Always `Ok(Some(report))` on success. The report's `prefix` and
/// `virt_addr` are only filled in once at least one op has been executed.
///
/// # Errors
///
/// Propagates any error from the connector or from the `OutputMapFile`
/// helpers, and fails if address resolution yields a deferred response.
pub async fn apply_connector(connector: Arc<dyn Connector>, plan: &PlanReport) -> anyhow::Result<Option<ApplyReport>> {
    let mut report = ApplyReport::default();

    for connector_op in &plan.connector_ops {
        // Resolve the address anew for every op, right before executing it.
        let resolved = connector.addr_virt_to_phy(&plan.virt_addr).await?;
        let exec_output = match resolved {
            VirtToPhyResponse::Deferred(_) => bail!("Apply run on plan with deferred outputs."),
            VirtToPhyResponse::NotPresent => connector.op_exec(&plan.virt_addr, &connector_op.op_definition).await?,
            VirtToPhyResponse::Present(phy_addr) | VirtToPhyResponse::Null(phy_addr) => {
                connector.op_exec(&phy_addr, &connector_op.op_definition).await?
            }
        };

        // Kept inside the loop on purpose: a plan without ops yields a report
        // whose prefix and address remain at their defaults.
        report.prefix = plan.prefix.clone();
        report.virt_addr = plan.virt_addr.clone();

        let non_empty_outputs = exec_output.outputs.as_ref().filter(|outputs| !outputs.is_empty());
        if let Some(outputs) = non_empty_outputs {
            match OutputMapFile::apply_output_map(&plan.prefix, &plan.virt_addr, outputs)? {
                Some(virt_output_path) => {
                    // Resolve again after the op ran; a link is only written
                    // when the physical address differs from the virtual one.
                    if let VirtToPhyResponse::Present(phy_addr) = connector.addr_virt_to_phy(&plan.virt_addr).await?
                        && phy_addr != plan.virt_addr
                    {
                        let phy_output_path = OutputMapFile::write_link(&plan.prefix, &phy_addr, &plan.virt_addr)?;
                        report.wrote_files.push(phy_output_path);
                    }
                    report.wrote_files.push(virt_output_path);
                }
                None => {
                    if let VirtToPhyResponse::Present(phy_addr) = connector.addr_virt_to_phy(&plan.virt_addr).await? {
                        if let Some(virt_output_path) = OutputMapFile::delete(&plan.prefix, &plan.virt_addr)? {
                            report.wrote_files.push(virt_output_path);
                        }
                        if phy_addr != plan.virt_addr
                            && let Some(phy_output_path) = OutputMapFile::delete(&plan.prefix, &phy_addr)?
                        {
                            report.wrote_files.push(phy_output_path);
                        }
                    }
                }
            }
        }

        report.outputs.push(exec_output);
    }

    Ok(Some(report))
}
/// Applies `plan_report` using the first connector under the plan's prefix
/// whose cached filter classifies the plan's address as
/// `FilterResponse::Resource`.
///
/// Connectors are visited in the order they are listed in the prefix
/// definition. When `connector_filter` is `Some`, connectors whose
/// `shortname` differs from it are skipped. Each remaining connector is
/// fetched (or spawned) through `connector_cache`, and a background task is
/// started that prints every message received on the connector's inbox to
/// stderr. That task's handle is never awaited; the task ends when `recv`
/// returns an error.
///
/// # Returns
///
/// * The result of [`apply_connector`] for the first matching connector.
/// * `Ok(None)` if `plan_report.prefix.to_str()` yields `None`, if the
///   prefix has no entry in `autoschematic_config.prefixes`, or if no
///   connector matched.
///
/// # Errors
///
/// Propagates errors from obtaining the connector, from the cached filter
/// lookup, and from [`apply_connector`].
pub async fn apply(
    autoschematic_config: &AutoschematicConfig,
    connector_cache: Arc<ConnectorCache>,
    keystore: Option<Arc<dyn KeyStore>>,
    connector_filter: &Option<String>,
    plan_report: &PlanReport,
) -> Result<Option<ApplyReport>, anyhow::Error> {
    let Some(prefix_name) = plan_report.prefix.to_str() else {
        return Ok(None);
    };
    let Some(prefix_def) = autoschematic_config.prefixes.get(prefix_name) else {
        return Ok(None);
    };

    for connector_def in &prefix_def.connectors {
        // Honour the optional by-name filter before doing any work.
        let excluded_by_filter = connector_filter
            .as_ref()
            .is_some_and(|wanted| connector_def.shortname != *wanted);
        if excluded_by_filter {
            continue;
        }

        let (connector, mut inbox) = connector_cache
            .get_or_spawn_connector(autoschematic_config, prefix_name, connector_def, keystore.clone(), true)
            .await?;

        // Forward inbox messages to stderr in the background. Empty messages
        // are ignored; the first receive error stops the task.
        let _stdout_forwarder = tokio::spawn(async move {
            while let Ok(message) = inbox.recv().await {
                if let Some(stdout) = message {
                    eprintln!("{}", stdout.to_string_lossy());
                }
            }
        });

        let filter_response = connector_cache
            .filter_cached(&connector_def.shortname, &plan_report.prefix, &plan_report.virt_addr)
            .await?;
        if filter_response == FilterResponse::Resource {
            // The first connector that claims the address handles the plan.
            return apply_connector(connector, plan_report).await;
        }
    }

    Ok(None)
}