autoschematic-core 0.14.2

Core shared functionality for Autoschematic: workflow engine, state management, and Git integrations
Documentation
use std::{path::Path, sync::Arc};

use anyhow::Context;
use tokio::task::JoinSet;

use crate::{
    config::{self, AutoschematicConfig},
    connector::{Connector, FilterResponse, VirtToPhyResponse},
    connector_cache::ConnectorCache,
    keystore::KeyStore,
    report::PlanReport,
    template::template_config,
    util::split_prefix_addr,
};

/// Builds a [`PlanReport`] for a single connector and a single virtual address under `prefix`.
///
/// The steps run in this order:
/// 1. `virt_addr` is resolved through `Connector::addr_virt_to_phy`. A `Deferred` response ends
///    planning early: the outputs it names are recorded in `missing_outputs` and the report is
///    returned without any connector ops.
/// 2. If a physical address was resolved (`Present` or `Null`), the current resource definition is
///    fetched with `Connector::get`.
/// 3. The desired state is read from `prefix.join(virt_addr)` when that path is a regular file.
///    Valid UTF-8 contents are passed through `template_config`; if templating reports missing
///    outputs they are recorded in `missing_outputs` and the report is returned early.
///    Contents that are not valid UTF-8 are handed to the connector unmodified. When the path is
///    not a regular file the desired state is `None`.
/// 4. `Connector::plan` is called with the current and desired state and its result is stored in
///    `connector_ops`.
///
/// # Returns
/// On success this always returns `Ok(Some(report))`; the `Option` is never `None` here.
///
/// # Errors
/// Propagates failures from address resolution, `Connector::get`, reading the desired file,
/// templating, and `Connector::plan`. Connector and file errors carry added context naming the
/// connector shortname and the address or path involved.
pub async fn plan_connector(
    connector_def: config::Connector,
    connector: Arc<dyn Connector>,
    prefix: &Path,
    virt_addr: &Path,
) -> Result<Option<PlanReport>, anyhow::Error> {
    let mut plan_report = PlanReport {
        prefix: prefix.into(),
        connector_def: Some(connector_def.clone()),
        virt_addr: virt_addr.into(),
        ..Default::default()
    };

    let phy_addr = match connector.addr_virt_to_phy(virt_addr).await? {
        VirtToPhyResponse::NotPresent => None,
        VirtToPhyResponse::Deferred(read_outputs) => {
            // The physical address depends on outputs that are not available yet,
            // so report them instead of planning.
            plan_report.missing_outputs.extend(read_outputs);
            return Ok(Some(plan_report));
        }
        VirtToPhyResponse::Present(phy_addr) | VirtToPhyResponse::Null(phy_addr) => Some(phy_addr),
    };

    // Current state is only looked up when a physical address is known.
    let current = match phy_addr.as_deref() {
        Some(phy_addr) => connector
            .get(phy_addr)
            .await
            .with_context(|| {
                format!(
                    "{}::get({})",
                    connector_def.shortname,
                    phy_addr.to_str().unwrap_or_default()
                )
            })?
            .map(|get_resource_output| get_resource_output.resource_definition),
        None => None,
    };

    let path = prefix.join(virt_addr);

    // Same semantics as `Path::is_file` (follows symlinks, `false` on any error),
    // but without blocking the async executor thread.
    let desired_is_file = tokio::fs::metadata(&path).await.is_ok_and(|meta| meta.is_file());

    let desired: Option<Vec<u8>> = if desired_is_file {
        let desired_bytes = tokio::fs::read(&path)
            .await
            .with_context(|| format!("reading {}", path.display()))?;

        match std::str::from_utf8(&desired_bytes) {
            Ok(desired_text) => {
                let template_result = template_config(prefix, desired_text)?;

                if !template_result.missing.is_empty() {
                    // The template references outputs that do not exist yet; planning is deferred.
                    plan_report.missing_outputs.extend(template_result.missing);
                    return Ok(Some(plan_report));
                }

                Some(template_result.body.into())
            }
            // Not valid UTF-8: skip templating and pass the raw bytes through.
            Err(_) => Some(desired_bytes),
        }
    } else {
        // The file does not exist, so `desired` is therefore None.
        // Generally speaking, this will destroy the given resource if it currently exists.
        None
    };

    // TODO: falling back from the physical address to the virtual address when no physical
    // address is known is a recognised design wart and should be removed.
    let plan_addr = phy_addr.as_deref().unwrap_or(virt_addr);

    plan_report.connector_ops = connector
        .plan(plan_addr, current, desired)
        .await
        .with_context(|| format!("{}::plan({}, _, _)", connector_def.shortname, virt_addr.display()))?;

    Ok(Some(plan_report))
}

/// Plans a single path: resolves its prefix, then asks every matching connector for a
/// [`PlanReport`] and returns the first report any of them produces.
///
/// `path` is split into `(prefix, virt_addr)` with `split_prefix_addr`. Each connector configured
/// for that prefix (optionally restricted to the shortname in `connector_filter`) is handled in
/// its own task: the connector is obtained from `connector_cache`, its inbox output is echoed to
/// stderr by a detached reader task, and `plan_connector` runs only when the cached filter
/// classifies the address as `FilterResponse::Resource`.
///
/// Returns `Ok(None)` when the path cannot be split, when the prefix is not present in the
/// configuration, or when no connector yields a report. The first task error (or join failure)
/// encountered while draining the task set is returned as `Err`.
///
/// This entry point is intended for command-line and LSP use, to quickly modify resources.
/// NOTE(review): an earlier version of this comment said that, unlike the server implementation,
/// this does not handle setting `desired = None` for missing files; `plan_connector` as written
/// does pass `None` when the file is absent — confirm which statement is current.
pub async fn plan(
    autoschematic_config: &AutoschematicConfig,
    connector_cache: Arc<ConnectorCache>,
    keystore: Option<Arc<dyn KeyStore>>,
    connector_filter: &Option<String>,
    path: &Path,
) -> Result<Option<PlanReport>, anyhow::Error> {
    let shared_config = Arc::new(autoschematic_config.clone());

    let Some((prefix, virt_addr)) = split_prefix_addr(&shared_config, path) else {
        return Ok(None);
    };

    // A non-UTF-8 prefix is looked up under the empty key, exactly like a missing one.
    let prefix_key = prefix.to_str().unwrap_or_default();
    let prefix_def = match shared_config.prefixes.get(prefix_key) {
        Some(found) => found.clone(),
        None => return Ok(None),
    };

    let mut tasks: JoinSet<anyhow::Result<Option<PlanReport>>> = JoinSet::new();

    for connector_def in prefix_def.connectors {
        // Honour the optional shortname filter before doing any work for this connector.
        let filtered_out = connector_filter
            .as_ref()
            .is_some_and(|wanted| connector_def.shortname != *wanted);
        if filtered_out {
            continue;
        }

        // Each task needs its own owned handles because it must be `'static`.
        let task_config = shared_config.clone();
        let task_cache = connector_cache.clone();
        let task_keystore = keystore.clone();
        let task_prefix = prefix.clone();
        let task_addr = virt_addr.clone();

        tasks.spawn(async move {
            let Some(prefix_name) = task_prefix.to_str() else {
                return Ok(None);
            };

            let (connector, mut inbox) = task_cache
                .get_or_spawn_connector(&task_config, prefix_name, &connector_def, task_keystore, true)
                .await?;

            // Forward connector output to stderr until the inbox reports an error.
            let _reader_handle = tokio::spawn(async move {
                while let Ok(message) = inbox.recv().await {
                    if let Some(stdout) = message {
                        eprintln!("{}", stdout.to_string_lossy());
                    }
                }
            });

            let verdict = task_cache
                .filter_cached(&connector_def.shortname, &task_prefix, &task_addr)
                .await?;
            if verdict != FilterResponse::Resource {
                return Ok(None);
            }

            plan_connector(connector_def, connector, &task_prefix, &task_addr).await
        });
    }

    // Drain tasks in completion order; the first report wins.
    while let Some(joined) = tasks.join_next().await {
        match joined?? {
            Some(report) => return Ok(Some(report)),
            None => continue,
        }
    }

    Ok(None)
}