use crate::prefilter;
use crate::RuntimeChoice;
use std::path::PathBuf;
pub(crate) struct WatchCtx {
pub(crate) path: Option<PathBuf>,
pub(crate) runtime: RuntimeChoice,
pub(crate) debounce: u64,
pub(crate) event: String,
pub(crate) show_action_messages: bool,
pub(crate) preserve_containers_on_failure: bool,
pub(crate) max_concurrency: usize,
pub(crate) base_branch: Option<String>,
pub(crate) activity_type: Option<String>,
pub(crate) max_pending_events: Option<usize>,
pub(crate) ignore_dirs: Vec<String>,
pub(crate) strict_filter: bool,
pub(crate) no_strict_filter: bool,
pub(crate) verbose: bool,
}
pub(crate) async fn run(ctx: WatchCtx) {
let strict_filter = prefilter::effective_strict_filter(ctx.strict_filter, ctx.no_strict_filter);
let workflow_dir = ctx
.path
.clone()
.unwrap_or_else(|| PathBuf::from(".github/workflows"));
if !workflow_dir.exists() {
eprintln!(
"Error: workflow directory not found: {}",
workflow_dir.display()
);
std::process::exit(1);
}
let repo_root =
match tokio::task::spawn_blocking(wrkflw_trigger_filter::find_repo_root_detailed).await {
Ok(Ok(p)) => p,
Ok(Err(e)) => {
eprintln!("Error: {}", e);
std::process::exit(1);
}
Err(join_err) => {
eprintln!("Error: find_repo_root task panicked: {}", join_err);
std::process::exit(1);
}
};
let debounce_duration = std::time::Duration::from_millis(ctx.debounce);
let config = wrkflw_executor::ExecutionConfig {
runtime_type: ctx.runtime.into(),
verbose: ctx.verbose,
preserve_containers_on_failure: ctx.preserve_containers_on_failure,
secrets_config: None,
show_action_messages: ctx.show_action_messages,
target_job: None,
};
use wrkflw_ui::cli_style;
println!(
"{}",
cli_style::success(&format!(
"Watching for changes (event={}, debounce={}ms)... Press Ctrl+C to stop.",
ctx.event, ctx.debounce
))
);
if ctx.base_branch.is_none() {
if let Err(msg) = prefilter::validate_event_requires_base_branch(&ctx.event, strict_filter)
{
eprintln!("Error: {}", msg);
std::process::exit(1);
}
}
let max_pending_for_cfg: usize = match ctx.max_pending_events {
Some(0) => {
wrkflw_logging::warning(
"--max-pending-events 0 is invalid (would cap the pending \
set at zero and drop every event); falling back to the \
library default.",
);
0 }
Some(n) => n,
None => 0,
};
let watcher_cfg = wrkflw_watcher::WatcherConfig::new(workflow_dir, repo_root, config)
.with_event(ctx.event.clone())
.with_base_branch(ctx.base_branch.clone())
.with_activity_type(ctx.activity_type.clone())
.with_debounce(debounce_duration)
.with_verbose(ctx.verbose)
.with_max_concurrency(ctx.max_concurrency)
.with_max_pending_events(max_pending_for_cfg)
.with_extra_ignore_dirs(ctx.ignore_dirs.clone());
let watcher = wrkflw_watcher::WorkflowWatcher::from_config(watcher_cfg);
if let Err(e) = watcher.collect_workflow_files().await {
eprintln!("Error: {}", e);
std::process::exit(1);
}
let shutdown = wrkflw_watcher::ShutdownSignal::new();
let shutdown_for_signal = shutdown.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
wrkflw_logging::info(
"Ctrl+C received — draining current watch cycle gracefully. \
Press Ctrl+C again if the drain hangs.",
);
shutdown_for_signal.trigger();
}
});
let watch_result = watcher
.run(shutdown, |watch_event| {
println!(
"\n{}",
cli_style::section(&format!(
"Change detected ({} file(s) changed, {} triggered, {} skipped{})",
watch_event.changed_files.len(),
watch_event.triggered_workflows.len(),
watch_event.skipped_workflows.len(),
if watch_event.dropped_events > 0 {
format!(", {} dropped", watch_event.dropped_events)
} else {
String::new()
}
))
);
if let Some(reason) = &watch_event.error {
eprintln!(" {} {}", cli_style::error("ERROR"), reason);
}
for warning in &watch_event.warnings {
eprintln!(" {} {}", cli_style::warning("WARN"), warning);
}
for wf in &watch_event.triggered_workflows {
println!(" {} {}", cli_style::success("TRIGGERED"), wf);
}
for wf in &watch_event.skipped_workflows {
println!(" {} {}", cli_style::dim("SKIPPED"), wf);
}
})
.await;
if let Err(e) = watch_result {
eprintln!("Watch error: {}", e);
std::process::exit(1);
}
}