Skip to main content

with_watch/
runner.rs

1use std::{
2    ffi::OsString,
3    fs,
4    io::{self, IsTerminal, Write},
5    path::PathBuf,
6    process::{Child, Command, ExitStatus, Stdio},
7    thread,
8    time::{Duration, Instant},
9};
10
11use tracing::{debug, info, warn};
12
13use crate::{
14    analysis::{CommandAdapterId, CommandAnalysisStatus, SideEffectProfile},
15    error::{Result, WithWatchError},
16    snapshot::{capture_snapshot, ChangeDetectionMode, CommandSource, SnapshotState, WatchInput},
17    watch::{CollectedEvents, WatchLoop},
18};
19
/// Default for `RunnerOptions::poll_timeout`, which `run` passes to
/// `WatchLoop::collect_events` on every loop iteration.
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(50);
/// Default for `RunnerOptions::debounce_window`; tests may override it through
/// the `WITH_WATCH_TEST_DEBOUNCE_MS` environment variable.
const DEFAULT_DEBOUNCE_WINDOW: Duration = Duration::from_millis(200);
/// ANSI escape sequence written before a run in `OutputRefreshMode::ClearTerminal`:
/// `ESC[2J` erases the screen, then `ESC[H` moves the cursor to the top-left.
const CLEAR_TERMINAL_SEQUENCE: &str = "\x1b[2J\x1b[H";
/// Test-only environment variable naming the directory that receives one
/// `run-<n>.done` marker file per completed delegated run.
const WITH_WATCH_TEST_RUN_MARKER_DIR_ENV: &str = "WITH_WATCH_TEST_RUN_MARKER_DIR";
24
/// What happens to the terminal's existing output before each delegated run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputRefreshMode {
    /// Leave earlier output in place.
    Preserve,
    /// Write the clear-screen escape sequence first (only applied when stdout
    /// is a terminal).
    ClearTerminal,
}

impl OutputRefreshMode {
    /// Stable, lowercase label for this mode, used as a structured log value.
    pub fn as_str(self) -> &'static str {
        match self {
            OutputRefreshMode::ClearTerminal => "clear-terminal",
            OutputRefreshMode::Preserve => "preserve",
        }
    }
}
39
40#[derive(Debug, Clone)]
41pub struct ExecutionPlan {
42    pub source: CommandSource,
43    pub detection_mode: ChangeDetectionMode,
44    pub output_refresh_mode: OutputRefreshMode,
45    pub inputs: Vec<WatchInput>,
46    pub delegated_command: DelegatedCommand,
47    pub metadata: ExecutionMetadata,
48}
49
50impl ExecutionPlan {
51    pub fn passthrough(
52        argv: Vec<OsString>,
53        inputs: Vec<WatchInput>,
54        detection_mode: ChangeDetectionMode,
55        output_refresh_mode: OutputRefreshMode,
56        metadata: ExecutionMetadata,
57    ) -> Self {
58        Self {
59            source: CommandSource::Argv,
60            detection_mode,
61            output_refresh_mode,
62            inputs,
63            delegated_command: DelegatedCommand::Argv(argv),
64            metadata,
65        }
66    }
67
68    pub fn shell(
69        expression: String,
70        inputs: Vec<WatchInput>,
71        detection_mode: ChangeDetectionMode,
72        output_refresh_mode: OutputRefreshMode,
73        metadata: ExecutionMetadata,
74    ) -> Self {
75        Self {
76            source: CommandSource::Shell,
77            detection_mode,
78            output_refresh_mode,
79            inputs,
80            delegated_command: DelegatedCommand::Shell(expression),
81            metadata,
82        }
83    }
84
85    pub fn exec(
86        argv: Vec<OsString>,
87        inputs: Vec<WatchInput>,
88        detection_mode: ChangeDetectionMode,
89        output_refresh_mode: OutputRefreshMode,
90        metadata: ExecutionMetadata,
91    ) -> Self {
92        Self {
93            source: CommandSource::Exec,
94            detection_mode,
95            output_refresh_mode,
96            inputs,
97            delegated_command: DelegatedCommand::Argv(argv),
98            metadata,
99        }
100    }
101}
102
103#[derive(Debug, Clone)]
104pub struct ExecutionMetadata {
105    pub adapter_ids: Vec<CommandAdapterId>,
106    pub fallback_used: bool,
107    pub default_watch_root_used: bool,
108    pub filtered_output_count: usize,
109    pub side_effect_profile: SideEffectProfile,
110    pub status: CommandAnalysisStatus,
111}
112
113impl ExecutionMetadata {
114    pub fn adapter_field(&self) -> String {
115        self.adapter_ids
116            .iter()
117            .map(|adapter| adapter.as_str())
118            .collect::<Vec<_>>()
119            .join(",")
120    }
121}
122
123#[derive(Debug, Clone)]
124pub enum DelegatedCommand {
125    Argv(Vec<OsString>),
126    Shell(String),
127}
128
129impl DelegatedCommand {
130    fn spawn_log_summary(&self) -> DelegatedCommandLogSummary {
131        match self {
132            Self::Argv(argv) => {
133                let program_name = argv
134                    .first()
135                    .map(program_name)
136                    .unwrap_or_else(|| "<missing>".to_string());
137                DelegatedCommandLogSummary {
138                    execution_kind: "argv",
139                    program_name,
140                    arg_count: argv.len().saturating_sub(1),
141                }
142            }
143            Self::Shell(_) => DelegatedCommandLogSummary {
144                execution_kind: "shell",
145                program_name: "sh".to_string(),
146                arg_count: 2,
147            },
148        }
149    }
150
151    fn display_name(&self) -> String {
152        match self {
153            Self::Argv(argv) => argv
154                .iter()
155                .map(|value| value.to_string_lossy().into_owned())
156                .collect::<Vec<_>>()
157                .join(" "),
158            Self::Shell(expression) => expression.clone(),
159        }
160    }
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
164struct DelegatedCommandLogSummary {
165    execution_kind: &'static str,
166    program_name: String,
167    arg_count: usize,
168}
169
170#[derive(Debug, Clone, Copy)]
171pub struct RunnerOptions {
172    pub debounce_window: Duration,
173    pub poll_timeout: Duration,
174    pub max_runs: Option<usize>,
175}
176
177impl Default for RunnerOptions {
178    fn default() -> Self {
179        Self {
180            debounce_window: DEFAULT_DEBOUNCE_WINDOW,
181            poll_timeout: DEFAULT_POLL_TIMEOUT,
182            max_runs: None,
183        }
184    }
185}
186
187impl RunnerOptions {
188    pub fn from_environment() -> Self {
189        let mut options = Self::default();
190
191        // Test-only hooks for deterministic integration coverage. They keep the public
192        // CLI surface stable while allowing `cargo test` to stop the
193        // long-running watch loop and shorten debounce windows. Remove them
194        // when we have a better end-to-end harness.
195        if let Ok(raw_max_runs) = std::env::var("WITH_WATCH_TEST_MAX_RUNS") {
196            if let Ok(parsed) = raw_max_runs.parse::<usize>() {
197                options.max_runs = Some(parsed);
198            }
199        }
200
201        if let Ok(raw_debounce_ms) = std::env::var("WITH_WATCH_TEST_DEBOUNCE_MS") {
202            if let Ok(parsed) = raw_debounce_ms.parse::<u64>() {
203                options.debounce_window = Duration::from_millis(parsed);
204            }
205        }
206
207        options
208    }
209}
210
/// Runs the watch loop for `plan` until `options.max_runs` (if set) is reached.
///
/// Start-up order:
/// 1. Create the `WatchLoop` for the plan's inputs.
/// 2. Capture a baseline snapshot.
/// 3. Spawn the delegated command once immediately.
///
/// Each loop iteration then does two things:
/// 1. If a child is running, poll it without blocking (`try_wait`). When it has
///    exited, decide whether to rerun.
/// 2. Collect filesystem events and rescan the inputs. If no child is running,
///    a meaningful change starts a run right away. If a child is running, the
///    change is recorded and the rerun decision waits until the child exits.
///
/// For `SideEffectProfile::WritesWatchedInputs`, the first meaningful change
/// seen while the child runs is assumed to be the command's own write. It is
/// parked in `suppressed_self_change_snapshot` instead of queuing a rerun;
/// only later divergence from that snapshot queues one.
///
/// # Returns
/// The exit code of the run that reached `max_runs`, as produced by
/// `exit_code_from_status`. With `max_runs == None` the loop only leaves
/// through an error.
///
/// # Errors
/// Propagates failures from watcher setup, snapshot capture, spawning (which
/// includes stdout refresh), and `try_wait` (wrapped as `WithWatchError::Wait`).
/// NOTE(review): when an error propagates while a child is running, the
/// `Child` is simply dropped. `std::process::Child` neither kills nor waits on
/// drop, so the process keeps running.
pub fn run(plan: ExecutionPlan, options: RunnerOptions) -> Result<i32> {
    let mut watch_loop = WatchLoop::new(&plan.inputs)?;
    let mut baseline =
        capture_snapshot_with_logging("initial-baseline", &plan.inputs, plan.detection_mode)?;
    // v1 contract: after inference, watcher setup, and baseline capture succeed,
    // the delegated command must run immediately once before waiting for any
    // filesystem change events.
    let mut child = Some(spawn_command(
        &plan.delegated_command,
        plan.output_refresh_mode,
    )?);
    let mut completed_runs = 0usize;
    // Set when a meaningful change is observed while a child is running; consumed
    // when that child exits.
    let mut pending_rerun = false;
    // Self-mutating commands only: the first in-run change, assumed to be the
    // command's own write.
    let mut suppressed_self_change_snapshot = None::<SnapshotState>;

    info!(
        command_source = plan.source.as_str(),
        detection_mode = plan.detection_mode.as_str(),
        output_refresh_mode = plan.output_refresh_mode.as_str(),
        input_count = plan.inputs.len(),
        adapter_id = plan.metadata.adapter_field(),
        fallback_used = plan.metadata.fallback_used,
        default_watch_root_used = plan.metadata.default_watch_root_used,
        filtered_output_count = plan.metadata.filtered_output_count,
        side_effect_profile = plan.metadata.side_effect_profile.as_str(),
        analysis_status = plan.metadata.status.as_str(),
        initial_run_armed = true,
        "Starting with-watch run loop"
    );

    loop {
        // Phase 1: non-blocking check for completion of the active run.
        if let Some(active_child) = child.as_mut() {
            if let Some(status) =
                active_child
                    .try_wait()
                    .map_err(|source| WithWatchError::Wait {
                        command: plan.delegated_command.display_name(),
                        source,
                    })?
            {
                completed_runs += 1;
                let last_exit_code = exit_code_from_status(status);
                let post_run_snapshot =
                    capture_snapshot_with_logging("post-run", &plan.inputs, plan.detection_mode)?;
                // Did the inputs end up different from the state captured before this run?
                let inputs_changed_since_baseline =
                    post_run_snapshot.is_meaningfully_different(&baseline, plan.detection_mode);
                // Did anything change beyond the suppressed self-write? This is false
                // when nothing was suppressed during the run.
                let additional_change_after_suppression = suppressed_self_change_snapshot
                    .as_ref()
                    .is_some_and(|snapshot| {
                        post_run_snapshot.is_meaningfully_different(snapshot, plan.detection_mode)
                    });
                // Self-mutating commands rerun for any change beyond their first
                // write. Other commands rerun only if a change was seen during the
                // run and the final state still differs from the pre-run baseline.
                let should_rerun = if plan.metadata.side_effect_profile
                    == SideEffectProfile::WritesWatchedInputs
                {
                    pending_rerun || additional_change_after_suppression
                } else {
                    pending_rerun && inputs_changed_since_baseline
                };

                // Diagnostics only: explain the decision for self-mutating commands.
                if pending_rerun
                    && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
                {
                    debug!(
                        rerun_queued = true,
                        side_effect_profile = plan.metadata.side_effect_profile.as_str(),
                        "Queued rerun after additional changes during self-mutating command \
                         activity"
                    );
                } else if additional_change_after_suppression
                    && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
                {
                    debug!(
                        rerun_queued = true,
                        side_effect_profile = plan.metadata.side_effect_profile.as_str(),
                        "Queued rerun because post-run state diverged from the suppressed \
                         self-change snapshot"
                    );
                } else if suppressed_self_change_snapshot.is_some()
                    && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
                {
                    debug!(
                        rerun_suppressed = true,
                        side_effect_profile = plan.metadata.side_effect_profile.as_str(),
                        "Suppressing rerun after self-mutating command activity"
                    );
                }

                // Reset per-run state. The post-run state becomes the new reference,
                // so the command's own writes are not seen as fresh changes.
                baseline = post_run_snapshot;
                pending_rerun = false;
                suppressed_self_change_snapshot = None;
                child = None;
                write_test_run_marker(completed_runs);

                info!(
                    completed_runs,
                    last_exit_code,
                    command_source = plan.source.as_str(),
                    rerun_queued = should_rerun,
                    "Delegated command finished"
                );

                // Stop at the configured run limit (set from `WITH_WATCH_TEST_MAX_RUNS`
                // by `RunnerOptions::from_environment`), reporting this run's exit code.
                if options
                    .max_runs
                    .is_some_and(|limit| completed_runs >= limit)
                {
                    return Ok(last_exit_code);
                }

                if should_rerun {
                    child = Some(spawn_command(
                        &plan.delegated_command,
                        plan.output_refresh_mode,
                    )?);
                    // Go straight back to polling the new child; skip event collection.
                    continue;
                }
            }
        }

        // Phase 2: collect debounced filesystem events (see
        // `WatchLoop::collect_events`). `None` presumably means nothing arrived
        // within `poll_timeout`.
        if let Some(events) =
            watch_loop.collect_events(options.poll_timeout, options.debounce_window)
        {
            handle_watch_events(&events);

            // Events are only a hint; the rescan decides whether anything meaningful changed.
            let current_snapshot =
                capture_snapshot_with_logging("event-rescan", &plan.inputs, plan.detection_mode)?;
            // While a self-mutating command runs, compare against its parked first
            // write (if any) so that write alone does not queue a rerun.
            let reference_snapshot = if child.is_some()
                && plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
            {
                suppressed_self_change_snapshot
                    .as_ref()
                    .unwrap_or(&baseline)
            } else {
                &baseline
            };

            if current_snapshot.is_meaningfully_different(reference_snapshot, plan.detection_mode) {
                debug!(
                    event_count = events.event_count,
                    path_count = events.path_count,
                    child_running = child.is_some(),
                    "Observed meaningful input changes"
                );

                if child.is_some() {
                    // First in-run change of a self-mutating command: treat it as the
                    // command's own write and park it instead of queuing a rerun.
                    if plan.metadata.side_effect_profile == SideEffectProfile::WritesWatchedInputs
                        && suppressed_self_change_snapshot.is_none()
                    {
                        suppressed_self_change_snapshot = Some(current_snapshot);
                        debug!(
                            rerun_suppressed = true,
                            side_effect_profile = plan.metadata.side_effect_profile.as_str(),
                            "Suppressed the first in-run snapshot change for a self-mutating \
                             command"
                        );
                    } else {
                        // Defer: the rerun decision is made when the child exits (phase 1).
                        pending_rerun = true;
                    }
                } else {
                    // Idle: adopt the new state as the baseline and start a run now.
                    baseline = current_snapshot;
                    child = Some(spawn_command(
                        &plan.delegated_command,
                        plan.output_refresh_mode,
                    )?);
                }
            } else if child.is_some() {
                debug!(
                    rerun_suppressed = true,
                    "Ignored non-meaningful filesystem churn"
                );
            }
        } else if child.is_none() {
            // No events and nothing running: brief pause before polling again.
            thread::sleep(Duration::from_millis(10));
        }
    }
}
386
387fn capture_snapshot_with_logging(
388    phase: &str,
389    inputs: &[WatchInput],
390    detection_mode: ChangeDetectionMode,
391) -> Result<SnapshotState> {
392    let snapshot_modes = summarize_snapshot_modes(inputs);
393    let started_at = Instant::now();
394    let snapshot = capture_snapshot(inputs, detection_mode)?;
395
396    debug!(
397        phase,
398        detection_mode = detection_mode.as_str(),
399        snapshot_modes,
400        input_count = inputs.len(),
401        snapshot_entry_count = snapshot.len(),
402        elapsed_ms = started_at.elapsed().as_millis() as u64,
403        "Captured input snapshot"
404    );
405
406    Ok(snapshot)
407}
408
409fn summarize_snapshot_modes(inputs: &[WatchInput]) -> String {
410    let mut modes = Vec::new();
411    for input in inputs {
412        let snapshot_mode = input.snapshot_mode_label();
413        if !modes.contains(&snapshot_mode) {
414            modes.push(snapshot_mode);
415        }
416    }
417
418    modes.join(",")
419}
420
421fn handle_watch_events(events: &CollectedEvents) {
422    if events.error_count > 0 {
423        warn!(
424            error_count = events.error_count,
425            event_count = events.event_count,
426            path_count = events.path_count,
427            "Watcher reported recoverable errors; forcing a rescan"
428        );
429    } else {
430        debug!(
431            event_count = events.event_count,
432            path_count = events.path_count,
433            "Collected filesystem events"
434        );
435    }
436}
437
438fn spawn_command(
439    command: &DelegatedCommand,
440    output_refresh_mode: OutputRefreshMode,
441) -> Result<Child> {
442    prepare_output_for_run(output_refresh_mode)?;
443    log_delegated_command_spawn(command);
444    match command {
445        DelegatedCommand::Argv(argv) => spawn_argv(argv),
446        DelegatedCommand::Shell(expression) => spawn_shell(expression),
447    }
448}
449
450fn prepare_output_for_run(output_refresh_mode: OutputRefreshMode) -> Result<()> {
451    let mut stdout = std::io::stdout();
452    let stdout_is_terminal = stdout.is_terminal();
453    let terminal_cleared =
454        refresh_output_before_run(output_refresh_mode, stdout_is_terminal, &mut stdout)
455            .map_err(WithWatchError::StdoutRefresh)?;
456
457    debug!(
458        output_refresh_mode = output_refresh_mode.as_str(),
459        stdout_is_terminal, terminal_cleared, "Prepared stdout for delegated command"
460    );
461
462    Ok(())
463}
464
465fn refresh_output_before_run<W: Write>(
466    output_refresh_mode: OutputRefreshMode,
467    stdout_is_terminal: bool,
468    output: &mut W,
469) -> io::Result<bool> {
470    if output_refresh_mode != OutputRefreshMode::ClearTerminal || !stdout_is_terminal {
471        return Ok(false);
472    }
473
474    output.write_all(CLEAR_TERMINAL_SEQUENCE.as_bytes())?;
475    output.flush()?;
476    Ok(true)
477}
478
479fn log_delegated_command_spawn(command: &DelegatedCommand) {
480    let summary = command.spawn_log_summary();
481    info!(
482        execution_kind = summary.execution_kind,
483        program = summary.program_name,
484        arg_count = summary.arg_count,
485        "Spawning delegated command"
486    );
487}
488
489fn spawn_argv(argv: &[OsString]) -> Result<Child> {
490    let program = argv
491        .first()
492        .cloned()
493        .ok_or(WithWatchError::MissingCommand)?;
494
495    Command::new(&program)
496        .args(argv.iter().skip(1))
497        .stdin(Stdio::inherit())
498        .stdout(Stdio::inherit())
499        .stderr(Stdio::inherit())
500        .spawn()
501        .map_err(|source| WithWatchError::Spawn {
502            command: program.to_string_lossy().into_owned(),
503            source,
504        })
505}
506
507fn spawn_shell(expression: &str) -> Result<Child> {
508    #[cfg(not(unix))]
509    {
510        let _ = expression;
511        Err(WithWatchError::UnsupportedShellPlatform)
512    }
513
514    #[cfg(unix)]
515    {
516        Command::new("/bin/sh")
517            .arg("-c")
518            .arg(expression)
519            .stdin(Stdio::inherit())
520            .stdout(Stdio::inherit())
521            .stderr(Stdio::inherit())
522            .spawn()
523            .map_err(|source| WithWatchError::Spawn {
524                command: expression.to_string(),
525                source,
526            })
527    }
528}
529
/// Returns the final path component of `program` (for example `"env"` for
/// `"/usr/bin/env"`), lossily converted to UTF-8.
///
/// If the path has no file-name component (such as `"/"` or `".."`), the whole
/// value is returned unchanged.
fn program_name(program: &OsString) -> String {
    let full_path = std::path::Path::new(program);
    let base = match full_path.file_name() {
        Some(name) => name,
        None => program.as_os_str(),
    };
    base.to_string_lossy().into_owned()
}
537
/// Maps a delegated command's [`ExitStatus`] to the integer the runner reports.
///
/// - A normal exit returns the process's own exit code.
/// - On Unix, a process terminated by a signal has no exit code. Following the
///   shell convention, it is reported as `128 + signal` (130 for SIGINT, 137 for
///   SIGKILL), so a signal death is distinguishable from an ordinary failure.
/// - Any other status without a code falls back to `1`.
fn exit_code_from_status(status: ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }

    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;

        if let Some(signal) = status.signal() {
            return 128 + signal;
        }
    }

    1
}
541
542fn write_test_run_marker(completed_runs: usize) {
543    let Ok(marker_dir) = std::env::var(WITH_WATCH_TEST_RUN_MARKER_DIR_ENV) else {
544        return;
545    };
546
547    let marker_path = PathBuf::from(marker_dir).join(format!("run-{completed_runs}.done"));
548    if let Some(parent) = marker_path.parent() {
549        if let Err(error) = fs::create_dir_all(parent) {
550            warn!(
551                path = parent.display().to_string(),
552                %error,
553                "Failed to create test run marker directory"
554            );
555            return;
556        }
557    }
558
559    if let Err(error) = fs::write(&marker_path, completed_runs.to_string()) {
560        warn!(
561            path = marker_path.display().to_string(),
562            %error,
563            "Failed to write test run marker"
564        );
565    }
566}
567
#[cfg(test)]
mod tests {
    use std::{
        ffi::OsString,
        io::{self, Write},
        sync::{Arc, Mutex},
    };

    use tracing::Level;

    use super::{
        log_delegated_command_spawn, refresh_output_before_run, DelegatedCommand,
        OutputRefreshMode, CLEAR_TERMINAL_SEQUENCE,
    };

    // Spawn logging must never expose argument values or shell text.

    #[test]
    fn argv_spawn_logging_omits_argument_values() {
        let logs = capture_logs(|| {
            log_delegated_command_spawn(&DelegatedCommand::Argv(vec![
                OsString::from("env"),
                OsString::from("TOKEN=secret"),
                OsString::from("cmd"),
            ]));
        });

        for expected in ["execution_kind=\"argv\"", "program=\"env\"", "arg_count=2"] {
            assert!(logs.contains(expected), "missing {expected} in {logs}");
        }
        for secret in ["TOKEN=secret", "cmd"] {
            assert!(!logs.contains(secret), "leaked {secret} in {logs}");
        }
    }

    #[test]
    fn shell_spawn_logging_omits_expression_text() {
        let logs = capture_logs(|| {
            log_delegated_command_spawn(&DelegatedCommand::Shell(
                "TOKEN=secret grep -f patterns.txt file.txt".to_string(),
            ));
        });

        for expected in ["execution_kind=\"shell\"", "program=\"sh\"", "arg_count=2"] {
            assert!(logs.contains(expected), "missing {expected} in {logs}");
        }
        for secret in ["TOKEN=secret", "patterns.txt"] {
            assert!(!logs.contains(secret), "leaked {secret} in {logs}");
        }
    }

    // Output refresh: the clear sequence is written and flushed only for
    // `ClearTerminal` on a terminal.

    #[test]
    fn clear_refresh_mode_writes_escape_sequence_and_flushes_for_terminals() {
        let (cleared, writer) =
            refresh_into_tracker(OutputRefreshMode::ClearTerminal, true, "clear terminal output");

        assert!(cleared);
        assert_eq!(writer.buffer, CLEAR_TERMINAL_SEQUENCE.as_bytes());
        assert_eq!(writer.flush_count, 1);
    }

    #[test]
    fn preserve_refresh_mode_does_not_write_escape_sequence() {
        let (cleared, writer) =
            refresh_into_tracker(OutputRefreshMode::Preserve, true, "skip refresh");

        assert!(!cleared);
        assert_untouched(&writer);
    }

    #[test]
    fn clear_refresh_mode_skips_non_terminal_outputs() {
        let (cleared, writer) = refresh_into_tracker(
            OutputRefreshMode::ClearTerminal,
            false,
            "skip non-terminal refresh",
        );

        assert!(!cleared);
        assert_untouched(&writer);
    }

    /// Runs `refresh_output_before_run` against a fresh tracking writer.
    fn refresh_into_tracker(
        mode: OutputRefreshMode,
        is_terminal: bool,
        context: &str,
    ) -> (bool, FlushTrackingWriter) {
        let mut writer = FlushTrackingWriter::default();
        let cleared = refresh_output_before_run(mode, is_terminal, &mut writer).expect(context);
        (cleared, writer)
    }

    /// Asserts that nothing was written to or flushed on `writer`.
    fn assert_untouched(writer: &FlushTrackingWriter) {
        assert!(writer.buffer.is_empty());
        assert_eq!(writer.flush_count, 0);
    }

    /// Renders the INFO-and-above events emitted by `emit` as plain text, with
    /// no ANSI codes, targets, levels, or timestamps.
    fn capture_logs(emit: impl FnOnce()) -> String {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        let make_writer = {
            let writer = SharedWriter(Arc::clone(&sink));
            move || writer.clone()
        };
        let subscriber = tracing_subscriber::fmt()
            .with_ansi(false)
            .with_target(false)
            .with_level(false)
            .without_time()
            .with_max_level(Level::INFO)
            .with_writer(make_writer)
            .finish();

        tracing::subscriber::with_default(subscriber, emit);

        let bytes = sink.lock().expect("lock buffer").clone();
        String::from_utf8(bytes).expect("utf8 log output")
    }

    /// Cloneable writer that appends everything into one shared buffer.
    #[derive(Clone)]
    struct SharedWriter(Arc<Mutex<Vec<u8>>>);

    impl Write for SharedWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let mut guard = self.0.lock().expect("lock log buffer");
            guard.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// In-memory writer that also counts `flush` calls.
    #[derive(Default)]
    struct FlushTrackingWriter {
        buffer: Vec<u8>,
        flush_count: usize,
    }

    impl Write for FlushTrackingWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.buffer.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            self.flush_count += 1;
            Ok(())
        }
    }
}