use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use processkit::{Command, OutputBufferPolicy};
use crate::common::*;
#[tokio::test]
#[ignore = "spawns a real subprocess that outlives its timeout"]
async fn streaming_honors_timeout() {
use tokio_stream::StreamExt;
let cmd = if cfg!(windows) {
Command::new("cmd").args(["/c", "echo one& ping -n 30 127.0.0.1 >NUL"])
} else {
Command::new("sh").args(["-c", "echo one; sleep 30"])
}
.timeout(Duration::from_millis(500));
let start = Instant::now();
let mut run = cmd.start().await.expect("start");
let mut lines = run.stdout_lines();
let mut seen = Vec::new();
while let Some(line) = lines.next().await {
seen.push(line);
}
drop(lines);
let (code, _stderr) = run.finish_streamed().await.expect("finish");
assert!(
start.elapsed() < Duration::from_secs(15),
"stream did not end at the deadline (took {:?})",
start.elapsed()
);
assert!(
!matches!(code, Some(0)),
"a timed-out streamed run must not look successful (got {code:?})"
);
assert!(seen.iter().any(|l| l.contains("one")), "saw: {seen:?}");
}
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn stdout_line_handler_sees_every_line() {
let seen = Arc::new(Mutex::new(Vec::<String>::new()));
let captured = seen.clone();
let result = five_lines()
.on_stdout_line(move |line| captured.lock().unwrap().push(line.to_owned()))
.output_string()
.await
.expect("run");
assert!(result.is_success());
let lines = seen.lock().unwrap();
assert_eq!(lines.len(), 5, "handler saw: {lines:?}");
}
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn output_buffer_drops_oldest_lines() {
let result = five_lines()
.output_buffer(OutputBufferPolicy::bounded(2))
.output_string()
.await
.expect("run");
let kept: Vec<&str> = result.stdout().lines().collect();
assert_eq!(kept.len(), 2, "retained: {:?}", result.stdout());
assert!(kept.iter().all(|l| l.trim() == "4" || l.trim() == "5"));
}
#[tokio::test]
#[ignore = "spawns a real subprocess driven via interactive stdin"]
async fn interactive_stdin_round_trips() {
let program = if cfg!(windows) {
Command::new("cmd").args(["/c", "sort"])
} else {
Command::new("sort")
};
let mut process = program.keep_stdin_open().start().await.expect("start sort");
let mut stdin = process.standard_input().expect("stdin kept open");
stdin.write_line("banana").await.expect("write");
stdin.write_line("apple").await.expect("write");
stdin.finish().await.expect("eof");
let result = process.output_string().await.expect("collect");
assert!(result.is_success());
let first = result
.stdout()
.lines()
.next()
.unwrap_or("")
.trim()
.to_owned();
assert_eq!(first, "apple", "sorted output: {:?}", result.stdout());
}
#[tokio::test]
#[ignore = "spawns a real subprocess and streams its stdout"]
async fn stdout_lines_streams_incrementally() {
use tokio_stream::StreamExt;
let mut process = two_line_echo().start().await.expect("start echo");
let mut lines = process.stdout_lines();
let mut collected: Vec<String> = Vec::new();
while let Some(line) = lines.next().await {
collected.push(line);
}
assert!(
collected.iter().any(|l| l.contains("first")),
"lines: {collected:?}"
);
assert!(
collected.iter().any(|l| l.contains("second")),
"lines: {collected:?}"
);
}
#[tokio::test]
#[ignore = "spawns a real subprocess: stream stdout, then collect exit + stderr"]
async fn finish_streamed_returns_code_and_stderr() {
use tokio_stream::StreamExt;
let cmd = if cfg!(windows) {
Command::new("cmd").args(["/c", "echo out& echo err 1>&2"])
} else {
Command::new("sh").args(["-c", "echo out; echo err 1>&2"])
};
let mut process = cmd.start().await.expect("start");
let mut lines = process.stdout_lines();
let mut out = Vec::new();
while let Some(line) = lines.next().await {
out.push(line);
}
drop(lines);
let (code, stderr) = process.finish_streamed().await.expect("finish");
assert_eq!(code, Some(0));
assert!(out.iter().any(|l| l.contains("out")), "stdout: {out:?}");
assert!(stderr.contains("err"), "stderr: {stderr:?}");
}
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn second_stdout_lines_call_ends_immediately() {
use tokio_stream::StreamExt;
let mut process = five_lines().start().await.expect("start");
let mut first = process.stdout_lines();
let mut seen = 0;
while tokio::time::timeout(Duration::from_secs(10), first.next())
.await
.expect("first stream ends")
.is_some()
{
seen += 1;
}
assert_eq!(seen, 5);
let mut second = process.stdout_lines();
let next = tokio::time::timeout(Duration::from_secs(5), second.next())
.await
.expect("the second stream must end immediately");
assert!(next.is_none(), "second stream yields nothing: {next:?}");
let _ = process.finish_streamed().await;
}
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn finish_streamed_without_streaming_first_drains_and_exits() {
let process = two_line_echo().start().await.expect("start");
let (code, _stderr) = tokio::time::timeout(Duration::from_secs(15), process.finish_streamed())
.await
.expect("finish_streamed must not hang without a prior stdout_lines")
.expect("finish");
assert_eq!(code, Some(0));
}