use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
use tokio_stream::Stream;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::pump::{Popped, SharedLines, pump_lines};
use super::RunningProcess;
impl RunningProcess {
    /// Starts pumping the child's output and returns a stream over its stdout lines.
    ///
    /// What this call does, in order:
    ///
    /// 1. If no stderr sink exists yet, creates one and, when the backend still
    ///    has a stderr reader, spawns a `pump_lines` task for it (handle kept in
    ///    `stderr_pump`). The sink is stored even when there was no reader.
    /// 2. Creates a fresh stdout sink. If the backend still has a stdout reader a
    ///    detached `pump_lines` task is spawned for it; otherwise the sink is
    ///    closed immediately, so the returned stream ends right away.
    /// 3. If no deadline task exists and both a timeout and an owned process
    ///    group are available, spawns a task that sleeps for the timeout and then
    ///    terminates the process.
    /// 4. With the `cancellation` feature: if no cancel task exists and both a
    ///    cancel token and an owned process group are available, spawns a task
    ///    that kills the process once the token is cancelled.
    ///
    /// # Panics
    ///
    /// Tasks are started with `tokio::spawn`, which panics when called outside
    /// of a Tokio runtime.
    pub fn stdout_lines(&mut self) -> StdoutLines {
        // Stderr is wired up at most once; later calls reuse the existing sink.
        if self.stderr_sink.is_none() {
            let stderr_sink = SharedLines::new(&self.buffer);
            if let Some(pipe) = self.backend.take_stderr_reader() {
                self.stderr_pump = Some(tokio::spawn(pump_lines(
                    pipe,
                    self.stderr_encoding,
                    self.stderr_handler.clone(),
                    stderr_sink.clone(),
                )));
            }
            self.stderr_sink = Some(stderr_sink);
        }

        let stdout_sink = SharedLines::new(&self.buffer);
        match self.backend.take_stdout_reader() {
            Some(pipe) => {
                // The pump is detached: its handle is not stored anywhere.
                tokio::spawn(pump_lines(
                    pipe,
                    self.stdout_encoding,
                    self.stdout_handler.clone(),
                    stdout_sink.clone(),
                ));
            }
            // Nothing to read from: close the sink so the stream terminates.
            None => stdout_sink.close_now(),
        }
        self.stdout_sink = Some(stdout_sink.clone());

        if self.deadline_task.is_none()
            && let (Some(limit), Some(group)) = (self.timeout, self.backend.own_group())
        {
            // Only a weak handle is moved into the task, so the task itself does
            // not keep the process group alive.
            let group = Arc::downgrade(group);
            let pid = self.pid;
            let grace = self.timeout_grace;
            let signal = self.timeout_signal;
            self.deadline_task = Some(tokio::spawn(async move {
                tokio::time::sleep(limit).await;
                match grace {
                    // A grace period is configured: ask the group to terminate
                    // gracefully, or kill the direct child if the group is gone.
                    Some(grace) => match group.upgrade() {
                        Some(group) => {
                            let _ = group.graceful_terminate(grace, signal).await;
                        }
                        None => kill_direct_child(pid),
                    },
                    // No grace period: kill immediately.
                    None => kill_via_weak(&group, pid),
                }
            }));
        }

        #[cfg(feature = "cancellation")]
        if self.cancel_task.is_none()
            && let (Some(token), Some(group)) =
                (self.cancel_token.clone(), self.backend.own_group())
        {
            let group = Arc::downgrade(group);
            let pid = self.pid;
            self.cancel_task = Some(tokio::spawn(async move {
                token.cancelled().await;
                kill_via_weak(&group, pid);
            }));
        }

        StdoutLines {
            sink: stdout_sink,
            wait: None,
        }
    }

    /// Waits for the process to finish and returns its code together with the
    /// collected stderr text.
    ///
    /// The returned tuple is the `Option<i32>` code produced by
    /// `checked_outcome` and the stderr lines drained from the stderr sink,
    /// joined with `"\n"` (an empty string when there is no stderr sink).
    ///
    /// If the backend still holds a stdout reader, a detached task reads and
    /// discards it. If no stderr pump is running and a stderr reader is still
    /// available, a pump is started here.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `drive_to_exit` or `checked_outcome`.
    pub async fn finish_streamed(mut self) -> Result<(Option<i32>, String)> {
        if let Some(mut pipe) = self.backend.take_stdout_reader() {
            tokio::spawn(async move {
                // The output is thrown away, so read into a fixed-size scratch
                // buffer instead of accumulating everything the child prints.
                // NOTE(review): presumably this drain exists so the child cannot
                // block on a full stdout pipe — confirm.
                let mut scratch = [0u8; 8 * 1024];
                loop {
                    match pipe.read(&mut scratch).await {
                        // End of stream or read failure: stop draining.
                        Ok(0) | Err(_) => break,
                        Ok(_) => {}
                    }
                }
            });
        }

        if self.stderr_pump.is_none()
            && let Some(pipe) = self.backend.take_stderr_reader()
        {
            let sink = SharedLines::new(&self.buffer);
            self.stderr_pump = Some(tokio::spawn(pump_lines(
                pipe,
                self.stderr_encoding,
                self.stderr_handler.clone(),
                sink.clone(),
            )));
            self.stderr_sink = Some(sink);
        }

        let outcome = self.drive_to_exit().await?;
        self.observe_stdin_task().await;
        let (code, _timed_out) = self.checked_outcome(outcome)?;

        // Let the stderr pump finish before draining, so the sink holds
        // everything the pump was going to deliver. A join error is ignored.
        if let Some(pump) = self.stderr_pump.take() {
            let _ = pump.await;
        }
        let stderr = self
            .stderr_sink
            .as_ref()
            .map(|sink| sink.drain().join("\n"))
            .unwrap_or_default();
        Ok((code, stderr))
    }
}
/// Terminates the whole process group when it is still alive, then kills the
/// direct child by pid.
///
/// The group termination is best-effort: its result is discarded, and the
/// direct-child kill runs regardless of whether the group could be upgraded.
fn kill_via_weak(group: &Weak<ProcessGroup>, pid: Option<u32>) {
    match Weak::upgrade(group) {
        Some(live_group) => {
            let _ = live_group.terminate_all();
        }
        None => {}
    }
    kill_direct_child(pid);
}
/// Asks a process to stop with `signal`, then force-kills it if it is still
/// around once `grace` has elapsed.
///
/// On Unix the pid is signalled, then probed every 20 ms (or less, when the
/// deadline is closer) with signal `0`; as soon as the probe fails the function
/// returns. If the deadline passes first, `SIGKILL` is sent.
///
/// On other platforms `grace` and `signal` are ignored and the call is
/// forwarded to `kill_direct_child`.
///
/// Does nothing when `pid` is `None`, or (on Unix) when the pid does not map to
/// a strictly positive `i32`.
pub(crate) async fn graceful_kill_pid(pid: Option<u32>, grace: std::time::Duration, signal: i32) {
    #[cfg(unix)]
    {
        let Some(pid) = pid else { return };
        // `kill` gives pid 0 and negative pids a broadcast meaning (own process
        // group, every process, or another process group). A plain `as i32`
        // cast would let 0 or a wrapped-around value through, so convert with a
        // check and refuse anything that is not a real, positive pid.
        let pid = match i32::try_from(pid) {
            Ok(pid) if pid > 0 => pid,
            _ => return,
        };
        // SAFETY: `kill` takes plain integers and touches no memory owned by
        // this program; `pid` is checked above to address a single process.
        unsafe {
            libc::kill(pid, signal);
        }
        let poll = std::time::Duration::from_millis(20);
        let deadline = tokio::time::Instant::now() + grace;
        loop {
            let now = tokio::time::Instant::now();
            if now >= deadline {
                break;
            }
            // Signal 0 delivers nothing; a non-zero return means the probe
            // failed, which this code treats as "the process is gone".
            // SAFETY: same as above — integer arguments only, positive pid.
            if unsafe { libc::kill(pid, 0) } != 0 {
                return;
            }
            // Never sleep past the deadline.
            tokio::time::sleep(poll.min(deadline - now)).await;
        }
        // Grace period exhausted: force the kill.
        // SAFETY: same as above — integer arguments only, positive pid.
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = (grace, signal);
        kill_direct_child(pid);
    }
}
/// Forcefully kills the process identified by `pid`, best-effort.
///
/// * Unix: sends `SIGKILL` to the pid.
/// * Windows: opens the process with `PROCESS_TERMINATE`, terminates it with
///   exit code 1 and closes the handle; nothing happens if the open fails.
/// * Elsewhere: no-op.
///
/// Does nothing when `pid` is `None`. On Unix it also does nothing when the pid
/// does not map to a strictly positive `i32`. Failures of the underlying system
/// calls are ignored.
fn kill_direct_child(pid: Option<u32>) {
    let Some(pid) = pid else { return };
    #[cfg(unix)]
    {
        // `kill` gives pid 0 and negative pids a broadcast meaning (own process
        // group, every process, or another process group). A plain `as i32`
        // cast would let 0 or a wrapped-around value through and SIGKILL far
        // more than the child, so only a checked, positive pid is signalled.
        match i32::try_from(pid) {
            Ok(raw_pid) if raw_pid > 0 => {
                // SAFETY: `kill` takes plain integers and touches no memory
                // owned by this program; `raw_pid` addresses a single process.
                unsafe {
                    libc::kill(raw_pid, libc::SIGKILL);
                }
            }
            _ => {}
        }
    }
    #[cfg(windows)]
    // SAFETY: the handle is used only after a null check and is closed exactly
    // once, inside the same block that opened it.
    unsafe {
        use windows_sys::Win32::Foundation::CloseHandle;
        use windows_sys::Win32::System::Threading::{
            OpenProcess, PROCESS_TERMINATE, TerminateProcess,
        };
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if !handle.is_null() {
            TerminateProcess(handle, 1);
            CloseHandle(handle);
        }
    }
    #[cfg(not(any(unix, windows)))]
    let _ = pid;
}
/// Stream of stdout lines, yielding one `String` per item.
///
/// Lines are popped from a shared sink; when the sink is empty the stream
/// waits on the sink's `changed()` future before trying again.
pub struct StdoutLines {
/// Shared line sink that items are popped from.
sink: Arc<SharedLines>,
/// In-flight `changed()` future, kept between polls while the sink is empty;
/// `None` when no wait is pending.
wait: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
/// Opaque `Debug` output: prints the type name only and marks the fields as
/// omitted.
impl std::fmt::Debug for StdoutLines {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("StdoutLines");
        repr.finish_non_exhaustive()
    }
}
impl Stream for StdoutLines {
    type Item = String;

    /// Yields the next line from the sink.
    ///
    /// Returns `Ready(Some(line))` when a line is available, `Ready(None)` once
    /// the sink reports it is closed, and `Pending` while the sink is empty and
    /// its `changed()` future has not completed yet.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
        let stream = self.get_mut();
        loop {
            // Step 1: try to take something out of the sink right now.
            match stream.sink.try_pop() {
                Popped::Closed => return Poll::Ready(None),
                Popped::Line(text) => {
                    // Any pending wait is obsolete once a line was delivered.
                    stream.wait = None;
                    return Poll::Ready(Some(text));
                }
                Popped::Empty => {}
            }

            // Step 2: the sink is empty — reuse the wait from an earlier poll,
            // or start a new one, and poll it.
            let waiter = match stream.wait.as_mut() {
                Some(pending) => pending,
                None => stream.wait.insert(Box::pin(stream.sink.clone().changed())),
            };
            if waiter.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }

            // The sink changed: drop the finished future and pop again.
            stream.wait = None;
        }
    }
}