1use std::{
2 ffi::OsString,
3 fs,
4 io::{self, IsTerminal, Write},
5 path::PathBuf,
6 process::{Child, Command, ExitStatus, Stdio},
7 thread,
8 time::{Duration, Instant},
9};
10
11use tracing::{debug, info, warn};
12
13use crate::{
14 analysis::{CommandAdapterId, CommandAnalysisStatus, SideEffectProfile},
15 error::{Result, WithWatchError},
16 snapshot::{capture_snapshot, ChangeDetectionMode, CommandSource, SnapshotState, WatchInput},
17 watch::{CollectedEvents, WatchLoop},
18};
19
20const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(50);
21const DEFAULT_DEBOUNCE_WINDOW: Duration = Duration::from_millis(200);
22const CLEAR_TERMINAL_SEQUENCE: &str = "\x1b[2J\x1b[H";
23const WITH_WATCH_TEST_RUN_MARKER_DIR_ENV: &str = "WITH_WATCH_TEST_RUN_MARKER_DIR";
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum OutputRefreshMode {
27 Preserve,
28 ClearTerminal,
29}
30
31impl OutputRefreshMode {
32 pub fn as_str(self) -> &'static str {
33 match self {
34 Self::Preserve => "preserve",
35 Self::ClearTerminal => "clear-terminal",
36 }
37 }
38}
39
40#[derive(Debug, Clone)]
41pub struct ExecutionPlan {
42 pub source: CommandSource,
43 pub detection_mode: ChangeDetectionMode,
44 pub output_refresh_mode: OutputRefreshMode,
45 pub inputs: Vec<WatchInput>,
46 pub delegated_command: DelegatedCommand,
47 pub metadata: ExecutionMetadata,
48}
49
50impl ExecutionPlan {
51 pub fn passthrough(
52 argv: Vec<OsString>,
53 inputs: Vec<WatchInput>,
54 detection_mode: ChangeDetectionMode,
55 output_refresh_mode: OutputRefreshMode,
56 metadata: ExecutionMetadata,
57 ) -> Self {
58 Self {
59 source: CommandSource::Argv,
60 detection_mode,
61 output_refresh_mode,
62 inputs,
63 delegated_command: DelegatedCommand::Argv(argv),
64 metadata,
65 }
66 }
67
68 pub fn shell(
69 expression: String,
70 inputs: Vec<WatchInput>,
71 detection_mode: ChangeDetectionMode,
72 output_refresh_mode: OutputRefreshMode,
73 metadata: ExecutionMetadata,
74 ) -> Self {
75 Self {
76 source: CommandSource::Shell,
77 detection_mode,
78 output_refresh_mode,
79 inputs,
80 delegated_command: DelegatedCommand::Shell(expression),
81 metadata,
82 }
83 }
84
85 pub fn exec(
86 argv: Vec<OsString>,
87 inputs: Vec<WatchInput>,
88 detection_mode: ChangeDetectionMode,
89 output_refresh_mode: OutputRefreshMode,
90 metadata: ExecutionMetadata,
91 ) -> Self {
92 Self {
93 source: CommandSource::Exec,
94 detection_mode,
95 output_refresh_mode,
96 inputs,
97 delegated_command: DelegatedCommand::Argv(argv),
98 metadata,
99 }
100 }
101}
102
103#[derive(Debug, Clone)]
104pub struct ExecutionMetadata {
105 pub adapter_ids: Vec<CommandAdapterId>,
106 pub fallback_used: bool,
107 pub default_watch_root_used: bool,
108 pub filtered_output_count: usize,
109 pub side_effect_profile: SideEffectProfile,
110 pub status: CommandAnalysisStatus,
111}
112
113impl ExecutionMetadata {
114 pub fn adapter_field(&self) -> String {
115 self.adapter_ids
116 .iter()
117 .map(|adapter| adapter.as_str())
118 .collect::<Vec<_>>()
119 .join(",")
120 }
121}
122
123#[derive(Debug, Clone)]
124pub enum DelegatedCommand {
125 Argv(Vec<OsString>),
126 Shell(String),
127}
128
129impl DelegatedCommand {
130 fn spawn_log_summary(&self) -> DelegatedCommandLogSummary {
131 match self {
132 Self::Argv(argv) => {
133 let program_name = argv
134 .first()
135 .map(program_name)
136 .unwrap_or_else(|| "<missing>".to_string());
137 DelegatedCommandLogSummary {
138 execution_kind: "argv",
139 program_name,
140 arg_count: argv.len().saturating_sub(1),
141 }
142 }
143 Self::Shell(_) => DelegatedCommandLogSummary {
144 execution_kind: "shell",
145 program_name: "sh".to_string(),
146 arg_count: 2,
147 },
148 }
149 }
150
151 fn display_name(&self) -> String {
152 match self {
153 Self::Argv(argv) => argv
154 .iter()
155 .map(|value| value.to_string_lossy().into_owned())
156 .collect::<Vec<_>>()
157 .join(" "),
158 Self::Shell(expression) => expression.clone(),
159 }
160 }
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
164struct DelegatedCommandLogSummary {
165 execution_kind: &'static str,
166 program_name: String,
167 arg_count: usize,
168}
169
170#[derive(Debug, Clone, Copy)]
171pub struct RunnerOptions {
172 pub debounce_window: Duration,
173 pub poll_timeout: Duration,
174 pub max_runs: Option<usize>,
175}
176
177impl Default for RunnerOptions {
178 fn default() -> Self {
179 Self {
180 debounce_window: DEFAULT_DEBOUNCE_WINDOW,
181 poll_timeout: DEFAULT_POLL_TIMEOUT,
182 max_runs: None,
183 }
184 }
185}
186
187impl RunnerOptions {
188 pub fn from_environment() -> Self {
189 let mut options = Self::default();
190
191 if let Ok(raw_max_runs) = std::env::var("WITH_WATCH_TEST_MAX_RUNS") {
196 if let Ok(parsed) = raw_max_runs.parse::<usize>() {
197 options.max_runs = Some(parsed);
198 }
199 }
200
201 if let Ok(raw_debounce_ms) = std::env::var("WITH_WATCH_TEST_DEBOUNCE_MS") {
202 if let Ok(parsed) = raw_debounce_ms.parse::<u64>() {
203 options.debounce_window = Duration::from_millis(parsed);
204 }
205 }
206
207 options
208 }
209}
210
211pub fn run(plan: ExecutionPlan, options: RunnerOptions) -> Result<i32> {
212 let mut watch_loop = WatchLoop::new(&plan.inputs)?;
213 let mut baseline =
214 capture_snapshot_with_logging("initial-baseline", &plan.inputs, plan.detection_mode)?;
215 let mut child = Some(spawn_command(
219 &plan.delegated_command,
220 plan.output_refresh_mode,
221 )?);
222 let mut completed_runs = 0usize;
223 let mut pending_rerun = false;
224 let mut suppressed_self_change_snapshot = None::<SnapshotState>;
225
226 info!(
227 command_source = plan.source.as_str(),
228 detection_mode = plan.detection_mode.as_str(),
229 output_refresh_mode = plan.output_refresh_mode.as_str(),
230 input_count = plan.inputs.len(),
231 adapter_id = plan.metadata.adapter_field(),
232 fallback_used = plan.metadata.fallback_used,
233 default_watch_root_used = plan.metadata.default_watch_root_used,
234 filtered_output_count = plan.metadata.filtered_output_count,
235 side_effect_profile = plan.metadata.side_effect_profile.as_str(),
236 analysis_status = plan.metadata.status.as_str(),
237 initial_run_armed = true,
238 "Starting with-watch run loop"
239 );
240
241 loop {
242 if let Some(active_child) = child.as_mut() {
243 if let Some(status) =
244 active_child
245 .try_wait()
246 .map_err(|source| WithWatchError::Wait {
247 command: plan.delegated_command.display_name(),
248 source,
249 })?
250 {
251 completed_runs += 1;
252 let last_exit_code = exit_code_from_status(status);
253 let post_run_snapshot =
254 capture_snapshot_with_logging("post-run", &plan.inputs, plan.detection_mode)?;
255 let inputs_changed_since_baseline =
256 post_run_snapshot.is_meaningfully_different(&baseline, plan.detection_mode);
257 let additional_change_after_suppression = suppressed_self_change_snapshot
258 .as_ref()
259 .is_some_and(|snapshot| {
260 post_run_snapshot.is_meaningfully_different(snapshot, plan.detection_mode)
261 });
262 let should_rerun = if plan.metadata.side_effect_profile
263 == SideEffectProfile::WritesWatchedInputs
264 {
265 pending_rerun || additional_change_after_suppression
266 } else {
267 pending_rerun && inputs_changed_since_baseline
268 };
269
270 if pending_rerun
271 && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
272 {
273 debug!(
274 rerun_queued = true,
275 side_effect_profile = plan.metadata.side_effect_profile.as_str(),
276 "Queued rerun after additional changes during self-mutating command \
277 activity"
278 );
279 } else if additional_change_after_suppression
280 && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
281 {
282 debug!(
283 rerun_queued = true,
284 side_effect_profile = plan.metadata.side_effect_profile.as_str(),
285 "Queued rerun because post-run state diverged from the suppressed \
286 self-change snapshot"
287 );
288 } else if suppressed_self_change_snapshot.is_some()
289 && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
290 {
291 debug!(
292 rerun_suppressed = true,
293 side_effect_profile = plan.metadata.side_effect_profile.as_str(),
294 "Suppressing rerun after self-mutating command activity"
295 );
296 }
297
298 baseline = post_run_snapshot;
299 pending_rerun = false;
300 suppressed_self_change_snapshot = None;
301 child = None;
302 write_test_run_marker(completed_runs);
303
304 info!(
305 completed_runs,
306 last_exit_code,
307 command_source = plan.source.as_str(),
308 rerun_queued = should_rerun,
309 "Delegated command finished"
310 );
311
312 if options
313 .max_runs
314 .is_some_and(|limit| completed_runs >= limit)
315 {
316 return Ok(last_exit_code);
317 }
318
319 if should_rerun {
320 child = Some(spawn_command(
321 &plan.delegated_command,
322 plan.output_refresh_mode,
323 )?);
324 continue;
325 }
326 }
327 }
328
329 if let Some(events) =
330 watch_loop.collect_events(options.poll_timeout, options.debounce_window)
331 {
332 handle_watch_events(&events);
333
334 let current_snapshot =
335 capture_snapshot_with_logging("event-rescan", &plan.inputs, plan.detection_mode)?;
336 let reference_snapshot = if child.is_some()
337 && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
338 {
339 suppressed_self_change_snapshot
340 .as_ref()
341 .unwrap_or(&baseline)
342 } else {
343 &baseline
344 };
345
346 if current_snapshot.is_meaningfully_different(reference_snapshot, plan.detection_mode) {
347 debug!(
348 event_count = events.event_count,
349 path_count = events.path_count,
350 child_running = child.is_some(),
351 "Observed meaningful input changes"
352 );
353
354 if child.is_some() {
355 if plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
356 && suppressed_self_change_snapshot.is_none()
357 {
358 suppressed_self_change_snapshot = Some(current_snapshot);
359 debug!(
360 rerun_suppressed = true,
361 side_effect_profile = plan.metadata.side_effect_profile.as_str(),
362 "Suppressed the first in-run snapshot change for a self-mutating \
363 command"
364 );
365 } else {
366 pending_rerun = true;
367 }
368 } else {
369 baseline = current_snapshot;
370 child = Some(spawn_command(
371 &plan.delegated_command,
372 plan.output_refresh_mode,
373 )?);
374 }
375 } else if child.is_some() {
376 debug!(
377 rerun_suppressed = true,
378 "Ignored non-meaningful filesystem churn"
379 );
380 }
381 } else if child.is_none() {
382 thread::sleep(Duration::from_millis(10));
383 }
384 }
385}
386
387fn capture_snapshot_with_logging(
388 phase: &str,
389 inputs: &[WatchInput],
390 detection_mode: ChangeDetectionMode,
391) -> Result<SnapshotState> {
392 let snapshot_modes = summarize_snapshot_modes(inputs);
393 let started_at = Instant::now();
394 let snapshot = capture_snapshot(inputs, detection_mode)?;
395
396 debug!(
397 phase,
398 detection_mode = detection_mode.as_str(),
399 snapshot_modes,
400 input_count = inputs.len(),
401 snapshot_entry_count = snapshot.len(),
402 elapsed_ms = started_at.elapsed().as_millis() as u64,
403 "Captured input snapshot"
404 );
405
406 Ok(snapshot)
407}
408
409fn summarize_snapshot_modes(inputs: &[WatchInput]) -> String {
410 let mut modes = Vec::new();
411 for input in inputs {
412 let snapshot_mode = input.snapshot_mode_label();
413 if !modes.contains(&snapshot_mode) {
414 modes.push(snapshot_mode);
415 }
416 }
417
418 modes.join(",")
419}
420
421fn handle_watch_events(events: &CollectedEvents) {
422 if events.error_count > 0 {
423 warn!(
424 error_count = events.error_count,
425 event_count = events.event_count,
426 path_count = events.path_count,
427 "Watcher reported recoverable errors; forcing a rescan"
428 );
429 } else {
430 debug!(
431 event_count = events.event_count,
432 path_count = events.path_count,
433 "Collected filesystem events"
434 );
435 }
436}
437
438fn spawn_command(
439 command: &DelegatedCommand,
440 output_refresh_mode: OutputRefreshMode,
441) -> Result<Child> {
442 prepare_output_for_run(output_refresh_mode)?;
443 log_delegated_command_spawn(command);
444 match command {
445 DelegatedCommand::Argv(argv) => spawn_argv(argv),
446 DelegatedCommand::Shell(expression) => spawn_shell(expression),
447 }
448}
449
450fn prepare_output_for_run(output_refresh_mode: OutputRefreshMode) -> Result<()> {
451 let mut stdout = std::io::stdout();
452 let stdout_is_terminal = stdout.is_terminal();
453 let terminal_cleared =
454 refresh_output_before_run(output_refresh_mode, stdout_is_terminal, &mut stdout)
455 .map_err(WithWatchError::StdoutRefresh)?;
456
457 debug!(
458 output_refresh_mode = output_refresh_mode.as_str(),
459 stdout_is_terminal, terminal_cleared, "Prepared stdout for delegated command"
460 );
461
462 Ok(())
463}
464
465fn refresh_output_before_run<W: Write>(
466 output_refresh_mode: OutputRefreshMode,
467 stdout_is_terminal: bool,
468 output: &mut W,
469) -> io::Result<bool> {
470 if output_refresh_mode != OutputRefreshMode::ClearTerminal || !stdout_is_terminal {
471 return Ok(false);
472 }
473
474 output.write_all(CLEAR_TERMINAL_SEQUENCE.as_bytes())?;
475 output.flush()?;
476 Ok(true)
477}
478
479fn log_delegated_command_spawn(command: &DelegatedCommand) {
480 let summary = command.spawn_log_summary();
481 info!(
482 execution_kind = summary.execution_kind,
483 program = summary.program_name,
484 arg_count = summary.arg_count,
485 "Spawning delegated command"
486 );
487}
488
489fn spawn_argv(argv: &[OsString]) -> Result<Child> {
490 let program = argv
491 .first()
492 .cloned()
493 .ok_or(WithWatchError::MissingCommand)?;
494
495 Command::new(&program)
496 .args(argv.iter().skip(1))
497 .stdin(Stdio::inherit())
498 .stdout(Stdio::inherit())
499 .stderr(Stdio::inherit())
500 .spawn()
501 .map_err(|source| WithWatchError::Spawn {
502 command: program.to_string_lossy().into_owned(),
503 source,
504 })
505}
506
507fn spawn_shell(expression: &str) -> Result<Child> {
508 #[cfg(not(unix))]
509 {
510 let _ = expression;
511 Err(WithWatchError::UnsupportedShellPlatform)
512 }
513
514 #[cfg(unix)]
515 {
516 Command::new("/bin/sh")
517 .arg("-c")
518 .arg(expression)
519 .stdin(Stdio::inherit())
520 .stdout(Stdio::inherit())
521 .stderr(Stdio::inherit())
522 .spawn()
523 .map_err(|source| WithWatchError::Spawn {
524 command: expression.to_string(),
525 source,
526 })
527 }
528}
529
530fn program_name(program: &OsString) -> String {
531 std::path::Path::new(program)
532 .file_name()
533 .unwrap_or(program.as_os_str())
534 .to_string_lossy()
535 .into_owned()
536}
537
538fn exit_code_from_status(status: ExitStatus) -> i32 {
539 status.code().unwrap_or(1)
540}
541
542fn write_test_run_marker(completed_runs: usize) {
543 let Ok(marker_dir) = std::env::var(WITH_WATCH_TEST_RUN_MARKER_DIR_ENV) else {
544 return;
545 };
546
547 let marker_path = PathBuf::from(marker_dir).join(format!("run-{completed_runs}.done"));
548 if let Some(parent) = marker_path.parent() {
549 if let Err(error) = fs::create_dir_all(parent) {
550 warn!(
551 path = parent.display().to_string(),
552 %error,
553 "Failed to create test run marker directory"
554 );
555 return;
556 }
557 }
558
559 if let Err(error) = fs::write(&marker_path, completed_runs.to_string()) {
560 warn!(
561 path = marker_path.display().to_string(),
562 %error,
563 "Failed to write test run marker"
564 );
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use std::{
571 ffi::OsString,
572 io::{self, Write},
573 sync::{Arc, Mutex},
574 };
575
576 use tracing::Level;
577
578 use super::{
579 log_delegated_command_spawn, refresh_output_before_run, DelegatedCommand,
580 OutputRefreshMode, CLEAR_TERMINAL_SEQUENCE,
581 };
582
583 #[test]
584 fn argv_spawn_logging_omits_argument_values() {
585 let output = capture_logs(|| {
586 log_delegated_command_spawn(&DelegatedCommand::Argv(vec![
587 OsString::from("env"),
588 OsString::from("TOKEN=secret"),
589 OsString::from("cmd"),
590 ]));
591 });
592
593 assert!(output.contains("execution_kind=\"argv\""));
594 assert!(output.contains("program=\"env\""));
595 assert!(output.contains("arg_count=2"));
596 assert!(!output.contains("TOKEN=secret"));
597 assert!(!output.contains("cmd"));
598 }
599
600 #[test]
601 fn shell_spawn_logging_omits_expression_text() {
602 let output = capture_logs(|| {
603 log_delegated_command_spawn(&DelegatedCommand::Shell(
604 "TOKEN=secret grep -f patterns.txt file.txt".to_string(),
605 ));
606 });
607
608 assert!(output.contains("execution_kind=\"shell\""));
609 assert!(output.contains("program=\"sh\""));
610 assert!(output.contains("arg_count=2"));
611 assert!(!output.contains("TOKEN=secret"));
612 assert!(!output.contains("patterns.txt"));
613 }
614
615 #[test]
616 fn clear_refresh_mode_writes_escape_sequence_and_flushes_for_terminals() {
617 let mut writer = FlushTrackingWriter::default();
618
619 let cleared =
620 refresh_output_before_run(OutputRefreshMode::ClearTerminal, true, &mut writer)
621 .expect("clear terminal output");
622
623 assert!(cleared);
624 assert_eq!(writer.buffer, CLEAR_TERMINAL_SEQUENCE.as_bytes());
625 assert_eq!(writer.flush_count, 1);
626 }
627
628 #[test]
629 fn preserve_refresh_mode_does_not_write_escape_sequence() {
630 let mut writer = FlushTrackingWriter::default();
631
632 let cleared = refresh_output_before_run(OutputRefreshMode::Preserve, true, &mut writer)
633 .expect("skip refresh");
634
635 assert!(!cleared);
636 assert!(writer.buffer.is_empty());
637 assert_eq!(writer.flush_count, 0);
638 }
639
640 #[test]
641 fn clear_refresh_mode_skips_non_terminal_outputs() {
642 let mut writer = FlushTrackingWriter::default();
643
644 let cleared =
645 refresh_output_before_run(OutputRefreshMode::ClearTerminal, false, &mut writer)
646 .expect("skip non-terminal refresh");
647
648 assert!(!cleared);
649 assert!(writer.buffer.is_empty());
650 assert_eq!(writer.flush_count, 0);
651 }
652
653 fn capture_logs(callback: impl FnOnce()) -> String {
654 let buffer = Arc::new(Mutex::new(Vec::new()));
655 let writer = SharedWriter(buffer.clone());
656 let subscriber = tracing_subscriber::fmt()
657 .with_ansi(false)
658 .with_target(false)
659 .with_level(false)
660 .without_time()
661 .with_max_level(Level::INFO)
662 .with_writer(move || writer.clone())
663 .finish();
664
665 tracing::subscriber::with_default(subscriber, callback);
666
667 let output = buffer.lock().expect("lock buffer").clone();
668 String::from_utf8(output).expect("utf8 log output")
669 }
670
671 #[derive(Clone)]
672 struct SharedWriter(Arc<Mutex<Vec<u8>>>);
673
674 impl Write for SharedWriter {
675 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
676 self.0
677 .lock()
678 .expect("lock log buffer")
679 .extend_from_slice(buf);
680 Ok(buf.len())
681 }
682
683 fn flush(&mut self) -> io::Result<()> {
684 Ok(())
685 }
686 }
687
688 #[derive(Default)]
689 struct FlushTrackingWriter {
690 buffer: Vec<u8>,
691 flush_count: usize,
692 }
693
694 impl Write for FlushTrackingWriter {
695 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
696 self.buffer.extend_from_slice(buf);
697 Ok(buf.len())
698 }
699
700 fn flush(&mut self) -> io::Result<()> {
701 self.flush_count += 1;
702 Ok(())
703 }
704 }
705}