use std::fmt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::sync::Mutex as AsyncMutex;
use tokio_stream::{Stream, StreamExt};
/// Shared, lockable slot holding a boxed async reader.
///
/// The `Option` lets `Stdin::write_to` take the reader out and leave `None`
/// behind, so every clone sharing this `Arc` sees the reader as already used.
type SharedReader = Arc<AsyncMutex<Option<Pin<Box<dyn AsyncRead + Send>>>>>;
/// Shared, lockable slot holding a boxed stream of `String` lines.
///
/// Same take-once arrangement as `SharedReader`, for line streams.
type SharedLines = Arc<AsyncMutex<Option<Pin<Box<dyn Stream<Item = String> + Send>>>>>;
/// Description of the data to write to a sink acting as standard input.
///
/// Built through the `Stdin::empty` / `Stdin::from_*` constructors. Cloning is
/// supported; for reader- and stream-backed sources the clones share one
/// underlying `Arc`, so the data can be consumed only once across all clones.
#[derive(Clone)]
pub struct Stdin(Source);
/// Private representation behind `Stdin`: where the input bytes come from.
#[derive(Clone)]
enum Source {
/// No input; `Stdin::stdio` maps this to `Stdio::null()`.
Empty,
/// An in-memory buffer, written in full on every `write_to` call.
Bytes(Vec<u8>),
/// A file path; the file is opened each time `write_to` runs.
File(PathBuf),
/// A one-shot async reader; taken out of its slot by the first `write_to`.
Reader(SharedReader),
/// A one-shot stream of lines; each item is written followed by `\n`.
Lines(SharedLines),
}
impl Stdin {
    /// Creates a source that supplies no input at all.
    ///
    /// `stdio()` reports `Stdio::null()` for it and `write_to` writes nothing.
    pub fn empty() -> Self {
        Stdin(Source::Empty)
    }

    /// Creates a replayable source from text, stored as its UTF-8 bytes.
    pub fn from_string(text: impl Into<String>) -> Self {
        Stdin(Source::Bytes(text.into().into_bytes()))
    }

    /// Creates a replayable source from raw bytes.
    pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
        Stdin(Source::Bytes(bytes.into()))
    }

    /// Creates a source that reads from the file at `path`.
    ///
    /// The path is only recorded here; the file is opened when the source is
    /// written, so a missing file is reported at that point, not by this call.
    pub fn from_file(path: impl AsRef<Path>) -> Self {
        Stdin(Source::File(path.as_ref().to_path_buf()))
    }

    /// Eagerly collects `lines` into one buffer, terminating each with `\n`.
    ///
    /// The result is an in-memory byte source, so it can be written any number
    /// of times. An empty iterator yields an empty buffer, which still counts
    /// as a (piped) byte source rather than the empty source.
    pub fn from_iter_lines<I, S>(lines: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut buf = Vec::new();
        for line in lines {
            buf.extend_from_slice(line.as_ref().as_bytes());
            buf.push(b'\n');
        }
        Stdin(Source::Bytes(buf))
    }

    /// Creates a one-shot source from an async reader.
    ///
    /// The reader is consumed by the first write; later writes, including
    /// those made through clones, produce no data.
    pub fn from_reader<R>(reader: R) -> Self
    where
        R: AsyncRead + Send + 'static,
    {
        let boxed: Pin<Box<dyn AsyncRead + Send>> = Box::pin(reader);
        Stdin(Source::Reader(Arc::new(AsyncMutex::new(Some(boxed)))))
    }

    /// Creates a one-shot source from a stream of lines.
    ///
    /// Each item is written followed by `\n`. The stream is consumed by the
    /// first write; later writes, including those made through clones, produce
    /// no data.
    pub fn from_lines<S>(lines: S) -> Self
    where
        S: Stream<Item = String> + Send + 'static,
    {
        let boxed: Pin<Box<dyn Stream<Item = String> + Send>> = Box::pin(lines);
        Stdin(Source::Lines(Arc::new(AsyncMutex::new(Some(boxed)))))
    }

    /// Returns `true` only for the source built by [`Stdin::empty`].
    ///
    /// A byte source with a zero-length buffer is not considered empty here.
    pub(crate) fn is_empty(&self) -> bool {
        matches!(self.0, Source::Empty)
    }

    /// Picks the `Stdio` configuration matching this source: `Stdio::null()`
    /// when there is nothing to feed, `Stdio::piped()` otherwise.
    pub(crate) fn stdio(&self) -> Stdio {
        if self.is_empty() {
            Stdio::null()
        } else {
            Stdio::piped()
        }
    }

    /// Writes the whole source into `sink` and flushes what was written.
    ///
    /// Byte and file sources can be written repeatedly. Reader and line-stream
    /// sources are taken out of their shared slot by the first call, so any
    /// later call (on this value or a clone) writes nothing and returns `Ok`.
    /// The slot is emptied before any data is copied, so a one-shot source is
    /// also gone if the first write fails part-way.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening the file, reading the source, or
    /// writing to / flushing `sink`.
    pub(crate) async fn write_to<W>(&self, sink: &mut W) -> std::io::Result<()>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        match &self.0 {
            Source::Empty => Ok(()),
            Source::Bytes(bytes) => {
                sink.write_all(bytes).await?;
                // `write_all` alone may leave data inside a buffering sink;
                // the copy-based branches below already end with a flush.
                sink.flush().await
            }
            Source::File(path) => {
                let mut file = tokio::fs::File::open(path).await?;
                // `tokio::io::copy` flushes the sink once the reader hits EOF.
                tokio::io::copy(&mut file, sink).await.map(|_| ())
            }
            Source::Reader(reader) => {
                // The lock is needed only to take the reader out of the slot;
                // the guard is dropped at the end of this statement so clones
                // are not blocked for the duration of the copy.
                let taken = reader.lock().await.take();
                match taken {
                    Some(mut r) => tokio::io::copy(&mut r, sink).await.map(|_| ()),
                    None => Ok(()),
                }
            }
            Source::Lines(lines) => {
                // Same as above: hold the lock only while emptying the slot.
                let taken = lines.lock().await.take();
                match taken {
                    Some(mut stream) => {
                        while let Some(mut line) = stream.next().await {
                            // The line is owned, so append the terminator and
                            // issue a single write instead of two.
                            line.push('\n');
                            sink.write_all(line.as_bytes()).await?;
                            // The stream may produce lines slowly; flush each
                            // one so it is not held back in a buffering sink.
                            sink.flush().await?;
                        }
                        Ok(())
                    }
                    None => Ok(()),
                }
            }
        }
    }
}
/// Debug output names only the kind of source and never prints the payload
/// (buffer contents, path, reader or stream).
impl fmt::Debug for Stdin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Patterns bind nothing, so the source is inspected without moving it.
        let label: &str = match self.0 {
            Source::Empty => "Empty",
            Source::Bytes(..) => "Bytes",
            Source::File(..) => "File",
            Source::Reader(..) => "Reader",
            Source::Lines(..) => "Lines",
        };
        let mut tuple = f.debug_tuple("Stdin");
        tuple.field(&label);
        tuple.finish()
    }
}
/// Handle for writing to a child process's stdin pipe incrementally.
///
/// Wraps a `tokio::process::ChildStdin`; see the `write`, `write_line`,
/// `flush` and `finish` methods.
pub struct ProcessStdin {
// The child's stdin pipe that every method writes to.
sink: tokio::process::ChildStdin,
}
impl ProcessStdin {
pub(crate) fn new(sink: tokio::process::ChildStdin) -> Self {
Self { sink }
}
pub async fn write(&mut self, bytes: &[u8]) -> std::io::Result<()> {
self.sink.write_all(bytes).await
}
pub async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
self.sink.write_all(line.as_bytes()).await?;
self.sink.write_all(b"\n").await?;
self.sink.flush().await
}
pub async fn flush(&mut self) -> std::io::Result<()> {
self.sink.flush().await
}
pub async fn finish(mut self) -> std::io::Result<()> {
self.sink.shutdown().await
}
}
/// Debug output shows only the type name; the wrapped pipe is elided.
impl fmt::Debug for ProcessStdin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ProcessStdin");
        repr.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `source` into an in-memory buffer and returns the bytes produced.
    async fn collect(source: &Stdin) -> Vec<u8> {
        let mut captured: Vec<u8> = Vec::new();
        source.write_to(&mut captured).await.expect("write_to");
        captured
    }

    // A reader-backed source yields its data once, then nothing.
    #[tokio::test]
    async fn reader_source_is_one_shot() {
        let source = Stdin::from_reader(&b"payload"[..]);
        let first = collect(&source).await;
        assert_eq!(first, b"payload");
        let second = collect(&source).await;
        assert!(
            second.is_empty(),
            "a second run must see empty stdin — the reader was consumed"
        );
    }

    // A stream-backed source terminates every item with '\n' and is one-shot.
    #[tokio::test]
    async fn lines_source_is_one_shot_and_newline_terminated() {
        let items = vec!["first".to_owned(), "second".to_owned()];
        let source = Stdin::from_lines(tokio_stream::iter(items));
        let first = collect(&source).await;
        assert_eq!(first, b"first\nsecond\n");
        let second = collect(&source).await;
        assert!(second.is_empty(), "the stream was consumed by the first run");
    }

    // Lines collected up front live in a byte buffer and can be replayed.
    #[tokio::test]
    async fn iter_lines_is_reusable_and_newline_terminated() {
        let source = Stdin::from_iter_lines(["a", "b"]);
        let first = collect(&source).await;
        assert_eq!(first, b"a\nb\n");
        let second = collect(&source).await;
        assert_eq!(
            second,
            b"a\nb\n",
            "eagerly-collected lines replay on every run"
        );
    }

    // Opening happens at write time, so that is where a missing file shows up.
    #[tokio::test]
    async fn missing_file_surfaces_not_found() {
        let source = Stdin::from_file("processkit-definitely-missing-424242.txt");
        let mut captured: Vec<u8> = Vec::new();
        let outcome = source.write_to(&mut captured).await;
        let err = outcome.expect_err("a missing stdin file must error, not feed silence");
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    }

    // The empty source produces no bytes at all.
    #[tokio::test]
    async fn empty_source_writes_nothing() {
        let source = Stdin::empty();
        let produced = collect(&source).await;
        assert!(produced.is_empty());
    }
}