use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout},
sync::mpsc,
time::timeout,
};
use super::types::{CommandStream, StreamConfig, StreamOutput};
use crate::error::{CommandError, Error, Result};
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
impl Default for StreamConfig {
    /// Returns the stock configuration: a 1024-entry buffer and a one-second
    /// read timeout.
    fn default() -> Self {
        let buffer_size = 1024;
        let read_timeout = Duration::from_secs(1);
        Self { buffer_size, read_timeout }
    }
}
impl StreamConfig {
#[must_use]
pub fn new(buffer_size: usize, read_timeout: Duration) -> Self {
Self { buffer_size, read_timeout }
}
}
impl CommandStream {
#[must_use]
pub fn new(stdout: ChildStdout, stderr: ChildStderr, config: &StreamConfig) -> Self {
let (tx, rx) = mpsc::channel(config.buffer_size);
let cancel = Arc::new(AtomicBool::new(false));
let stdout_cancel = Arc::clone(&cancel);
let stderr_cancel = Arc::clone(&cancel);
let stdout_tx = tx.clone();
let stderr_tx = tx;
tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
if stdout_cancel.load(Ordering::Relaxed) {
break;
}
if stdout_tx.send(StreamOutput::Stdout(line)).await.is_err() {
break;
}
}
});
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
if stderr_cancel.load(Ordering::Relaxed) {
break;
}
if stderr_tx.send(StreamOutput::Stderr(line)).await.is_err() {
break;
}
}
});
Self { rx, cancel }
}
pub async fn next_timeout(
&mut self,
timeout_duration: Duration,
) -> Result<Option<StreamOutput>> {
match timeout(timeout_duration, self.rx.recv()).await {
Ok(Some(output)) => Ok(Some(output)),
Ok(None) => Ok(None),
Err(_) => Err(Error::Command(CommandError::Timeout { duration: timeout_duration })),
}
}
pub fn cancel(&self) {
self.cancel.store(true, Ordering::Relaxed);
}
}