use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use tokio_stream::Stream;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::pump::{Popped, SharedLines, pump_lines_core};
use crate::result::Outcome;
use super::RunningProcess;
#[must_use = "a Finished carries the run's outcome; inspect `outcome` or it is silently discarded"]
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Finished {
pub outcome: Outcome,
pub stderr: String,
}
impl RunningProcess {
pub fn stdout_lines(&mut self) -> Result<StdoutLines> {
let lines = self.drain_stdout_lines()?;
self.arm_stream_deadline();
Ok(lines)
}
pub(super) fn drain_stdout_lines(&mut self) -> Result<StdoutLines> {
self.ensure_stdout_streamable()?;
if self.stderr_sink.is_none() {
let stderr_sink = SharedLines::new(&self.buffer);
if let Some(pipe) = self.backend.take_stderr_reader() {
self.stderr_pump = Some(tokio::spawn(pump_lines_core(
pipe,
self.stderr_encoding,
self.stderr_handler.clone(),
self.stderr_tee.clone(),
stderr_sink.clone(),
)));
}
self.stderr_sink = Some(stderr_sink);
}
let stdout_sink = SharedLines::new(&self.buffer);
match self.backend.take_stdout_reader() {
Some(pipe) => {
self.stdout_pump = Some(tokio::spawn(pump_lines_core(
pipe,
self.stdout_encoding,
self.stdout_handler.clone(),
self.stdout_tee.clone(),
stdout_sink.clone(),
)));
}
None => stdout_sink.close_now(),
}
if self.stdout_sink.is_none() {
self.stdout_sink = Some(stdout_sink.clone());
}
Ok(StdoutLines {
sink: stdout_sink,
wait: None,
})
}
fn arm_stream_deadline(&mut self) {
if self.deadline_task.is_none()
&& let (Some(limit), Some(group)) = (self.timeout, self.backend.own_group())
{
let group = Arc::downgrade(group);
let pid = self.pid;
let grace = self.timeout_grace;
let signal = self.timeout_signal;
let started = self.started;
let timeout_state = self.timeout_state.clone();
self.deadline_task = Some(tokio::spawn(async move {
let remaining = limit
.checked_sub(started.elapsed())
.unwrap_or(std::time::Duration::ZERO);
tokio::time::sleep(remaining).await;
if timeout_state
.compare_exchange(
super::TS_PENDING,
super::TS_TIMED_OUT,
std::sync::atomic::Ordering::AcqRel,
std::sync::atomic::Ordering::Relaxed,
)
.is_err()
{
return; }
match grace {
Some(grace) => match group.upgrade() {
Some(group) => {
let _ = group.graceful_terminate(grace, signal).await;
}
None => kill_direct_child(pid), },
None => kill_via_weak(&group, pid),
}
}));
}
self.arm_scripted_deadline();
}
pub async fn finish(mut self) -> Result<Finished> {
if let Some(pipe) = self.backend.take_stdout_reader() {
let sink = crate::pump::SharedLines::new(&self.buffer);
self.stdout_pump = Some(tokio::spawn(crate::pump::pump_lines_core(
pipe,
self.stdout_encoding,
self.stdout_handler.clone(),
self.stdout_tee.clone(),
sink.clone(),
)));
self.stdout_sink = Some(sink);
}
if self.stderr_pump.is_none()
&& let Some(pipe) = self.backend.take_stderr_reader()
{
let sink = SharedLines::new(&self.buffer);
self.stderr_pump = Some(tokio::spawn(pump_lines_core(
pipe,
self.stderr_encoding,
self.stderr_handler.clone(),
self.stderr_tee.clone(),
sink.clone(),
)));
self.stderr_sink = Some(sink);
}
let raw_outcome = self.drive_to_exit().await?;
self.observe_stdin_task().await;
let pumps: Vec<_> = [self.stdout_pump.take(), self.stderr_pump.take()]
.into_iter()
.flatten()
.collect();
super::join_pumps(pumps).await;
let outcome = self.checked_outcome(raw_outcome)?;
for sink in [self.stdout_sink.as_ref(), self.stderr_sink.as_ref()]
.into_iter()
.flatten()
{
if sink.overflowed() {
return Err(crate::Error::OutputTooLarge {
program: self.program.clone(),
line_limit: self.buffer.max_lines,
byte_limit: self.buffer.max_bytes,
total_lines: sink.count(),
total_bytes: sink.seen_bytes(),
});
}
}
let stderr = self
.stderr_sink
.as_ref()
.map(|sink| sink.drain().join("\n"))
.unwrap_or_default();
Ok(Finished { outcome, stderr })
}
pub fn output_events(&mut self) -> Result<OutputEvents> {
self.ensure_stdout_streamable()?;
let stdout_sink = SharedLines::new(&self.buffer);
match self.backend.take_stdout_reader() {
Some(pipe) => {
self.stdout_pump = Some(tokio::spawn(pump_lines_core(
pipe,
self.stdout_encoding,
self.stdout_handler.clone(),
self.stdout_tee.clone(),
stdout_sink.clone(),
)));
}
None => stdout_sink.close_now(),
}
if self.stdout_sink.is_none() {
self.stdout_sink = Some(stdout_sink.clone());
}
let stderr_sink = if self.stderr_sink.is_none() {
let sink = SharedLines::new(&self.buffer);
if let Some(pipe) = self.backend.take_stderr_reader() {
self.stderr_pump = Some(tokio::spawn(pump_lines_core(
pipe,
self.stderr_encoding,
self.stderr_handler.clone(),
self.stderr_tee.clone(),
sink.clone(),
)));
} else {
sink.close_now();
}
self.stderr_sink = Some(sink.clone());
sink
} else {
let closed = SharedLines::new(&self.buffer);
closed.close_now();
closed
};
self.arm_stream_deadline();
Ok(OutputEvents {
stdout_sink,
stderr_sink,
stdout_wait: None,
stderr_wait: None,
stdout_done: false,
stderr_done: false,
prefer_stdout: true,
})
}
}
pub(super) fn kill_via_weak(group: &Weak<ProcessGroup>, pid: Option<u32>) {
if let Some(group) = group.upgrade() {
let _ = group.terminate_all();
}
kill_direct_child(pid);
}
pub(crate) async fn graceful_kill_pid(pid: Option<u32>, grace: std::time::Duration, signal: i32) {
#[cfg(unix)]
{
let Some(pid) = pid else { return };
let pid = pid as i32;
unsafe {
libc::kill(pid, signal);
}
let deadline = tokio::time::Instant::now() + grace.min(crate::MAX_DEADLINE);
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
break;
}
let probe = unsafe { libc::kill(pid, 0) };
if probe != 0 && std::io::Error::last_os_error().raw_os_error() != Some(libc::EPERM) {
return; }
let poll = std::time::Duration::from_millis(20);
tokio::time::sleep(poll.min(deadline - now)).await;
}
unsafe {
libc::kill(pid, libc::SIGKILL);
}
}
#[cfg(not(unix))]
{
let _ = (grace, signal);
kill_direct_child(pid);
}
}
pub(super) fn kill_direct_child(pid: Option<u32>) {
let Some(pid) = pid else { return };
#[cfg(unix)]
unsafe {
libc::kill(pid as i32, libc::SIGKILL);
}
#[cfg(windows)]
unsafe {
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::System::Threading::{
OpenProcess, PROCESS_TERMINATE, TerminateProcess,
};
let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
if !handle.is_null() {
TerminateProcess(handle, 1);
CloseHandle(handle);
}
}
}
pub struct StdoutLines {
sink: Arc<SharedLines>,
wait: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl std::fmt::Debug for StdoutLines {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StdoutLines").finish_non_exhaustive()
}
}
impl Stream for StdoutLines {
type Item = String;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
let this = self.get_mut();
loop {
match this.sink.try_pop() {
Popped::Line(line) => {
this.wait = None;
return Poll::Ready(Some(line));
}
Popped::Closed => return Poll::Ready(None),
Popped::Empty => {
if this.wait.is_none() {
this.wait = Some(Box::pin(this.sink.clone().changed()));
}
match this.wait.as_mut().expect("just set").as_mut().poll(cx) {
Poll::Ready(()) => {
this.wait = None;
continue;
}
Poll::Pending => return Poll::Pending,
}
}
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum OutputEvent {
Stdout(OutputLine),
Stderr(OutputLine),
}
impl OutputEvent {
pub fn text(&self) -> Option<&str> {
match self {
OutputEvent::Stdout(line) | OutputEvent::Stderr(line) => Some(line.text()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct OutputLine {
text: String,
}
impl OutputLine {
pub fn text(&self) -> &str {
&self.text
}
#[must_use]
pub fn into_text(self) -> String {
self.text
}
}
pub struct OutputEvents {
stdout_sink: Arc<SharedLines>,
stderr_sink: Arc<SharedLines>,
stdout_wait: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
stderr_wait: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
stdout_done: bool,
stderr_done: bool,
prefer_stdout: bool,
}
impl std::fmt::Debug for OutputEvents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OutputEvents").finish_non_exhaustive()
}
}
impl Stream for OutputEvents {
type Item = OutputEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<OutputEvent>> {
let this = self.get_mut();
loop {
for stdout_turn in [this.prefer_stdout, !this.prefer_stdout] {
if stdout_turn && !this.stdout_done {
match this.stdout_sink.try_pop() {
Popped::Line(line) => {
this.stdout_wait = None;
this.prefer_stdout = false; return Poll::Ready(Some(OutputEvent::Stdout(OutputLine {
text: line,
})));
}
Popped::Closed => {
this.stdout_done = true;
this.stdout_wait = None;
}
Popped::Empty => {}
}
} else if !stdout_turn && !this.stderr_done {
match this.stderr_sink.try_pop() {
Popped::Line(line) => {
this.stderr_wait = None;
this.prefer_stdout = true;
return Poll::Ready(Some(OutputEvent::Stderr(OutputLine {
text: line,
})));
}
Popped::Closed => {
this.stderr_done = true;
this.stderr_wait = None;
}
Popped::Empty => {}
}
}
}
if this.stdout_done && this.stderr_done {
return Poll::Ready(None);
}
let mut any_ready = false;
if !this.stdout_done {
if this.stdout_wait.is_none() {
this.stdout_wait = Some(Box::pin(this.stdout_sink.clone().changed()));
}
if this
.stdout_wait
.as_mut()
.expect("just set")
.as_mut()
.poll(cx)
.is_ready()
{
this.stdout_wait = None;
any_ready = true;
}
}
if !this.stderr_done {
if this.stderr_wait.is_none() {
this.stderr_wait = Some(Box::pin(this.stderr_sink.clone().changed()));
}
if this
.stderr_wait
.as_mut()
.expect("just set")
.as_mut()
.poll(cx)
.is_ready()
{
this.stderr_wait = None;
any_ready = true;
}
}
if any_ready {
continue;
}
return Poll::Pending;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::OutputBufferPolicy;
use tokio_stream::StreamExt;
#[tokio::test]
async fn output_events_interleaves_fairly_between_two_ready_streams() {
let policy = OutputBufferPolicy::unbounded();
let stdout_sink = SharedLines::new(&policy);
let stderr_sink = SharedLines::new(&policy);
for line in ["o1", "o2", "o3"] {
stdout_sink.push(line.to_owned());
}
for line in ["e1", "e2", "e3"] {
stderr_sink.push(line.to_owned());
}
stdout_sink.close_now();
stderr_sink.close_now();
let mut events = OutputEvents {
stdout_sink,
stderr_sink,
stdout_wait: None,
stderr_wait: None,
stdout_done: false,
stderr_done: false,
prefer_stdout: true,
};
let mut seq = Vec::new();
while let Some(ev) = events.next().await {
seq.push(match ev {
OutputEvent::Stdout(l) => format!("O:{}", l.text()),
OutputEvent::Stderr(l) => format!("E:{}", l.text()),
});
}
assert_eq!(
seq,
["O:o1", "E:e1", "O:o2", "E:e2", "O:o3", "E:e3"],
"merged stream must interleave, not drain stdout first"
);
}
#[tokio::test]
async fn output_event_carries_an_output_line_with_a_text_accessor() {
let policy = OutputBufferPolicy::unbounded();
let stdout_sink = SharedLines::new(&policy);
let stderr_sink = SharedLines::new(&policy);
stdout_sink.push("out".to_owned());
stderr_sink.push("err".to_owned());
stdout_sink.close_now();
stderr_sink.close_now();
let mut events = OutputEvents {
stdout_sink,
stderr_sink,
stdout_wait: None,
stderr_wait: None,
stdout_done: false,
stderr_done: false,
prefer_stdout: true,
};
let first = events.next().await.expect("a stdout event");
assert!(
matches!(&first, OutputEvent::Stdout(line) if line.text() == "out"),
"stdout event carries an OutputLine: {first:?}"
);
assert_eq!(first.text(), Some("out"), "text() reads the line");
let second = events.next().await.expect("a stderr event");
assert!(matches!(&second, OutputEvent::Stderr(line) if line.text() == "err"));
assert_eq!(second.text(), Some("err"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stdout_lines_loses_no_line_under_a_parking_consumer() {
const N: usize = 5_000;
let sink = SharedLines::new(&OutputBufferPolicy::unbounded());
let producer = {
let sink = sink.clone();
tokio::spawn(async move {
for i in 0..N {
sink.push(i.to_string());
if i % 7 == 0 {
tokio::task::yield_now().await;
}
}
sink.close_now();
})
};
let mut lines = StdoutLines { sink, wait: None };
let consume = async {
let mut seen = 0usize;
while let Some(line) = lines.next().await {
assert_eq!(line, seen.to_string(), "lines must arrive in push order");
seen += 1;
}
seen
};
let seen = tokio::time::timeout(std::time::Duration::from_secs(30), consume)
.await
.expect("consumer hung — possible lost wakeup");
producer.await.expect("producer task");
assert_eq!(seen, N, "every pushed line must be received");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn output_events_lose_no_line_under_two_racing_producers() {
const N: usize = 3_000;
let stdout_sink = SharedLines::new(&OutputBufferPolicy::unbounded());
let stderr_sink = SharedLines::new(&OutputBufferPolicy::unbounded());
let feed = |sink: Arc<SharedLines>| {
tokio::spawn(async move {
for i in 0..N {
sink.push(i.to_string());
if i % 5 == 0 {
tokio::task::yield_now().await;
}
}
sink.close_now();
})
};
let p_out = feed(stdout_sink.clone());
let p_err = feed(stderr_sink.clone());
let mut events = OutputEvents {
stdout_sink,
stderr_sink,
stdout_wait: None,
stderr_wait: None,
stdout_done: false,
stderr_done: false,
prefer_stdout: true,
};
let consume = async {
let (mut out, mut err) = (0usize, 0usize);
while let Some(ev) = events.next().await {
match ev {
OutputEvent::Stdout(_) => out += 1,
OutputEvent::Stderr(_) => err += 1,
}
}
(out, err)
};
let (out, err) = tokio::time::timeout(std::time::Duration::from_secs(30), consume)
.await
.expect("consumer hung — possible lost wakeup");
p_out.await.expect("stdout producer");
p_err.await.expect("stderr producer");
assert_eq!(out, N, "every stdout line received");
assert_eq!(err, N, "every stderr line received");
}
}