use std::fmt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::sync::Mutex as AsyncMutex;
use tokio_stream::{Stream, StreamExt};
/// A shared, take-once slot holding a pinned, boxed trait object.
///
/// Every clone of the `Arc` observes the same slot. It starts out as `Some` and is left as
/// `None` once a run takes the contents, which is how one-shot sources detect reuse.
type SharedSlot<T> = Arc<AsyncMutex<Option<Pin<Box<T>>>>>;
/// Shared slot for a one-shot async byte reader.
type SharedReader = SharedSlot<dyn AsyncRead + Send + 'static>;
/// Shared slot for a one-shot stream of lines.
type SharedLines = SharedSlot<dyn Stream<Item = String> + Send + 'static>;
/// Input to feed a spawned child process on its standard input.
///
/// Construct one with `Stdin::empty`, `from_string`, `from_bytes`, `from_file`,
/// `from_iter_lines`, `from_reader`, or `from_lines`.
///
/// Cloning copies `Bytes`/`File` payloads. Clones of a `Reader`/`Lines` source share one
/// underlying stream through an `Arc`, so only one run can consume it.
#[derive(Clone)]
pub struct Stdin(Source);
/// Where the bytes for a child's stdin come from.
#[derive(Clone)]
enum Source {
/// No input; the child's stdin is attached to `Stdio::null()`.
Empty,
/// An in-memory payload, copied into every run, so it can be replayed.
Bytes(Vec<u8>),
/// A file path that is opened only when a run writes its input (open errors surface then).
File(PathBuf),
/// A one-shot async reader; the slot is emptied by the first run that takes it.
Reader(SharedReader),
/// A one-shot stream of lines; each item is written followed by `\n`.
Lines(SharedLines),
}
impl Stdin {
pub fn empty() -> Self {
Stdin(Source::Empty)
}
pub fn from_string(text: impl Into<String>) -> Self {
Stdin(Source::Bytes(text.into().into_bytes()))
}
pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
Stdin(Source::Bytes(bytes.into()))
}
pub fn from_file(path: impl AsRef<Path>) -> Self {
Stdin(Source::File(path.as_ref().to_path_buf()))
}
pub fn from_iter_lines<I, S>(lines: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut buf = Vec::new();
for line in lines {
buf.extend_from_slice(line.as_ref().as_bytes());
buf.push(b'\n');
}
Stdin(Source::Bytes(buf))
}
pub fn from_reader<R>(reader: R) -> Self
where
R: AsyncRead + Send + 'static,
{
Stdin(Source::Reader(Arc::new(AsyncMutex::new(Some(Box::pin(
reader,
))))))
}
pub fn from_lines<S>(lines: S) -> Self
where
S: Stream<Item = String> + Send + 'static,
{
Stdin(Source::Lines(Arc::new(AsyncMutex::new(Some(Box::pin(
lines,
))))))
}
pub(crate) fn is_empty(&self) -> bool {
matches!(self.0, Source::Empty)
}
pub(crate) fn is_one_shot(&self) -> bool {
matches!(self.0, Source::Reader(_) | Source::Lines(_))
}
#[cfg(feature = "record")]
pub(crate) fn content_digest(&self) -> u64 {
const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const PRIME: u64 = 0x0000_0100_0000_01b3;
fn mix(mut h: u64, bytes: &[u8]) -> u64 {
for &b in bytes {
h ^= b as u64;
h = h.wrapping_mul(PRIME);
}
h
}
let (tag, payload): (u8, &[u8]) = match &self.0 {
Source::Empty => (0, &[]),
Source::Bytes(b) => (1, b),
Source::File(p) => (2, p.as_os_str().as_encoded_bytes()),
Source::Reader(_) | Source::Lines(_) => (3, b"<stream>"),
};
mix(mix(OFFSET, &[tag]), payload)
}
pub(crate) fn stdio(&self) -> Stdio {
if self.is_empty() {
Stdio::null()
} else {
Stdio::piped()
}
}
pub(crate) async fn take_for_run(&self) -> Result<TakenStdin, OneShotConsumed> {
Ok(match &self.0 {
Source::Empty => TakenStdin::Empty,
Source::Bytes(bytes) => TakenStdin::Bytes(bytes.clone()),
Source::File(path) => TakenStdin::File(path.clone()),
Source::Reader(reader) => match reader.lock().await.take() {
Some(r) => TakenStdin::Reader(r),
None => return Err(OneShotConsumed),
},
Source::Lines(lines) => match lines.lock().await.take() {
Some(s) => TakenStdin::Lines(s),
None => return Err(OneShotConsumed),
},
})
}
}
/// Error from `Stdin::take_for_run` when a one-shot (`Reader`/`Lines`) source has already
/// been taken by an earlier or concurrent run.
#[derive(Debug)]
pub(crate) struct OneShotConsumed;
/// The input for a single run, detached from the shared `Stdin` source.
///
/// One-shot variants own their stream outright. Writing them therefore happens without
/// holding the source's lock.
pub(crate) enum TakenStdin {
/// Nothing to write.
Empty,
/// An owned copy of an in-memory payload.
Bytes(Vec<u8>),
/// A path that is opened when the input is written.
File(PathBuf),
/// A one-shot reader, copied until EOF.
Reader(Pin<Box<dyn AsyncRead + Send>>),
/// A one-shot stream whose items are each written followed by `\n`.
Lines(Pin<Box<dyn Stream<Item = String> + Send>>),
}
impl TakenStdin {
    /// True when this run has no input to write.
    pub(crate) fn is_empty(&self) -> bool {
        matches!(self, TakenStdin::Empty)
    }

    /// Writes this run's input into `sink`, consuming it, and flushes `sink` afterwards.
    ///
    /// - `Empty` writes nothing and does not touch `sink`.
    /// - `Bytes` is written in full with a single `write_all`.
    /// - `File` opens `path` now and copies it until EOF.
    /// - `Reader` is copied until EOF.
    /// - `Lines` writes each item followed by a single `\n`. It flushes after every line, so
    ///   a slow or interactive stream reaches the child as it is produced.
    ///
    /// Every non-empty variant ends with a flush, so no data is left sitting in a buffering
    /// `sink` when the caller drops it.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from opening the file, reading the source, writing, or flushing.
    /// For example, a `File` source whose path does not exist yields `ErrorKind::NotFound`.
    pub(crate) async fn write_to<W>(self, sink: &mut W) -> std::io::Result<()>
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        match self {
            // Nothing to deliver; leave the sink untouched.
            TakenStdin::Empty => return Ok(()),
            TakenStdin::Bytes(bytes) => sink.write_all(&bytes).await?,
            TakenStdin::File(path) => {
                let mut file = tokio::fs::File::open(&path).await?;
                tokio::io::copy(&mut file, sink).await?;
            }
            TakenStdin::Reader(mut reader) => {
                tokio::io::copy(&mut reader, sink).await?;
            }
            TakenStdin::Lines(mut stream) => {
                while let Some(line) = stream.next().await {
                    sink.write_all(line.as_bytes()).await?;
                    sink.write_all(b"\n").await?;
                    // Push each complete line out immediately, matching
                    // `ProcessStdin::write_line`, so a buffering sink cannot stall it.
                    sink.flush().await?;
                }
            }
        }
        // The `Bytes` and `Lines` arms do not go through `copy`, so flush here to make every
        // variant leave the sink equally drained.
        sink.flush().await
    }
}
impl fmt::Debug for Stdin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let kind = match &self.0 {
Source::Empty => "Empty",
Source::Bytes(_) => "Bytes",
Source::File(_) => "File",
Source::Reader(_) => "Reader",
Source::Lines(_) => "Lines",
};
f.debug_tuple("Stdin").field(&kind).finish()
}
}
/// A writable handle to a running child's stdin pipe, for feeding input incrementally.
pub struct ProcessStdin {
    sink: tokio::process::ChildStdin,
}

impl ProcessStdin {
    /// Wraps the child's stdin pipe.
    pub(crate) fn new(sink: tokio::process::ChildStdin) -> Self {
        ProcessStdin { sink }
    }

    /// Writes all of `bytes` to the child.
    ///
    /// Does not flush; call [`ProcessStdin::flush`] or use [`ProcessStdin::write_line`].
    pub async fn write(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        let Self { sink } = self;
        sink.write_all(bytes).await
    }

    /// Writes `line` followed by a single `\n`, then flushes.
    ///
    /// `line` is sent verbatim: if it already ends in `\n`, the child sees an empty line after
    /// it.
    pub async fn write_line(&mut self, line: &str) -> std::io::Result<()> {
        let pieces: [&[u8]; 2] = [line.as_bytes(), b"\n"];
        for piece in pieces {
            self.sink.write_all(piece).await?;
        }
        self.flush().await
    }

    /// Flushes any pending output toward the child.
    pub async fn flush(&mut self) -> std::io::Result<()> {
        self.sink.flush().await
    }

    /// Shuts the writer down and consumes the handle.
    ///
    /// Because `self` is taken by value, the underlying pipe handle is dropped when this
    /// returns. Presumably that is what signals end-of-input to the child (verify per
    /// platform).
    pub async fn finish(mut self) -> std::io::Result<()> {
        self.sink.shutdown().await
    }
}

impl fmt::Debug for ProcessStdin {
    // The pipe handle carries nothing useful to print, so only the type name is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ProcessStdin").finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
// Behavioural tests for stdin sources: replayable vs one-shot semantics, the exact bytes
// written for each variant, and error reporting.
use super::*;
/// Takes one run's input from `stdin` and returns exactly the bytes `write_to` produces.
///
/// Panics if the source was already consumed (one-shot) or if writing fails.
async fn written(stdin: &Stdin) -> Vec<u8> {
let mut sink = Vec::new();
stdin
.take_for_run()
.await
.unwrap_or_else(|_| panic!("source already consumed"))
.write_to(&mut sink)
.await
.expect("write_to");
sink
}
// A reader source delivers its bytes once; the second take must be an error, not silence.
#[tokio::test]
async fn reader_source_is_one_shot() {
let stdin = Stdin::from_reader(&b"payload"[..]);
assert_eq!(written(&stdin).await, b"payload");
assert!(
stdin.take_for_run().await.is_err(),
"a consumed one-shot reader reports OneShotConsumed, not empty"
);
}
// Only the streaming constructors are one-shot; byte/string/iter-line/empty sources replay.
#[tokio::test]
async fn is_one_shot_classifies_streaming_sources() {
assert!(Stdin::from_reader(&b"x"[..]).is_one_shot());
assert!(Stdin::from_lines(tokio_stream::iter(vec!["x".to_owned()])).is_one_shot());
assert!(!Stdin::from_bytes(b"abc".to_vec()).is_one_shot());
assert!(!Stdin::from_iter_lines(["a", "b"]).is_one_shot());
assert!(!Stdin::from_string("x").is_one_shot());
assert!(!Stdin::empty().is_one_shot());
}
// A lines stream gets one `\n` after every item, including the last, and cannot be replayed.
#[tokio::test]
async fn lines_source_is_one_shot_and_newline_terminated() {
let stdin = Stdin::from_lines(tokio_stream::iter(vec![
"first".to_owned(),
"second".to_owned(),
]));
assert_eq!(written(&stdin).await, b"first\nsecond\n");
assert!(
stdin.take_for_run().await.is_err(),
"the stream was consumed by the first run"
);
}
// `from_iter_lines` collects eagerly, so the same bytes come out on every run.
#[tokio::test]
async fn iter_lines_is_reusable_and_newline_terminated() {
let stdin = Stdin::from_iter_lines(["a", "b"]);
assert_eq!(written(&stdin).await, b"a\nb\n");
assert_eq!(
written(&stdin).await,
b"a\nb\n",
"eagerly-collected lines replay on every run"
);
}
// Items are not trimmed: a trailing `\n` in an item yields an extra empty line, and no
// items means no bytes at all.
#[tokio::test]
async fn iter_lines_appends_one_newline_per_item_verbatim() {
assert_eq!(
written(&Stdin::from_iter_lines(["a\n", "b"])).await,
b"a\n\nb\n"
);
assert_eq!(
written(&Stdin::from_iter_lines(Vec::<&str>::new())).await,
b""
);
}
// A file source defers opening to write time, and a missing file must surface NotFound.
#[tokio::test]
async fn missing_file_surfaces_not_found() {
let stdin = Stdin::from_file("processkit-definitely-missing-424242.txt");
let mut sink = Vec::new();
let err = stdin
.take_for_run()
.await
.unwrap_or_else(|_| panic!("file source is re-runnable"))
.write_to(&mut sink)
.await
.expect_err("a missing stdin file must error, not feed silence");
assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
}
// An empty source produces no bytes.
#[tokio::test]
async fn empty_source_writes_nothing() {
assert!(written(&Stdin::empty()).await.is_empty());
}
// Run 1 takes a reader whose writer half (`_tx`) never sends EOF, so its copy stays pending.
// After a short head start, run 2's take must fail immediately rather than wait on run 1.
// That shows the lock is not held across the winner's write. The 2 s timeout turns a hang
// into a test failure.
#[tokio::test]
async fn concurrent_reuse_of_a_one_shot_source_fails_the_loser_atomically() {
use std::time::Duration;
let (_tx, rx) = tokio::io::duplex(64);
let stdin = Stdin::from_reader(rx);
let stdin2 = stdin.clone();
let run1 = tokio::spawn(async move {
let taken = stdin.take_for_run().await.expect("run 1 wins the take");
let mut sink = Vec::new();
let _ = taken.write_to(&mut sink).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let second = tokio::time::timeout(Duration::from_secs(2), stdin2.take_for_run()).await;
let second = second.expect("the loser must not block on the slow winner's copy");
assert!(
second.is_err(),
"the losing concurrent run sees the one-shot source already taken"
);
run1.abort();
}
}