use std::path::{Path, PathBuf};
use anyhow::{Context, bail};
use tokio::sync::broadcast::error::RecvError;
use crate::{
config::AutoschematicConfig,
connector::{Connector, OutputMapFile},
connector_cache::ConnectorCache,
error::AutoschematicError,
glob::addr_matches_filter,
keystore::KeyStore,
};
pub enum ImportMessage {
SkipExisting(PathBuf),
StartGet(PathBuf),
GetSuccess(PathBuf),
}
pub type ImportOutbox = tokio::sync::mpsc::Sender<ImportMessage>;
pub type ImportInbox = tokio::sync::mpsc::Sender<ImportMessage>;
pub async fn import_resource(
connector_shortname: &str,
connector: &Box<dyn Connector>,
prefix: &Path,
phy_addr: &Path,
overwrite_existing: bool,
) -> Result<bool, anyhow::Error> {
let phy_addr = if phy_addr.is_absolute() {
phy_addr.strip_prefix("/")?
} else {
phy_addr
};
let virt_addr = connector.addr_phy_to_virt(phy_addr).await?.unwrap_or(phy_addr.to_path_buf());
let phy_path = prefix.join(phy_addr);
let phy_out_path = OutputMapFile::path(prefix, phy_addr);
let virt_out_path = OutputMapFile::path(prefix, &virt_addr);
let virt_path = prefix.join(&virt_addr);
if virt_path.exists() && !overwrite_existing {
eprintln!("\u{1b}[92m [SKIP] \u{1b}[39m {} (already exists)", virt_path.display());
} else if phy_path.exists() && !overwrite_existing {
eprintln!("\u{1b}[92m [SKIP] \u{1b}[39m {} (already exists)", phy_path.display());
} else if phy_out_path.exists() && !overwrite_existing {
} else if virt_out_path.exists() && !overwrite_existing {
} else {
tracing::info!("import at path: {:?}", virt_path);
match connector
.get(phy_addr)
.await
.context(format!("{}::get()", connector_shortname))?
{
Some(get_resource_output) => {
let body = get_resource_output.resource_definition;
let res_path = prefix.join(&virt_addr);
if let Some(parent) = res_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
eprintln!("\u{1b}[92m [PULL] \u{1b}[39m {}", res_path.display());
tokio::fs::write(&res_path, body).await?;
if let Some(outputs) = get_resource_output.outputs {
if !outputs.is_empty() {
let output_map_file = OutputMapFile::OutputMap(outputs);
output_map_file.write(&prefix, &virt_addr)?;
if virt_addr != phy_addr {
OutputMapFile::write_link(&prefix, &phy_addr, &virt_addr)?;
}
}
}
return Ok(true);
}
None => {
tracing::error!("No remote resource at addr:{:?} path: {:?}", phy_addr, virt_path);
}
}
}
Ok(false)
}
pub async fn import_complete() {}
pub async fn import_all(
autoschematic_config: &AutoschematicConfig,
connector_cache: &ConnectorCache,
keystore: Option<&Box<dyn KeyStore>>,
outbox: ImportOutbox,
subpath: Option<PathBuf>,
prefix_filter: Option<String>,
connector_filter: Option<String>,
overwrite_existing: bool,
) -> Result<(usize, usize), AutoschematicError> {
let resource_group_map = autoschematic_config.resource_group_map();
let subpath = subpath.unwrap_or(PathBuf::from("./"));
let mut imported_count: usize = 0;
let mut total_count: usize = 0;
for (prefix_name, prefix) in &autoschematic_config.prefixes {
if let Some(prefix_filter) = &prefix_filter {
if prefix_name != prefix_filter {
continue;
}
}
for connector_def in &prefix.connectors {
let prefix_name = PathBuf::from(&prefix_name);
if let Some(connector_filter) = &connector_filter {
if connector_def.shortname != *connector_filter {
continue;
}
}
let mut imported_subcount: usize = 0;
tracing::info!("connector init: {}", connector_def.shortname);
let (connector, mut inbox) = connector_cache
.get_or_spawn_connector(
&connector_def.shortname,
&connector_def.spec,
&PathBuf::from(&prefix_name),
&connector_def.env,
keystore,
)
.await?;
let _reader_handle = tokio::spawn(async move {
loop {
match inbox.recv().await {
Ok(Some(stdout)) => {
eprintln!("{}", stdout);
}
Err(RecvError::Closed) => break,
_ => {}
}
}
});
let phy_addrs = connector.list(&subpath.clone()).await.context(format!(
"{}::list({})",
connector_def.shortname,
subpath.to_str().unwrap_or_default()
))?;
'phy_addr: for phy_addr in phy_addrs {
if !addr_matches_filter(&prefix_name, &phy_addr, &subpath) {
continue 'phy_addr;
}
if let Some(ref resource_group) = prefix.resource_group {
if let Some(neighbour_prefixes) = resource_group_map.get(resource_group) {
for neighbour_prefix in neighbour_prefixes.iter().filter(|p| **p != prefix_name) {
if neighbour_prefix.join(&phy_addr).exists() {
continue 'phy_addr;
}
if OutputMapFile::path(neighbour_prefix, &phy_addr).exists() {
continue 'phy_addr;
}
}
}
}
let res = import_resource(
&connector_def.shortname,
&connector,
&prefix_name,
&phy_addr,
overwrite_existing,
)
.await?;
if res {
imported_subcount += 1;
imported_count += 1;
}
total_count += 1;
}
}
}
Ok((imported_count, total_count))
}