processkit 0.8.0

Child-process management: kill-on-drop process trees and async run-and-capture
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
//! Test doubles for the [`ProcessRunner`] seam — no real subprocess required.
//!
//! - [`ScriptedRunner`] returns canned output for commands matched by argument
//!   prefix or a predicate.
//! - [`RecordingRunner`] wraps another runner and records every [`Invocation`]
//!   so tests can assert exactly what was run.
//!
//! Behind the `mock` feature, [`mockall`] additionally generates a `MockRunner`
//! (re-exported from the crate root) for expectation-style mocking.
//!
//! The seam covers **both shapes of a run**. The bulk verbs (`output` and the
//! helpers over it) replay canned results — and feed the command's
//! `on_stdout_line`/`on_stderr_line` handlers, so progress-reporting paths
//! test hermetically. A scripted [`start`](crate::ProcessRunner::start) hands
//! back a live [`RunningProcess`](crate::RunningProcess) whose canned output
//! flows through the **same pump machinery** as a real child: `stdout_lines`,
//! `wait_for_line`/`wait_for`, and `finish_streamed` all behave identically
//! (per-line pacing via [`Reply::with_line_delay`]). Scripted handles have no
//! OS identity (`pid()` is `None`), don't compose into a real
//! [`Pipeline`](crate::Pipeline), and don't model interactive stdin.
//!
//! Instant replies never observe a `cancel_on` token (they resolve before any
//! cancellation could race them). To exercise cancellation **behaviour** — a
//! call that genuinely blocks until its token fires — script
//! [`Reply::pending`] (`cancellation` feature): it parks the call (or never
//! "exits", on `start`) until the command's token — per-command or
//! [`CliClient::default_cancel_on`](crate::CliClient::default_cancel_on) —
//! cancels, then resolves with `Err(Error::Cancelled { .. })`, mirroring the
//! live contract.

use std::ffi::{OsStr, OsString};
use std::sync::Mutex;

use crate::command::Command;
use crate::error::Result;
use crate::result::ProcessResult;
use crate::runner::ProcessRunner;

/// A canned reply: stdout/stderr text plus an exit code (or a timed-out run,
/// or a parked-until-cancelled call).
#[derive(Debug, Clone)]
pub struct Reply {
    stdout: String,
    stderr: String,
    code: i32,
    timed_out: bool,
    /// Park the call until the command's cancellation token fires (see
    /// [`pending`](Self::pending)); the other fields are unused then.
    pending: bool,
    /// On a scripted `start`, sleep this long before each stdout line (see
    /// [`with_line_delay`](Self::with_line_delay)). Bulk `output` ignores it.
    line_delay: Option<std::time::Duration>,
}

impl Reply {
    /// A successful reply (exit code 0) producing `stdout`.
    pub fn ok(stdout: impl Into<String>) -> Self {
        Self {
            stdout: stdout.into(),
            stderr: String::new(),
            code: 0,
            timed_out: false,
            pending: false,
            line_delay: None,
        }
    }

    /// A failing reply with exit `code` and `stderr` text.
    pub fn fail(code: i32, stderr: impl Into<String>) -> Self {
        Self {
            stdout: String::new(),
            stderr: stderr.into(),
            code,
            timed_out: false,
            pending: false,
            line_delay: None,
        }
    }

    /// A timed-out reply — drives the timeout path so a test can assert that a
    /// command which exceeds its deadline surfaces as [`Error::Timeout`](crate::Error::Timeout).
    pub fn timeout() -> Self {
        Self {
            stdout: String::new(),
            stderr: String::new(),
            // Unused — a timed-out result carries no code; `timed_out` is what
            // the helpers key on.
            code: 0,
            timed_out: true,
            pending: false,
            line_delay: None,
        }
    }

    /// A reply that **parks the call until its cancellation token fires**,
    /// then resolves with [`Error::Cancelled`](crate::Error::Cancelled) naming
    /// the program — the hermetic mirror of cancelling a live long-runner, for
    /// testing that an orchestration genuinely cancels (and cleans up), not
    /// just that it formats a canned error.
    ///
    /// The token is the matched command's — set per command
    /// ([`Command::cancel_on`]) or client-wide
    /// ([`CliClient::default_cancel_on`](crate::CliClient::default_cancel_on)).
    /// A pending reply for a command **without** a token parks forever, like a
    /// hung child nobody can cancel — deliberate; pair it with a token (or a
    /// test timeout) by design.
    #[cfg(feature = "cancellation")]
    pub fn pending() -> Self {
        Self {
            stdout: String::new(),
            stderr: String::new(),
            code: 0,
            timed_out: false,
            pending: true,
            line_delay: None,
        }
    }

    /// A successful reply whose stdout is `lines` joined with `\n` — reads
    /// naturally for scripted **streaming** (`start` → `stdout_lines` yields
    /// exactly these lines), and is equivalent to [`ok`](Self::ok) with the
    /// joined text for the bulk path.
    pub fn lines<I, S>(lines: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut text = lines
            .into_iter()
            .map(Into::into)
            .collect::<Vec<_>>()
            .join("\n");
        if !text.is_empty() {
            text.push('\n');
        }
        Self::ok(text)
    }

    /// On a scripted `start`, sleep `delay` before each stdout line — so a
    /// hermetic streaming test can observe genuinely incremental delivery
    /// (deterministic under `#[tokio::test(start_paused = true)]`). The
    /// scripted run "exits" after the last line. Ignored by the bulk `output`
    /// path.
    pub fn with_line_delay(mut self, delay: std::time::Duration) -> Self {
        self.line_delay = Some(delay);
        self
    }

    /// Attach stdout to a reply — e.g. the `CONFLICT …` text `git merge` writes
    /// to stdout on a failing reply, so a test can exercise
    /// [`Error::Exit`](crate::Error::Exit)'s stdout field /
    /// [`ProcessResult::diagnostic`](crate::ProcessResult::diagnostic).
    pub fn with_stdout(mut self, stdout: impl Into<String>) -> Self {
        self.stdout = stdout.into();
        self
    }

    /// Build a scripted live handle for `command` from this reply — the
    /// `start` analogue of [`into_result`](Self::into_result). The canned
    /// stdout/stderr feed the command's real pump machinery (handlers,
    /// encodings, buffer policy all apply); the scripted "process" exits with
    /// the canned code after the last delayed line (immediately without
    /// delays), or never for a [`pending`](Self::pending) reply.
    fn into_running(self, command: &Command) -> crate::RunningProcess {
        // A pending reply never exits on its own; everything else exits after
        // its (possibly zero) total line-delay budget. A canned timeout exits
        // immediately as a timed-out outcome, mirroring `into_result`.
        let lifetime = if self.pending {
            None
        } else {
            let per_line = self.line_delay.unwrap_or_default();
            let lines = self.stdout.split_inclusive('\n').count() as u32;
            Some(per_line * lines)
        };
        let scripted = crate::running::ScriptedProc::new(
            self.stdout,
            self.stderr,
            (!self.timed_out).then_some(self.code),
            self.timed_out,
            lifetime,
            self.line_delay,
        );
        crate::RunningProcess::from_scripted(command, scripted)
    }

    fn into_result(
        self,
        program: String,
        timeout: Option<std::time::Duration>,
    ) -> ProcessResult<String> {
        // A timed-out run carries no code (`None`); otherwise the canned code.
        let code = (!self.timed_out).then_some(self.code);
        // Carry the command's configured timeout so a timed-out reply surfaces as
        // `Error::Timeout` with the *real* deadline (matching the live runner),
        // not a zero duration.
        ProcessResult::new(
            program,
            self.stdout,
            self.stderr,
            code,
            self.timed_out,
            timeout,
        )
    }
}

type Predicate = Box<dyn Fn(&Command) -> bool + Send + Sync>;

enum Rule {
    /// Match when the command's arguments start with this prefix.
    Prefix(Vec<OsString>),
    /// Match when the predicate accepts the command.
    Predicate(Predicate),
}

impl Rule {
    fn matches(&self, command: &Command) -> bool {
        match self {
            Rule::Prefix(prefix) => command.arguments().starts_with(prefix),
            Rule::Predicate(pred) => pred(command),
        }
    }
}

/// A [`ProcessRunner`] that returns canned [`Reply`]s for matched commands.
///
/// Rules are tried in registration order; the first match wins. With no match,
/// the [`fallback`](Self::fallback) reply is used, or an error is returned.
#[derive(Default)]
pub struct ScriptedRunner {
    rules: Vec<(Rule, Reply)>,
    fallback: Option<Reply>,
}

// Manual: `Rule` holds an opaque predicate closure.
impl std::fmt::Debug for ScriptedRunner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScriptedRunner")
            .field("rules", &self.rules.len())
            .field("has_fallback", &self.fallback.is_some())
            .finish_non_exhaustive()
    }
}

impl ScriptedRunner {
    /// An empty runner (every command misses until rules are added).
    pub fn new() -> Self {
        Self::default()
    }

    /// Reply with `reply` when the command's arguments start with `prefix`.
    pub fn on<I, S>(mut self, prefix: I, reply: Reply) -> Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        let prefix = prefix
            .into_iter()
            .map(|s| s.as_ref().to_os_string())
            .collect();
        self.rules.push((Rule::Prefix(prefix), reply));
        self
    }

    /// Reply with `reply` when `predicate` accepts the command.
    pub fn when<F>(mut self, predicate: F, reply: Reply) -> Self
    where
        F: Fn(&Command) -> bool + Send + Sync + 'static,
    {
        self.rules
            .push((Rule::Predicate(Box::new(predicate)), reply));
        self
    }

    /// Reply with `reply` for any command no rule matched.
    pub fn fallback(mut self, reply: Reply) -> Self {
        self.fallback = Some(reply);
        self
    }

    /// The first reply matching `command` (rules in registration order, then
    /// the fallback), or the loud not-found spawn error.
    fn matched_reply(&self, command: &Command, program: &str) -> Result<&Reply> {
        for (rule, reply) in &self.rules {
            if rule.matches(command) {
                return Ok(reply);
            }
        }
        self.fallback
            .as_ref()
            .ok_or_else(|| crate::error::Error::Spawn {
                program: program.to_owned(),
                source: std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    "ScriptedRunner: no rule matched and no fallback set",
                ),
            })
    }
}

/// Replay the canned streams through the command's `on_stdout_line` /
/// `on_stderr_line` handlers, so a wrapper's progress-reporting path is
/// exercised hermetically on the bulk `output` verbs too (on a scripted
/// `start`, the real pumps already invoke them).
fn replay_line_handlers(command: &Command, reply: &Reply) {
    if let Some(handler) = command.stdout_handler() {
        for line in reply.stdout.lines() {
            handler(line);
        }
    }
    if let Some(handler) = command.stderr_handler() {
        for line in reply.stderr.lines() {
            handler(line);
        }
    }
}

#[async_trait::async_trait]
impl ProcessRunner for ScriptedRunner {
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        let program = command.program().to_string_lossy().into_owned();
        let timeout = command.configured_timeout();
        let reply = self.matched_reply(command, &program)?;
        if reply.pending {
            return park_until_cancelled(command, program).await;
        }
        replay_line_handlers(command, reply);
        Ok(reply.clone().into_result(program, timeout))
    }

    /// Start a scripted live handle: the canned stdout/stderr flow through the
    /// command's **real** pump machinery (handlers, encodings, buffer policy),
    /// so `stdout_lines` / `wait_for_line` / `finish_streamed` behave exactly
    /// as on a real child — no subprocess involved.
    async fn start(&self, command: &Command) -> Result<crate::RunningProcess> {
        let program = command.program().to_string_lossy().into_owned();
        let reply = self.matched_reply(command, &program)?;
        Ok(reply.clone().into_running(command))
    }
}

/// Drive a [`Reply::pending`] match: wait for the command's cancellation token
/// and resolve as the live runner would — `Err(Error::Cancelled)`. With no
/// token (or without the `cancellation` feature, where `pending` replies can't
/// even be constructed) the call parks forever, like a hung child.
async fn park_until_cancelled(command: &Command, program: String) -> Result<ProcessResult<String>> {
    #[cfg(feature = "cancellation")]
    if let Some(token) = command.cancel_token() {
        token.cancelled().await;
        return Err(crate::error::Error::Cancelled { program });
    }
    #[cfg(not(feature = "cancellation"))]
    let _ = (command, program);
    std::future::pending().await
}

/// A captured record of one command a runner was asked to run.
///
/// Captures the *routing* knobs — program, args, cwd, env overrides, whether
/// stdin was supplied — not the I/O-shaping ones (`timeout`, encodings, buffer
/// policy, line handlers, `keep_stdin_open`, retry). Tests that need to assert
/// those inspect the built [`Command`] itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Invocation {
    /// The program name.
    pub program: OsString,
    /// The arguments, in order.
    pub args: Vec<OsString>,
    /// The working directory, if one was set.
    pub cwd: Option<OsString>,
    /// Environment overrides (`None` value = removal), in order.
    pub envs: Vec<(OsString, Option<OsString>)>,
    /// Whether a (non-empty) stdin source was provided.
    pub has_stdin: bool,
}

impl Invocation {
    // pub(crate): the `record` feature's cassette runner captures inputs
    // through this same single path, so recordings and `RecordingRunner`
    // assertions can never disagree on what an invocation is.
    pub(crate) fn from_command(command: &Command) -> Self {
        Self {
            program: command.program().to_os_string(),
            args: command.arguments().to_vec(),
            cwd: command.working_dir().map(|p| p.as_os_str().to_os_string()),
            envs: command.env_overrides().to_vec(),
            has_stdin: command
                .stdin_source()
                .is_some_and(|stdin| !stdin.is_empty()),
        }
    }

    /// Whether `flag` appears among the arguments.
    pub fn has_flag(&self, flag: impl AsRef<OsStr>) -> bool {
        let flag = flag.as_ref();
        self.args.iter().any(|a| a == flag)
    }

    /// The arguments as lossy UTF-8 strings, for ergonomic assertions
    /// (e.g. `assert_eq!(call.args_str(), ["pr", "create"])`).
    pub fn args_str(&self) -> Vec<String> {
        self.args
            .iter()
            .map(|a| a.to_string_lossy().into_owned())
            .collect()
    }
}

/// Wraps another [`ProcessRunner`], recording every [`Invocation`] before
/// delegating, so tests can assert exactly what was run.
pub struct RecordingRunner<R: ProcessRunner = ScriptedRunner> {
    inner: R,
    calls: Mutex<Vec<Invocation>>,
}

// Manual: the inner runner type parameter carries no `Debug` bound.
impl<R: ProcessRunner> std::fmt::Debug for RecordingRunner<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let calls = self.calls.lock().map(|c| c.len()).unwrap_or(0);
        f.debug_struct("RecordingRunner")
            .field("calls", &calls)
            .finish_non_exhaustive()
    }
}

impl RecordingRunner<ScriptedRunner> {
    /// A recorder whose inner runner replies with `reply` to everything.
    pub fn replying(reply: Reply) -> Self {
        Self::new(ScriptedRunner::new().fallback(reply))
    }
}

impl<R: ProcessRunner> RecordingRunner<R> {
    /// Wrap `inner`, recording all calls.
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            calls: Mutex::new(Vec::new()),
        }
    }

    /// A snapshot of every recorded invocation, in order.
    pub fn calls(&self) -> Vec<Invocation> {
        self.calls.lock().expect("recorder lock poisoned").clone()
    }

    /// The single recorded invocation; panics unless exactly one was made.
    pub fn only_call(&self) -> Invocation {
        let calls = self.calls();
        assert_eq!(
            calls.len(),
            1,
            "expected exactly one call, got {}",
            calls.len()
        );
        calls.into_iter().next().expect("length checked above")
    }
}

#[async_trait::async_trait]
impl<R: ProcessRunner> ProcessRunner for RecordingRunner<R> {
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        self.calls
            .lock()
            .expect("recorder lock poisoned")
            .push(Invocation::from_command(command));
        self.inner.output(command).await
    }

    async fn start(&self, command: &Command) -> Result<crate::RunningProcess> {
        // Recorded BEFORE delegating (like `output`), so a streamed run is
        // captured even if its stream is never consumed.
        self.calls
            .lock()
            .expect("recorder lock poisoned")
            .push(Invocation::from_command(command));
        self.inner.start(command).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::ProcessRunnerExt;

    #[tokio::test]
    async fn scripted_start_streams_canned_lines_through_real_pumps() {
        use tokio_stream::StreamExt;
        let runner = ScriptedRunner::new().on(["log"], Reply::lines(["first", "second", "third"]));
        let cmd = Command::new("git").arg("log");
        let mut run = runner.start(&cmd).await.expect("scripted start");
        assert_eq!(run.pid(), None, "a scripted child has no OS identity");

        let mut lines = run.stdout_lines();
        let mut seen = Vec::new();
        while let Some(line) = lines.next().await {
            seen.push(line);
        }
        assert_eq!(seen, ["first", "second", "third"]);

        let (code, stderr) = run.finish_streamed().await.expect("finish");
        assert_eq!(code, Some(0));
        assert_eq!(stderr, "");
    }

    #[tokio::test]
    async fn scripted_start_supports_probes_and_failing_finish() {
        let runner = ScriptedRunner::new().fallback(
            Reply::fail(7, "boom: detail\n").with_stdout("starting up\nready to serve\n"),
        );
        let cmd = Command::new("server");
        let mut run = runner.start(&cmd).await.expect("scripted start");
        run.wait_for_line(|l| l.contains("ready"), std::time::Duration::from_secs(5))
            .await
            .expect("the canned banner satisfies the probe");
        let (code, stderr) = run.finish_streamed().await.expect("finish");
        assert_eq!(code, Some(7));
        assert_eq!(stderr, "boom: detail");
    }

    #[tokio::test]
    async fn scripted_start_consumed_by_output_string() {
        // The whole consuming surface works on a scripted handle, not just
        // streaming: output_string drains the same pumps.
        let runner = ScriptedRunner::new().fallback(Reply::lines(["a", "b"]));
        let run = runner.start(&Command::new("x")).await.expect("start");
        let result = run.output_string().await.expect("consume");
        assert!(result.is_success());
        assert_eq!(result.stdout(), "a\nb");
    }

    #[tokio::test(start_paused = true)]
    async fn scripted_line_delay_delivers_incrementally() {
        use tokio_stream::StreamExt;
        let runner = ScriptedRunner::new().fallback(
            Reply::lines(["tick", "tock"]).with_line_delay(std::time::Duration::from_secs(10)),
        );
        let mut run = runner
            .start(&Command::new("clock"))
            .await
            .expect("scripted start");
        let mut lines = run.stdout_lines();

        // Nothing arrives before the first delay elapses…
        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(5), lines.next())
                .await
                .is_err(),
            "no line may arrive before its scripted delay"
        );
        // …then the paused clock advances and both lines flow.
        assert_eq!(lines.next().await.as_deref(), Some("tick"));
        assert_eq!(lines.next().await.as_deref(), Some("tock"));
        assert_eq!(lines.next().await, None);
    }

    #[tokio::test]
    async fn scripted_timeout_reply_surfaces_through_start() {
        let runner = ScriptedRunner::new().fallback(Reply::timeout());
        let cmd = Command::new("slow").timeout(std::time::Duration::from_secs(9));
        let run = runner.start(&cmd).await.expect("start");
        let result = run.output_string().await.expect("a timeout is captured");
        assert!(result.timed_out());
        assert!(!result.is_success());
    }

    #[tokio::test]
    async fn output_replays_canned_lines_through_handlers() {
        // The bulk path fires `on_stdout_line`/`on_stderr_line` for canned
        // replies, so a wrapper's progress reporting tests hermetically.
        use std::sync::{Arc, Mutex};
        let seen = Arc::new(Mutex::new(Vec::new()));
        let errs = Arc::new(Mutex::new(Vec::new()));
        let runner = ScriptedRunner::new().on(["fetch"], Reply::ok("a\nb\n").with_stdout("a\nb\n"));
        let cmd = Command::new("git")
            .arg("fetch")
            .on_stdout_line({
                let seen = seen.clone();
                move |l| seen.lock().unwrap().push(l.to_owned())
            })
            .on_stderr_line({
                let errs = errs.clone();
                move |l| errs.lock().unwrap().push(l.to_owned())
            });
        let result = runner.output(&cmd).await.expect("scripted run");
        assert!(result.is_success());
        assert_eq!(*seen.lock().unwrap(), ["a", "b"]);
        assert!(errs.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn handler_calls_happen_before_the_consuming_verb_resolves() {
        // Pins the documented ordering guarantee: by the time a consuming
        // verb's future resolves, every line handler invocation has happened
        // (the pumps are joined before the result is assembled).
        use std::sync::{Arc, Mutex};
        let seen = Arc::new(Mutex::new(0usize));
        let lines: Vec<String> = (1..=100).map(|n| format!("line {n}")).collect();
        let runner = ScriptedRunner::new().fallback(Reply::lines(lines));
        let cmd = Command::new("x").on_stdout_line({
            let seen = seen.clone();
            move |_| *seen.lock().unwrap() += 1
        });
        let run = runner.start(&cmd).await.expect("scripted start");
        let result = run.output_string().await.expect("consume");
        assert!(result.is_success());
        assert_eq!(
            *seen.lock().unwrap(),
            100,
            "all handler calls happen-before the verb resolves"
        );
    }

    #[tokio::test]
    async fn recording_runner_records_start_invocations() {
        let rec = RecordingRunner::new(ScriptedRunner::new().fallback(Reply::lines(["x"])));
        let run = rec
            .start(&Command::new("gh").args(["run", "watch"]))
            .await
            .expect("recorded start");
        drop(run); // recorded even though the stream was never consumed
        assert_eq!(rec.only_call().args_str(), ["run", "watch"]);
    }

    #[cfg(feature = "cancellation")]
    #[tokio::test(start_paused = true)]
    async fn scripted_pending_start_is_cancellable() {
        let token = crate::CancellationToken::new();
        let runner = ScriptedRunner::new().fallback(Reply::pending());
        let cmd = Command::new("watch").cancel_on(token.clone());
        let run = runner.start(&cmd).await.expect("start");
        let consume = run.output_string();
        tokio::pin!(consume);
        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(3600), &mut consume)
                .await
                .is_err(),
            "a pending scripted run must not resolve before cancellation"
        );
        token.cancel();
        let err = tokio::time::timeout(std::time::Duration::from_secs(3600), consume)
            .await
            .expect("the token resolves the run")
            .expect_err("cancellation is always an error");
        assert!(
            matches!(err, crate::error::Error::Cancelled { .. }),
            "got {err:?}"
        );
    }

    #[tokio::test]
    async fn prefix_rule_matches_and_replies() {
        let runner = ScriptedRunner::new().on(["status"], Reply::ok("clean"));
        let out = runner
            .output(&Command::new("git").arg("status"))
            .await
            .unwrap();
        assert_eq!(out.stdout(), "clean");
        assert!(out.is_success());
    }

    #[tokio::test]
    async fn predicate_rule_and_fallback() {
        let runner = ScriptedRunner::new()
            .when(
                |c| c.arguments().iter().any(|a| a == "--version"),
                Reply::ok("v1"),
            )
            .fallback(Reply::fail(1, "unknown"));

        assert_eq!(
            runner
                .output(&Command::new("tool").arg("--version"))
                .await
                .unwrap()
                .stdout(),
            "v1"
        );
        let miss = runner.output(&Command::new("tool").arg("x")).await.unwrap();
        assert_eq!(miss.code(), Some(1));
        assert!(!miss.is_success());
    }

    #[tokio::test]
    async fn no_match_without_fallback_is_a_not_found_spawn_error() {
        let runner = ScriptedRunner::new().on(["status"], Reply::ok("clean"));
        let err = runner
            .output(&Command::new("git").arg("log"))
            .await
            .expect_err("an unmatched command with no fallback must error");
        match err {
            crate::error::Error::Spawn { program, source } => {
                assert_eq!(program, "git");
                assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
            }
            other => panic!("expected Error::Spawn, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn prefix_matches_whole_elements_not_substrings() {
        let runner = ScriptedRunner::new().on(["foo"], Reply::ok("hit"));
        // ["foo", anything…] matches — element-wise prefix.
        assert!(
            runner
                .output(&Command::new("tool").args(["foo", "bar"]))
                .await
                .is_ok()
        );
        // ["foobar"] must NOT: "foo" is a substring, not an args prefix.
        assert!(
            runner
                .output(&Command::new("tool").arg("foobar"))
                .await
                .is_err(),
            "substring of an element is not a prefix match"
        );
    }

    #[tokio::test]
    async fn timeout_reply_surfaces_as_timeout_error() {
        use crate::error::Error;
        let runner = ScriptedRunner::new().fallback(Reply::timeout());
        // capture/output exposes the flag without erroring …
        let out = runner.output(&Command::new("git")).await.unwrap();
        assert!(out.timed_out());
        // … but the success-checking helpers raise a distinct Timeout.
        assert!(matches!(
            runner.run(&Command::new("git")).await.unwrap_err(),
            Error::Timeout { .. }
        ));
        assert!(matches!(
            runner.exit_code(&Command::new("git")).await.unwrap_err(),
            Error::Timeout { .. }
        ));
        // The reply carries the command's *real* configured deadline, matching the
        // live runner — not a zero duration.
        let cmd = Command::new("git").timeout(std::time::Duration::from_secs(7));
        match runner.run(&cmd).await.unwrap_err() {
            Error::Timeout { timeout, .. } => {
                assert_eq!(timeout, std::time::Duration::from_secs(7))
            }
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    #[cfg(feature = "cancellation")]
    #[tokio::test(start_paused = true)]
    async fn pending_parks_until_the_token_fires_then_cancels() {
        use crate::error::Error;
        let token = crate::CancellationToken::new();
        let runner = ScriptedRunner::new().on(["run", "watch"], Reply::pending());
        let cmd = Command::new("gh")
            .args(["run", "watch"])
            .cancel_on(token.clone());

        let call = runner.output(&cmd);
        tokio::pin!(call);
        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(3600), &mut call)
                .await
                .is_err(),
            "a pending reply must not resolve before cancellation"
        );
        token.cancel();
        match call.await {
            Err(Error::Cancelled { program }) => assert_eq!(program, "gh"),
            other => panic!("expected Error::Cancelled, got {other:?}"),
        }
    }

    #[cfg(feature = "cancellation")]
    #[tokio::test(start_paused = true)]
    async fn pending_without_a_token_parks_forever() {
        // Documented: a pending reply for a command with no token behaves like
        // a hung child nobody can cancel.
        let runner = ScriptedRunner::new().fallback(Reply::pending());
        let cmd = Command::new("gh");
        let call = runner.output(&cmd);
        tokio::pin!(call);
        assert!(
            tokio::time::timeout(std::time::Duration::from_secs(3600), &mut call)
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn probe_reads_exit_code_as_bool() {
        use crate::error::Error;
        let runner = ScriptedRunner::new()
            .on(["yes"], Reply::ok(""))
            .on(["no"], Reply::fail(1, ""))
            .on(["boom"], Reply::fail(2, "bad"))
            .fallback(Reply::timeout());
        // 0 -> true, 1 -> false.
        assert!(runner.probe(&Command::new("t").arg("yes")).await.unwrap());
        assert!(!runner.probe(&Command::new("t").arg("no")).await.unwrap());
        // Any other code -> Exit error; no code (timeout) -> Timeout error.
        assert!(matches!(
            runner
                .probe(&Command::new("t").arg("boom"))
                .await
                .unwrap_err(),
            Error::Exit { code: 2, .. }
        ));
        assert!(matches!(
            runner
                .probe(&Command::new("t").arg("other"))
                .await
                .unwrap_err(),
            Error::Timeout { .. }
        ));
    }

    #[tokio::test]
    async fn run_ext_trims_and_checks_success() {
        let runner = ScriptedRunner::new().fallback(Reply::ok("  hello \n"));
        let trimmed = runner.run(&Command::new("echo")).await.unwrap();
        assert_eq!(trimmed, "  hello");
    }

    #[tokio::test]
    async fn recording_captures_args_cwd_and_absence() {
        let recorder = RecordingRunner::replying(Reply::ok("ok"));
        recorder
            .output(
                &Command::new("gh")
                    .current_dir("/repo")
                    .args(["pr", "create", "--title", "T"]),
            )
            .await
            .unwrap();

        let call = recorder.only_call();
        assert_eq!(call.program, OsString::from("gh"));
        assert_eq!(call.cwd, Some(OsString::from("/repo")));
        assert!(call.has_flag("--title"));
        assert!(!call.has_flag("--base"), "no --base flag was passed");
    }
}