procpilot 0.7.0

Production-grade subprocess runner with typed errors, retry, and timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
//! Integration tests for `Cmd::run_async` and `Cmd::spawn_async`.

use std::time::{Duration, Instant};

use procpilot::{Cmd, RetryPolicy};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};

// Paths to the helper binaries the tests drive. Each is read at compile time
// from the `CARGO_BIN_EXE_<name>` variable that Cargo sets when building
// integration tests. The roles below are inferred from how the tests in this
// file use each helper.

/// Helper that prints its arguments to stdout (e.g. arg `hi` yields `hi`).
const PP_ECHO: &str = env!("CARGO_BIN_EXE_pp_echo");
/// Helper that copies its stdin to stdout.
const PP_CAT: &str = env!("CARGO_BIN_EXE_pp_cat");
/// Helper that sleeps for the duration given as its argument
/// (presumably milliseconds — TODO confirm against the helper's source).
const PP_SLEEP: &str = env!("CARGO_BIN_EXE_pp_sleep");
/// Helper that exits with the status given as its first argument; with
/// `--err <text>` it also writes `<text>` plus a newline to stderr.
const PP_STATUS: &str = env!("CARGO_BIN_EXE_pp_status");
/// Helper that prints the value of the environment variable named by its argument.
const PP_PRINT_ENV: &str = env!("CARGO_BIN_EXE_pp_print_env");

/// A `before_spawn` hook on a single command runs exactly once per
/// `run_async` call, and the command's output is still captured.
#[tokio::test]
async fn run_async_before_spawn_hook_fires_once() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Shared counter: one handle moves into the hook, the other stays here.
    let hook_calls = Arc::new(AtomicUsize::new(0));
    let hook_side = Arc::clone(&hook_calls);

    let echo = Cmd::new(PP_ECHO).arg("hi").before_spawn(move |_cmd| {
        hook_side.fetch_add(1, Ordering::SeqCst);
        Ok(())
    });
    let out = echo.run_async().await.expect("ok");

    assert_eq!(hook_calls.load(Ordering::SeqCst), 1);
    let printed = out.stdout_lossy();
    assert_eq!(printed.trim(), "hi");
}

/// On a three-stage pipeline the `before_spawn` hook is expected to run once
/// for every stage, leaving the counter at 3.
#[tokio::test]
async fn run_async_before_spawn_fires_per_stage_on_pipeline() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    let stage_spawns = Arc::new(AtomicUsize::new(0));
    let hook_side = Arc::clone(&stage_spawns);

    let pipeline = Cmd::new(PP_ECHO)
        .arg("x")
        .pipe(Cmd::new(PP_CAT))
        .pipe(Cmd::new(PP_CAT))
        .before_spawn(move |_cmd| {
            hook_side.fetch_add(1, Ordering::SeqCst);
            Ok(())
        });
    // The output itself is irrelevant; only success and the hook count matter.
    let _ = pipeline.run_async().await.expect("ok");

    assert_eq!(stage_spawns.load(Ordering::SeqCst), 3);
}

/// An error returned from the `before_spawn` hook aborts the run and is
/// reported as a spawn failure.
#[tokio::test]
async fn run_async_before_spawn_error_aborts_with_spawn_error() {
    let rejected = Cmd::new(PP_ECHO)
        .arg("unused")
        .before_spawn(|_cmd| Err(std::io::Error::other("hook rejected")));
    let outcome = rejected.run_async().await;
    let err = outcome.expect_err("hook error should abort");
    assert!(err.is_spawn_failure());
}

/// The `before_spawn` hook may mutate the command it is handed: here it sets
/// `TEST_BEFORE_SPAWN`, and the child must print that value back.
#[tokio::test]
async fn run_async_before_spawn_can_mutate_env() {
    let probe = Cmd::new(PP_PRINT_ENV)
        .arg("TEST_BEFORE_SPAWN")
        .before_spawn(|cmd| {
            // The variable is set only here, inside the hook, never on the builder.
            cmd.env("TEST_BEFORE_SPAWN", "set_by_hook");
            Ok(())
        });
    let out = probe.run_async().await.expect("ok");
    let printed = out.stdout_lossy();
    assert_eq!(printed.trim(), "set_by_hook");
}

/// `run_async` captures the child's stdout in the returned output.
#[tokio::test]
async fn run_async_captures_stdout() {
    let echo = Cmd::new(PP_ECHO).arg("hello");
    let out = echo.run_async().await.expect("ok");
    let captured = out.stdout_lossy();
    assert_eq!(captured.trim(), "hello");
}

/// A non-zero exit is surfaced as an error that carries the child's stderr.
#[tokio::test]
async fn run_async_surfaces_nonzero() {
    let failing = Cmd::new(PP_STATUS).args(["1", "--err", "boom"]);
    let err = failing.run_async().await.expect_err("fail");
    assert!(err.is_non_zero_exit());
    assert_eq!(err.stderr(), Some("boom\n"));
}

/// Running a program that does not exist is reported as a spawn failure.
#[tokio::test]
async fn run_async_missing_binary_is_spawn_failure() {
    let missing = Cmd::new("nonexistent_binary_xyz_42");
    let err = missing.run_async().await.expect_err("fail");
    assert!(err.is_spawn_failure());
}

/// A builder-supplied environment variable reaches the child when the command
/// is also given a working directory.
///
/// Only the environment variable is asserted; the working directory is set to
/// a fresh temp dir but is not otherwise inspected.
#[tokio::test]
async fn run_async_with_env_and_cwd() {
    // The guard is held for the whole test so the directory outlives the run.
    let workdir = tempfile::tempdir().expect("tempdir");
    let probe = Cmd::new(PP_PRINT_ENV)
        .arg("ASYNC_TEST")
        .env("ASYNC_TEST", "hello")
        .in_dir(workdir.path());
    let out = probe.run_async().await.expect("ok");
    let printed = out.stdout_lossy();
    assert_eq!(printed.trim(), "hello");
}

/// In-memory stdin given to the builder is fed to the child and echoed back.
#[tokio::test]
async fn run_async_stdin_bytes() {
    let cat = Cmd::new(PP_CAT).stdin("piped through cat\n");
    let out = cat.run_async().await.expect("ok");
    let echoed = out.stdout_lossy();
    assert_eq!(echoed.trim(), "piped through cat");
}

/// Stdin sourced from a synchronous reader is delivered to the child intact.
#[tokio::test]
async fn run_async_stdin_reader() {
    use procpilot::StdinData;
    use std::io::Cursor;

    let payload = Cursor::new(b"from reader".to_vec());
    let cat = Cmd::new(PP_CAT).stdin(StdinData::from_reader(payload));
    let out = cat.run_async().await.expect("ok");
    assert_eq!(out.stdout_lossy(), "from reader");
}

/// Stdin sourced from an async reader is streamed to the child intact.
#[tokio::test]
async fn run_async_stdin_async_reader_streams() {
    use procpilot::StdinData;

    let payload = std::io::Cursor::new(b"async stream".to_vec());
    let cat = Cmd::new(PP_CAT).stdin(StdinData::from_async_reader(payload));
    let out = cat.run_async().await.expect("ok");
    assert_eq!(out.stdout_lossy(), "async stream");
}

/// A synchronous `run` rejects an async-reader stdin without consuming it, so
/// a later `run_async` on another clone still receives the full payload.
#[tokio::test]
async fn sync_rejection_preserves_async_reader_for_later_clone() {
    use procpilot::StdinData;

    // Were the rejection to happen only after the reader had been taken, the
    // async run below would find no stdin left. The sync path therefore has
    // to reject before it touches the reader.
    let payload = std::io::Cursor::new(b"preserved".to_vec());
    let base = Cmd::new(PP_CAT).stdin(StdinData::from_async_reader(payload));
    let for_sync = base.clone();
    let for_async = base.clone();

    // Step 1: the synchronous runner must refuse the async reader.
    let sync_err = for_sync.run().expect_err("sync must reject AsyncReader");
    assert!(sync_err.is_spawn_failure());

    // Step 2: a sibling clone, backed by the same reader, still gets every
    // byte because the rejected attempt left the reader in place.
    let out = for_async.run_async().await.expect("async ok");
    assert_eq!(out.stdout_lossy(), "preserved");
}

/// The sync runner's rejection of an async-reader stdin must bypass the retry
/// machinery entirely, even when the retry predicate would accept any error.
#[tokio::test]
async fn sync_rejection_with_spawn_retrying_predicate_does_not_retry() {
    use procpilot::{RetryPolicy, RunError, StdinData};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // The predicate below says "retry" for everything, Spawn errors included.
    // The rejection is a structural pre-condition rather than a transient
    // failure, so it has to surface before any attempt reaches the retry
    // loop — meaning the predicate must never be consulted.
    let predicate_calls = Arc::new(AtomicUsize::new(0));
    let predicate_side = Arc::clone(&predicate_calls);

    let payload = std::io::Cursor::new(b"nope".to_vec());
    let err = Cmd::new(PP_CAT)
        .stdin(StdinData::from_async_reader(payload))
        .retry(RetryPolicy::default())
        .retry_when(move |_err| {
            predicate_side.fetch_add(1, Ordering::SeqCst);
            true
        })
        .run()
        .expect_err("must reject");

    let kind = match err {
        RunError::Spawn { source, .. } => source.kind(),
        other => panic!("expected Spawn(InvalidInput), got {other:?}"),
    };
    assert_eq!(kind, std::io::ErrorKind::InvalidInput);

    // Zero predicate calls: the rejection short-circuited ahead of the retry loop.
    assert_eq!(predicate_calls.load(Ordering::SeqCst), 0);
}

/// The synchronous `run` refuses an async-reader stdin with a `Spawn` error
/// whose source has kind `InvalidInput`.
#[tokio::test]
async fn run_sync_with_async_reader_returns_invalid_input() {
    use procpilot::{RunError, StdinData};

    let payload = std::io::Cursor::new(b"nope".to_vec());
    let cat = Cmd::new(PP_CAT).stdin(StdinData::from_async_reader(payload));
    let err = cat.run().expect_err("sync runner rejects async reader");

    let kind = match err {
        RunError::Spawn { source, .. } => source.kind(),
        other => panic!("expected Spawn(InvalidInput), got {other:?}"),
    };
    assert_eq!(kind, std::io::ErrorKind::InvalidInput);
}

/// An async-reader stdin is one-shot: of two clones run one after the other,
/// exactly one sees the payload and the other sees empty stdin.
#[tokio::test]
async fn run_async_async_reader_is_one_shot_across_clones() {
    use procpilot::StdinData;

    // `Cursor` implements `AsyncRead`. Both clones point at the same
    // `Mutex<Option<Reader>>`, so whichever attempt runs first takes it.
    let payload = std::io::Cursor::new(b"taken once".to_vec());
    let base = Cmd::new(PP_CAT).stdin(StdinData::from_async_reader(payload));
    let first_cmd = base.clone();
    let second_cmd = base.clone();

    let first_out = first_cmd.run_async().await.expect("ok");
    let second_out = second_cmd.run_async().await.expect("ok");

    // One run received the bytes; the other found `None` (empty stdin, so cat
    // echoes nothing). Sort them out without assuming which was which.
    let (primary, empty) = if !first_out.stdout.is_empty() {
        (first_out, second_out)
    } else {
        (second_out, first_out)
    };
    assert_eq!(primary.stdout_lossy(), "taken once");
    assert_eq!(empty.stdout_lossy(), "");
}

/// A command that outlives its configured timeout fails with a timeout
/// error, and does so long before the child would have finished by itself.
#[tokio::test]
async fn run_async_timeout_fires() {
    let started = Instant::now();
    let limit = Duration::from_millis(200);
    let sleeper = Cmd::new(PP_SLEEP).arg("10000").timeout(limit);
    let err = sleeper.run_async().await.expect_err("should time out");
    assert!(err.is_timeout());
    // Generous upper bound: far below the child's own sleep duration.
    assert!(started.elapsed() < Duration::from_secs(5));
}

/// With the default retry predicate, a plain exit 1 that writes nothing to
/// stderr is not retried: one attempt, one failure.
#[tokio::test]
async fn run_async_retry_default_predicate_does_not_fire_without_match() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Default predicate matches "stale" / ".lock" — neither appears here.
    //
    // Count spawns through a `before_spawn` hook so that "one attempt" is
    // actually asserted instead of merely described; without this the test
    // would also pass if the command were silently retried.
    // NOTE(review): assumes the hook runs for every spawn attempt, retried
    // ones included — confirm against the retry implementation.
    let spawns = Arc::new(AtomicUsize::new(0));
    let hook_side = Arc::clone(&spawns);

    let err = Cmd::new(PP_STATUS)
        .arg("1")
        .before_spawn(move |_cmd| {
            hook_side.fetch_add(1, Ordering::SeqCst);
            Ok(())
        })
        .retry(RetryPolicy::default())
        .run_async()
        .await
        .expect_err("fail");
    assert!(err.is_non_zero_exit());
    assert_eq!(
        spawns.load(Ordering::SeqCst),
        1,
        "default retry predicate must not trigger a second attempt"
    );
}

/// A custom `retry_when` predicate really drives the retry loop: it is
/// consulted after each failed attempt until it declines.
#[tokio::test]
async fn run_async_retry_actually_loops() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // The predicate answers "retry" on its first two calls and "stop" on the
    // third, counting every call it receives. Three calls therefore mean
    // three failed attempts.
    let predicate_calls = Arc::new(AtomicUsize::new(0));
    let predicate_side = Arc::clone(&predicate_calls);

    let err = Cmd::new(PP_STATUS)
        .arg("1")
        .retry(RetryPolicy::default())
        .retry_when(move |_err| {
            let earlier_calls = predicate_side.fetch_add(1, Ordering::SeqCst);
            earlier_calls < 2
        })
        .run_async()
        .await
        .expect_err("fail");

    assert!(err.is_non_zero_exit());
    // Called once per failed attempt: true, true, then false.
    assert_eq!(predicate_calls.load(Ordering::SeqCst), 3);
}

/// Several `run_async` futures can be driven together with
/// `tokio::try_join!`, each yielding its own command's output.
#[tokio::test]
async fn run_async_parallel_via_try_join() {
    let fut_a = Cmd::new(PP_ECHO).arg("a").run_async();
    let fut_b = Cmd::new(PP_ECHO).arg("b").run_async();
    let fut_c = Cmd::new(PP_ECHO).arg("c").run_async();

    let (out_a, out_b, out_c) = tokio::try_join!(fut_a, fut_b, fut_c).expect("all ok");

    // Check the outputs in the same a, b, c order as they were created.
    for (out, expected) in [(out_a, "a"), (out_b, "b"), (out_c, "c")] {
        assert_eq!(out.stdout_lossy().trim(), expected);
    }
}

/// A two-stage pipeline run with `run_async` yields the final stage's stdout.
#[tokio::test]
async fn run_async_pipeline() {
    let pipeline = Cmd::new(PP_ECHO).arg("piped").pipe(Cmd::new(PP_CAT));
    let out = pipeline.run_async().await.expect("ok");
    let text = out.stdout_lossy();
    assert_eq!(text.trim(), "piped");
}

/// A pipeline whose last stage exits with status 2 fails as a non-zero exit
/// carrying that status code.
#[tokio::test]
async fn run_async_pipeline_pipefail() {
    let failing_stage = Cmd::new(PP_STATUS).arg("2");
    let pipeline = Cmd::new(PP_ECHO).arg("x").pipe(failing_stage);
    let err = pipeline.run_async().await.expect_err("fail");

    assert!(err.is_non_zero_exit());
    let code = err.exit_status().and_then(|s| s.code());
    assert_eq!(code, Some(2));
}

/// `spawn_async` followed by `wait` returns the child's captured stdout.
#[tokio::test]
async fn spawn_async_wait_succeeds() {
    let echo = Cmd::new(PP_ECHO).arg("hi");
    let mut child = echo.spawn_async().await.expect("spawn");
    let out = child.wait().await.expect("wait");
    let text = out.stdout_lossy();
    assert_eq!(text.trim(), "hi");
}

/// A spawned child's stdin and stdout can both be taken and driven by the
/// caller at the same time: one task writes while the test reads.
#[tokio::test]
async fn spawn_async_take_stdin_stdout_bidirectional() {
    let mut proc = Cmd::new(PP_CAT).spawn_async().await.expect("spawn");
    let mut stdin = proc.take_stdin().expect("stdin piped");
    let mut stdout = proc.take_stdout().expect("stdout piped");

    // Write from a separate task so reading below can proceed concurrently.
    // Keep the handle: a detached task would swallow a panic from the
    // `expect` inside it, hiding the real cause of a failure.
    let writer = tokio::spawn(async move {
        stdin.write_all(b"async bidirectional\n").await.expect("write");
        // Close stdin explicitly once the payload has been written.
        drop(stdin);
    });

    let mut buf = String::new();
    stdout.read_to_string(&mut buf).await.expect("read");
    // Join the writer so a panic inside the task fails this test directly.
    writer.await.expect("writer task panicked");
    let out = proc.wait().await.expect("wait ok after stdin closed");
    assert!(buf.contains("async bidirectional"));
    // RunOutput carries no stdout here because the caller drained via
    // take_stdout — the drain inside finalize is a no-op in that path.
    assert!(out.stdout.is_empty());
}

/// A long-running child can be cancelled by racing `wait` against a timer in
/// `select!`, then killing and reaping it.
#[tokio::test]
async fn spawn_async_kill_via_select() {
    let sleeper = Cmd::new(PP_SLEEP).arg("10000");
    let mut child = sleeper.spawn_async().await.expect("spawn");

    let cancel = tokio::time::sleep(Duration::from_millis(100));
    tokio::pin!(cancel);

    // Record which side of the race won, then act on it outside the macro.
    let exited_first = tokio::select! {
        _ = child.wait() => true,
        _ = &mut cancel => false,
    };
    if exited_first {
        panic!("should not exit on its own");
    }

    child.kill().await.expect("kill");
    // Reap the child; its exit result is irrelevant to this test.
    let _ = child.wait().await;
}

/// `wait_timeout` yields `None` when the child is still running once the
/// timeout elapses.
#[tokio::test]
async fn spawn_async_wait_timeout_returns_none_while_running() {
    let sleeper = Cmd::new(PP_SLEEP).arg("5000");
    let mut child = sleeper.spawn_async().await.expect("spawn");

    let short = Duration::from_millis(100);
    let outcome = child.wait_timeout(short).await.expect("wait_timeout");
    assert!(outcome.is_none());

    // Clean up so the sleeper does not outlive the test.
    child.kill().await.expect("kill");
    let _ = child.wait().await;
}

/// A spawned two-stage pipeline reports itself as a pipeline, exposes one pid
/// per stage, and still produces the final stage's output.
#[tokio::test]
async fn spawn_async_pipeline_pids_count() {
    let pipeline = Cmd::new(PP_ECHO).arg("x").pipe(Cmd::new(PP_CAT));
    let mut child = pipeline.spawn_async().await.expect("spawn");

    assert!(child.is_pipeline());
    let pid_count = child.pids().len();
    assert_eq!(pid_count, 2);

    let out = child.wait().await.expect("wait");
    let text = out.stdout_lossy();
    assert_eq!(text.trim(), "x");
}

/// Calling `wait` twice on a successful child returns the same stdout both times.
#[tokio::test]
async fn async_wait_is_idempotent_on_success() {
    let echo = Cmd::new(PP_ECHO).arg("async-idempotent");
    let mut child = echo.spawn_async().await.expect("spawn");

    let first = child.wait().await.expect("first wait");
    let second = child.wait().await.expect("second wait");

    assert_eq!(first.stdout, second.stdout);
    let text = first.stdout_lossy();
    assert_eq!(text.trim(), "async-idempotent");
}

/// Calling `wait` twice on a failing child returns the same exit code and
/// stderr both times.
#[tokio::test]
async fn async_wait_is_idempotent_on_failure() {
    let failing = Cmd::new(PP_STATUS).args(["5", "--err", "err-bytes"]);
    let mut child = failing.spawn_async().await.expect("spawn");

    let first = child.wait().await.expect_err("first fails");
    let second = child.wait().await.expect_err("second fails");

    let first_code = first.exit_status().unwrap().code();
    assert_eq!(first_code, Some(5));
    let second_code = second.exit_status().unwrap().code();
    assert_eq!(second_code, Some(5));
    assert_eq!(first.stderr(), second.stderr());
}

/// After `wait_timeout` gives up with `None`, a plain `wait` on the same
/// child still completes successfully.
#[tokio::test]
async fn async_wait_timeout_none_then_wait_works() {
    let sleeper = Cmd::new(PP_SLEEP).arg("200");
    let mut child = sleeper.spawn_async().await.expect("spawn");

    // The timeout is shorter than the child's sleep, so this must give up.
    let too_short = Duration::from_millis(50);
    let early = child.wait_timeout(too_short).await.expect("wait_timeout");
    assert!(early.is_none());

    let out = child.wait().await.expect("wait after timeout");
    assert!(out.stderr.is_empty());
}

#[tokio::test]
async fn async_cancel_via_select_then_wait_returns_same() {
    // The README's cancellation pattern: select! drops the first wait,
    // kill + second wait must complete.
    let mut proc = Cmd::new(PP_SLEEP).arg("10000").spawn_async().await.expect("spawn");
    let cancel = tokio::time::sleep(Duration::from_millis(100));
    tokio::pin!(cancel);
    let _first_result = tokio::select! {
        res = proc.wait() => Some(res),
        _ = &mut cancel => None,
    };
    // Cancellation fired; first wait was dropped. Kill and wait again.
    proc.kill().await.expect("kill");
    let _second = proc.wait().await;
    // Third wait must be idempotent with the second.
    let _third = proc.wait().await;
}

/// When a later pipeline stage fails to spawn, the run must fail promptly as
/// a spawn failure rather than waiting on the already-started first stage.
#[tokio::test]
async fn async_pipeline_spawn_failure_does_not_leak_earlier_stages() {
    let started = Instant::now();
    let broken_stage = Cmd::new("nonexistent_binary_xyz_42");
    let pipeline = Cmd::new(PP_SLEEP).arg("10000").pipe(broken_stage);
    let err = pipeline.run_async().await.expect_err("should fail");
    let elapsed = started.elapsed();

    assert!(err.is_spawn_failure());
    // A slow return would mean the long-sleeping first stage was waited on
    // instead of being cleaned up.
    let budget = Duration::from_secs(2);
    assert!(
        elapsed < budget,
        "async pipeline spawn-failure cleanup didn't kill stage 1 (took {elapsed:?})"
    );
}

/// A spawned child's stdout can be taken and consumed line by line through a
/// buffered async reader.
#[tokio::test]
async fn spawn_async_streaming_lines() {
    let mut proc = Cmd::new(PP_CAT)
        .stdin("one\ntwo\nthree\n")
        .spawn_async()
        .await
        .expect("spawn");
    let stdout = proc.take_stdout().expect("piped");
    let mut reader = BufReader::new(stdout).lines();

    let mut lines = Vec::new();
    // Surface read errors instead of silently ending the loop: a swallowed
    // error would show up only as a confusing "missing lines" assertion.
    while let Some(line) = reader.next_line().await.expect("read line") {
        lines.push(line);
    }
    // Reap the child; its exit result is not what this test checks.
    let _ = proc.wait().await;
    assert_eq!(lines, vec!["one", "two", "three"]);
}