depl 2.4.3

Toolkit for a bunch of local and remote CI/CD actions
Documentation
//! Deployer's watcher module.
//!
//! This module is used to create file watchers and interrupt pipelines on disk changes so they can be re-run.

use colored::Colorize;
use fs_change_notifier::{RecursiveMode, create_watcher, match_event};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::{Arc, atomic::AtomicBool};

use crate::cmd::RunArgs;
use crate::entities::observe_executor::ObserveExecutor;
use crate::entities::runs::Runs;
use crate::entities::skipper::Skipper;
use crate::globals::DeployerGlobalConfig;
use crate::project::DeployerProjectOptions;
use crate::rw::log;

/// Starts watching your project.
pub async fn watch(
  config: &DeployerProjectOptions,
  globals: &DeployerGlobalConfig,
  runs: &mut Runs,
  cache_dir: &Path,
  config_dir: &Path,
  storage_dir: &Path,
  args: &RunArgs,
) -> anyhow::Result<()> {
  let mut ignore = HashSet::new();
  config.cache_files.iter().for_each(|i| {
    ignore.insert(i.to_owned());
  });
  config.ignore_files.iter().for_each(|i| {
    ignore.insert(i.to_owned());
  });
  ignore.insert(PathBuf::from("artifacts"));

  // Collect copy_only from the pipeline(s) that will be watched.
  let copy_only = collect_copy_only(config, args);
  if !copy_only.is_empty() {
    log(format!("Watching only copy_only files: {copy_only:?}"));
  }

  let observe_executor = ObserveExecutor::default();
  let observe_client = observe_executor.make_client();
  tokio::task::spawn(observe_executor.run());

  if crate::rw::VERBOSE.get().is_some_and(|v| *v) && !args.no_clear {
    clearscreen::clear().expect("Failed to clear screen");
  }

  let project_root = std::env::current_dir().unwrap();
  let (mut wr, mut rx) = create_watcher(|e| log(format!("{e:?}"))).unwrap();

  if copy_only.is_empty() {
    wr.watch(&project_root, RecursiveMode::Recursive).unwrap();
  } else {
    let mut watched = HashSet::new();
    for path_str in &copy_only {
      let full_path = project_root.join(path_str);
      if full_path.is_dir() {
        if watched.insert(full_path.clone()) {
          wr.watch(&full_path, RecursiveMode::Recursive).unwrap();
        }
      } else if let Some(parent) = full_path.parent()
        && watched.insert(parent.to_path_buf())
      {
        wr.watch(parent, RecursiveMode::NonRecursive).unwrap();
      }
    }
  }

  // Create restart signal that persists across pipeline runs
  let restart_requested = Arc::new(AtomicBool::new(false));
  let restart_requested_clone = restart_requested.clone();

  // Spawn persistent file watcher task in background
  tokio::spawn(async move {
    loop {
      match_event(&project_root, &mut rx, &ignore).await;
      crate::rw::log("Files changed, signaling restart...");
      restart_requested_clone.store(true, std::sync::atomic::Ordering::Relaxed);
    }
  });

  loop {
    if !crate::rw::VERBOSE.get().is_some_and(|v| *v) && !args.no_clear {
      clearscreen::clear().expect("Failed to clear screen");
    }

    let skipper = Skipper::new(args.skip.unwrap_or(0usize));

    // Reset restart flag before running pipeline
    restart_requested.store(false, std::sync::atomic::Ordering::Relaxed);

    // Run pipeline - it will check the flag after each action and exit early if set
    let status = crate::run::run(
      config,
      globals,
      runs,
      cache_dir,
      config_dir,
      storage_dir,
      args,
      Some(observe_client.clone()),
      skipper,
      Some(restart_requested.clone()),
    )
    .await;

    if let Ok(true) = status {
      println!(
        "{} Waiting for files update...",
        "Pipeline executed successfully.".green()
      );
      crate::rw::log("Pipeline executed successfully. Waiting for files update...");
    } else {
      println!("{} Waiting for files update...", "Pipeline failed!".red());
      crate::rw::log("Pipeline failed! Waiting for files update...");
    }

    // Wait for file change before restarting
    while !restart_requested.load(std::sync::atomic::Ordering::Relaxed) {
      tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    crate::rw::log("Files are changed, restarting pipeline...");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  }
}

/// Collects `copy_only` paths from the pipeline(s) that match the given run args.
fn collect_copy_only(config: &DeployerProjectOptions, args: &RunArgs) -> Vec<String> {
  let mut copy_only = Vec::new();

  if args.pipeline_tags.is_empty() {
    // Default pipeline selection: collect copy_only from all default pipelines.
    for pipeline in &config.pipelines {
      if pipeline.default.is_some() {
        copy_only.extend(pipeline.copy_only.iter().cloned());
      }
    }
    // If no explicit default, the first pipeline is used.
    if copy_only.is_empty()
      && let Some(pipeline) = config.pipelines.first()
    {
      copy_only.extend(pipeline.copy_only.iter().cloned());
    }
  } else {
    for tag in &args.pipeline_tags {
      if let Some(pipeline) = config.pipelines.iter().find(|p| p.title.as_str() == tag.as_str()) {
        copy_only.extend(pipeline.copy_only.iter().cloned());
      }
    }
  }

  copy_only
}