use std::io;
use bifrostlink::Port;
use bytes::BytesMut;
use tokio::io::{stdin, stdout, AsyncReadExt as _, AsyncWriteExt as _};
use tokio::select;
use tracing::{debug, error};
/// Reports whether `e` belongs to the set of error kinds this module treats
/// as an ordinary end of the stream rather than a failure.
///
/// Returns `true` for `UnexpectedEof`, `BrokenPipe`, `ConnectionReset` and
/// `ConnectionAborted`; every other kind yields `false`.
fn is_clean_end(e: &io::Error) -> bool {
    // Kinds that callers log quietly instead of reporting as errors.
    const CLEAN_KINDS: [io::ErrorKind; 4] = [
        io::ErrorKind::UnexpectedEof,
        io::ErrorKind::BrokenPipe,
        io::ErrorKind::ConnectionReset,
        io::ErrorKind::ConnectionAborted,
    ];
    CLEAN_KINDS.contains(&e.kind())
}
pub fn from_stdio() -> Port {
Port::new(|mut rx, tx| async move {
let reader = async move {
let mut stdin = stdin();
loop {
let len = match stdin.read_u32().await {
Ok(len) => len,
Err(e) => {
if is_clean_end(&e) {
debug!("stdin read ended: {e}");
} else {
error!("stdin read failed: {e}");
}
break;
}
};
let mut buf = BytesMut::zeroed(len as usize);
if let Err(e) = stdin.read_exact(&mut buf).await {
if is_clean_end(&e) {
debug!("stdin read ended: {e}");
} else {
error!("stdin read failed: {e}");
}
break;
}
if tx.send(buf.freeze()).is_err() {
break;
}
}
};
let writer = async move {
let mut stdout = stdout();
while let Some(msg) = rx.recv().await {
let len = match u32::try_from(msg.len()) {
Ok(len) => len,
Err(_) => {
error!("message shouldn't be larger than 4GB");
break;
}
};
let res = async {
stdout.write_u32(len).await?;
stdout.write_all(&msg).await?;
stdout.flush().await
}
.await;
if let Err(e) = res {
if is_clean_end(&e) {
debug!("stdout write ended: {e}");
} else {
error!("stdout write failed: {e}");
}
break;
}
}
};
select! {
_ = reader => {}
_ = writer => {}
}
})
}