use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use atomic_waker::AtomicWaker;
use super::{AsyncBytesRead, AsyncBytesWrite};
struct PipeInner {
buf: Mutex<VecDeque<u8>>,
read_waker: AtomicWaker,
closed: Mutex<bool>,
}
impl PipeInner {
fn new() -> Self {
Self {
buf: Mutex::new(VecDeque::new()),
read_waker: AtomicWaker::new(),
closed: Mutex::new(false),
}
}
}
pub struct PipeReader {
inner: Arc<PipeInner>,
}
pub struct PipeWriter {
inner: Arc<PipeInner>,
}
impl Drop for PipeWriter {
fn drop(&mut self) {
*self.inner.closed.lock().unwrap() = true;
self.inner.read_waker.wake();
}
}
#[derive(Debug)]
pub enum PipeError {
Closed,
}
impl core::fmt::Display for PipeError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
PipeError::Closed => write!(f, "pipe closed"),
}
}
}
impl std::error::Error for PipeError {}
pub fn pipe() -> (PipeReader, PipeWriter) {
let inner = Arc::new(PipeInner::new());
(
PipeReader {
inner: inner.clone(),
},
PipeWriter { inner },
)
}
pub fn duplex() -> ((PipeReader, PipeWriter), (PipeReader, PipeWriter)) {
let (r_b, w_a) = pipe(); let (r_a, w_b) = pipe(); ((r_a, w_a), (r_b, w_b))
}
impl AsyncBytesRead for PipeReader {
type Error = PipeError;
async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), PipeError> {
let mut filled = 0;
core::future::poll_fn(|cx| {
loop {
if filled == buf.len() {
return Poll::Ready(Ok(()));
}
let mut guard = self.inner.buf.lock().unwrap();
while filled < buf.len() {
match guard.pop_front() {
Some(b) => {
buf[filled] = b;
filled += 1;
}
None => break,
}
}
if filled == buf.len() {
return Poll::Ready(Ok(()));
}
drop(guard);
self.inner.read_waker.register(cx.waker());
let guard = self.inner.buf.lock().unwrap();
if !guard.is_empty() {
drop(guard);
continue;
}
if *self.inner.closed.lock().unwrap() {
return Poll::Ready(Err(PipeError::Closed));
}
return Poll::Pending;
}
})
.await
}
}
impl AsyncBytesWrite for PipeWriter {
type Error = PipeError;
async fn write_all(&mut self, buf: &[u8]) -> Result<(), PipeError> {
let mut guard = self.inner.buf.lock().unwrap();
guard.extend(buf.iter().copied());
drop(guard);
self.inner.read_waker.wake();
Ok(())
}
async fn flush(&mut self) -> Result<(), PipeError> {
Ok(())
}
}