#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd, OwnedFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawHandle, IntoRawHandle, OwnedHandle};
use command_group::AsyncGroupChild;
use futures::{StreamExt, stream::BoxStream};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::io::ReaderStream;
use crate::executors::ExecutorError;
/// Tees the child's stdout into a fresh OS pipe and a stream of decoded text.
///
/// The child's current stdout handle is taken and replaced with the read end
/// of a new pipe, so whoever reads `child.inner().stdout` afterwards still
/// sees the raw bytes. A background task copies every chunk from the original
/// stdout into that pipe and also emits it, decoded as UTF-8, on the returned
/// stream.
///
/// Multi-byte UTF-8 characters that straddle a chunk boundary are held back
/// until the remaining bytes arrive, so they are not turned into replacement
/// characters. Bytes that are genuinely invalid are still replaced with
/// U+FFFD.
///
/// # Errors
///
/// Returns [`ExecutorError::Io`] when the child has no stdout handle, when the
/// pipe cannot be created, or when either pipe end cannot be wrapped.
pub fn duplicate_stdout(
    child: &mut AsyncGroupChild,
) -> Result<BoxStream<'static, std::io::Result<String>>, ExecutorError> {
    /// Appends `chunk` to `pending` and decodes as much of it as possible.
    ///
    /// Invalid sequences become U+FFFD; an incomplete trailing sequence stays
    /// in `pending` so it can be completed by the next chunk.
    fn decode_utf8_chunk(pending: &mut Vec<u8>, chunk: &[u8]) -> String {
        pending.extend_from_slice(chunk);
        let mut text = String::with_capacity(pending.len());
        let mut consumed = 0;
        loop {
            let rest = &pending[consumed..];
            match std::str::from_utf8(rest) {
                Ok(valid) => {
                    text.push_str(valid);
                    consumed = pending.len();
                    break;
                }
                Err(err) => {
                    let valid_len = err.valid_up_to();
                    text.push_str(&String::from_utf8_lossy(&rest[..valid_len]));
                    consumed += valid_len;
                    match err.error_len() {
                        // Definitely invalid bytes: replace them and keep going.
                        Some(bad_len) => {
                            text.push('\u{FFFD}');
                            consumed += bad_len;
                        }
                        // Input ended mid-character: wait for more bytes.
                        None => break,
                    }
                }
            }
        }
        pending.drain(..consumed);
        text
    }

    let original_stdout = child.inner().stdout.take().ok_or_else(|| {
        ExecutorError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "Child process has no stdout",
        ))
    })?;
    let (pipe_reader, pipe_writer) = os_pipe::pipe().map_err(|e| {
        ExecutorError::Io(std::io::Error::other(format!("Failed to create pipe: {e}")))
    })?;
    // Downstream readers of `child.stdout` now read from our pipe instead.
    child.inner().stdout = Some(wrap_fd_as_child_stdout(pipe_reader)?);
    let mut fd_writer = wrap_fd_as_tokio_writer(pipe_writer)?;
    let (dup_writer, dup_reader) =
        tokio::sync::mpsc::unbounded_channel::<std::io::Result<String>>();

    tokio::spawn(async move {
        let mut stdout_stream = ReaderStream::new(original_stdout);
        // Bytes of a not-yet-complete UTF-8 character carried between chunks.
        let mut pending = Vec::<u8>::new();
        // Forwarding stays best-effort; the first failure is logged once.
        let mut forward_failed = false;

        while let Some(res) = stdout_stream.next().await {
            match res {
                Ok(data) => {
                    if let Err(err) = fd_writer.write_all(&data).await {
                        if !forward_failed {
                            tracing::warn!("Failed to forward child stdout to pipe: {}", err);
                            forward_failed = true;
                        }
                    }
                    let text = decode_utf8_chunk(&mut pending, &data);
                    if !text.is_empty() {
                        // A send error only means the stream was dropped.
                        let _ = dup_writer.send(Ok(text));
                    }
                }
                Err(err) => {
                    tracing::error!("Error reading from child stdout: {}", err);
                    let _ = dup_writer.send(Err(err));
                }
            }
        }

        // The stream ended mid-character: emit what is left, lossily.
        if !pending.is_empty() {
            let _ = dup_writer.send(Ok(String::from_utf8_lossy(&pending).into_owned()));
        }
        // Wait for the last buffered write so its outcome is not lost.
        if let Err(err) = fd_writer.flush().await {
            if !forward_failed {
                tracing::warn!("Failed to forward child stdout to pipe: {}", err);
            }
        }
    });

    Ok(Box::pin(UnboundedReceiverStream::new(dup_reader)))
}
/// Handle for queueing extra lines to be written into a child's stdout pipe.
///
/// Instances are created by `tee_stdout_with_appender`.
pub struct StdoutAppender {
    /// Sending half of the channel carrying lines awaiting injection.
    tx: tokio::sync::mpsc::UnboundedSender<String>,
}

impl StdoutAppender {
    /// Queues `line` for injection.
    ///
    /// The call never blocks and never fails: if the message cannot be
    /// delivered, the line is silently dropped.
    pub fn append_line<S: Into<String>>(&self, line: S) {
        let owned: String = line.into();
        // Best effort by design; there is nothing useful a caller could do
        // with a delivery failure here.
        let _ = self.tx.send(owned);
    }
}
pub fn tee_stdout_with_appender(
child: &mut AsyncGroupChild,
) -> Result<(BoxStream<'static, std::io::Result<String>>, StdoutAppender), ExecutorError> {
let original_stdout = child.inner().stdout.take().ok_or_else(|| {
ExecutorError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Child process has no stdout",
))
})?;
let (pipe_reader, pipe_writer) = os_pipe::pipe().map_err(|e| {
ExecutorError::Io(std::io::Error::other(format!("Failed to create pipe: {e}")))
})?;
child.inner().stdout = Some(wrap_fd_as_child_stdout(pipe_reader)?);
let writer = wrap_fd_as_tokio_writer(pipe_writer)?;
let shared_writer = std::sync::Arc::new(tokio::sync::Mutex::new(writer));
let (dup_tx, dup_rx) = tokio::sync::mpsc::unbounded_channel::<std::io::Result<String>>();
let (inj_tx, mut inj_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
{
let shared_writer = shared_writer.clone();
tokio::spawn(async move {
let mut stdout_stream = ReaderStream::new(original_stdout);
while let Some(res) = stdout_stream.next().await {
match res {
Ok(data) => {
let mut w = shared_writer.lock().await;
let _ = w.write_all(&data).await;
let string_chunk = String::from_utf8_lossy(&data).into_owned();
let _ = dup_tx.send(Ok(string_chunk));
}
Err(err) => {
let _ = dup_tx.send(Err(err));
}
}
}
});
}
{
let shared_writer = shared_writer.clone();
tokio::spawn(async move {
while let Some(line) = inj_rx.recv().await {
let mut data = line.into_bytes();
data.push(b'\n');
let mut w = shared_writer.lock().await;
let _ = w.write_all(&data).await;
}
});
}
Ok((
Box::pin(UnboundedReceiverStream::new(dup_rx)),
StdoutAppender { tx: inj_tx },
))
}
/// Replaces the child's stdout with the read end of a fresh OS pipe and
/// returns an async writer for the pipe's write end.
///
/// Whatever was previously stored in `child.inner().stdout` is overwritten.
///
/// # Errors
///
/// Returns [`ExecutorError::Io`] when the pipe cannot be created or when
/// either end cannot be wrapped.
pub fn create_stdout_pipe_writer<'b>(
    child: &mut AsyncGroupChild,
) -> Result<impl AsyncWrite + 'b, ExecutorError> {
    let (read_end, write_end) = match os_pipe::pipe() {
        Ok(ends) => ends,
        Err(e) => {
            let cause = std::io::Error::other(format!("Failed to create pipe: {e}"));
            return Err(ExecutorError::Io(cause));
        }
    };
    let replacement_stdout = wrap_fd_as_child_stdout(read_end)?;
    child.inner().stdout = Some(replacement_stdout);
    wrap_fd_as_tokio_writer(write_end)
}
/// Converts the read end of an OS pipe into a `tokio::process::ChildStdout`.
///
/// Ownership of the underlying descriptor (Unix) or handle (Windows) moves
/// from `pipe_reader` into the returned value.
///
/// # Errors
///
/// Returns [`ExecutorError::Io`] when tokio cannot adopt the std handle.
fn wrap_fd_as_child_stdout(
    pipe_reader: os_pipe::PipeReader,
) -> Result<tokio::process::ChildStdout, ExecutorError> {
    #[cfg(unix)]
    let std_stdout = {
        let raw = pipe_reader.into_raw_fd();
        // SAFETY: `into_raw_fd` consumed the reader and released ownership of
        // `raw`, so the descriptor is open and owned by nothing else.
        let owned = unsafe { OwnedFd::from_raw_fd(raw) };
        std::process::ChildStdout::from(owned)
    };
    #[cfg(windows)]
    let std_stdout = {
        let raw = pipe_reader.into_raw_handle();
        // SAFETY: `into_raw_handle` consumed the reader and released ownership
        // of `raw`, so the handle is open and owned by nothing else.
        let owned = unsafe { OwnedHandle::from_raw_handle(raw) };
        std::process::ChildStdout::from(owned)
    };
    tokio::process::ChildStdout::from_std(std_stdout).map_err(ExecutorError::Io)
}
/// Converts the write end of an OS pipe into an async writer backed by
/// `tokio::fs::File`.
///
/// Ownership of the underlying descriptor (Unix) or handle (Windows) moves
/// from `pipe_writer` into the returned writer. The current implementation
/// always returns `Ok`.
fn wrap_fd_as_tokio_writer(
    pipe_writer: os_pipe::PipeWriter,
) -> Result<impl AsyncWrite, ExecutorError> {
    #[cfg(unix)]
    let std_file = {
        let raw = pipe_writer.into_raw_fd();
        // SAFETY: `into_raw_fd` consumed the writer and released ownership of
        // `raw`, so the descriptor is open and owned by nothing else.
        let owned = unsafe { OwnedFd::from_raw_fd(raw) };
        std::fs::File::from(owned)
    };
    #[cfg(windows)]
    let std_file = {
        let raw = pipe_writer.into_raw_handle();
        // SAFETY: `into_raw_handle` consumed the writer and released ownership
        // of `raw`, so the handle is open and owned by nothing else.
        let owned = unsafe { OwnedHandle::from_raw_handle(raw) };
        std::fs::File::from(owned)
    };
    Ok(tokio::fs::File::from_std(std_file))
}