use colored::Colorize;
use fs_change_notifier::{RecursiveMode, create_watcher, match_event};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::{Arc, atomic::AtomicBool};
use crate::cmd::RunArgs;
use crate::entities::observe_executor::ObserveExecutor;
use crate::entities::runs::Runs;
use crate::entities::skipper::Skipper;
use crate::globals::DeployerGlobalConfig;
use crate::project::DeployerProjectOptions;
use crate::rw::log;
pub async fn watch(
config: &DeployerProjectOptions,
globals: &DeployerGlobalConfig,
runs: &mut Runs,
cache_dir: &Path,
config_dir: &Path,
storage_dir: &Path,
args: &RunArgs,
) -> anyhow::Result<()> {
let mut ignore = HashSet::new();
config.cache_files.iter().for_each(|i| {
ignore.insert(i.to_owned());
});
config.ignore_files.iter().for_each(|i| {
ignore.insert(i.to_owned());
});
ignore.insert(PathBuf::from("artifacts"));
let copy_only = collect_copy_only(config, args);
if !copy_only.is_empty() {
log(format!("Watching only copy_only files: {copy_only:?}"));
}
let observe_executor = ObserveExecutor::default();
let observe_client = observe_executor.make_client();
tokio::task::spawn(observe_executor.run());
if crate::rw::VERBOSE.get().is_some_and(|v| *v) && !args.no_clear {
clearscreen::clear().expect("Failed to clear screen");
}
let project_root = std::env::current_dir().unwrap();
let (mut wr, mut rx) = create_watcher(|e| log(format!("{e:?}"))).unwrap();
if copy_only.is_empty() {
wr.watch(&project_root, RecursiveMode::Recursive).unwrap();
} else {
let mut watched = HashSet::new();
for path_str in ©_only {
let full_path = project_root.join(path_str);
if full_path.is_dir() {
if watched.insert(full_path.clone()) {
wr.watch(&full_path, RecursiveMode::Recursive).unwrap();
}
} else if let Some(parent) = full_path.parent()
&& watched.insert(parent.to_path_buf())
{
wr.watch(parent, RecursiveMode::NonRecursive).unwrap();
}
}
}
let restart_requested = Arc::new(AtomicBool::new(false));
let restart_requested_clone = restart_requested.clone();
tokio::spawn(async move {
loop {
match_event(&project_root, &mut rx, &ignore).await;
crate::rw::log("Files changed, signaling restart...");
restart_requested_clone.store(true, std::sync::atomic::Ordering::Relaxed);
}
});
loop {
if !crate::rw::VERBOSE.get().is_some_and(|v| *v) && !args.no_clear {
clearscreen::clear().expect("Failed to clear screen");
}
let skipper = Skipper::new(args.skip.unwrap_or(0usize));
restart_requested.store(false, std::sync::atomic::Ordering::Relaxed);
let status = crate::run::run(
config,
globals,
runs,
cache_dir,
config_dir,
storage_dir,
args,
Some(observe_client.clone()),
skipper,
Some(restart_requested.clone()),
)
.await;
if let Ok(true) = status {
println!(
"{} Waiting for files update...",
"Pipeline executed successfully.".green()
);
crate::rw::log("Pipeline executed successfully. Waiting for files update...");
} else {
println!("{} Waiting for files update...", "Pipeline failed!".red());
crate::rw::log("Pipeline failed! Waiting for files update...");
}
while !restart_requested.load(std::sync::atomic::Ordering::Relaxed) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
crate::rw::log("Files are changed, restarting pipeline...");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
fn collect_copy_only(config: &DeployerProjectOptions, args: &RunArgs) -> Vec<String> {
let mut copy_only = Vec::new();
if args.pipeline_tags.is_empty() {
for pipeline in &config.pipelines {
if pipeline.default.is_some() {
copy_only.extend(pipeline.copy_only.iter().cloned());
}
}
if copy_only.is_empty()
&& let Some(pipeline) = config.pipelines.first()
{
copy_only.extend(pipeline.copy_only.iter().cloned());
}
} else {
for tag in &args.pipeline_tags {
if let Some(pipeline) = config.pipelines.iter().find(|p| p.title.as_str() == tag.as_str()) {
copy_only.extend(pipeline.copy_only.iter().cloned());
}
}
}
copy_only
}