use std::collections::VecDeque;
/// In-memory byte queue connecting one writer to one reader.
///
/// Bytes are appended at the back and consumed from the front, so they come
/// out in the order they were written.
#[derive(Debug)]
pub struct PipeBuffer {
    /// Bytes that have been written but not yet consumed.
    data: VecDeque<u8>,
    /// Soft limit, in bytes, enforced by the non-blocking `write` path.
    capacity: usize,
    /// Set once the writing side is closed; an empty buffer then reads as end-of-file.
    write_closed: bool,
    /// Set once the reading side is closed; `write` then reports a broken pipe.
    read_closed: bool,
}
/// Outcome of a non-blocking write into a [`PipeBuffer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteResult {
    /// The whole input was accepted; the payload is the number of bytes stored.
    Written(usize),
    /// The buffer filled up before the whole input fit; the payload is the
    /// number of bytes that were stored (possibly zero).
    WouldBlock(usize),
    /// The reading side is closed, so nothing was stored.
    BrokenPipe,
}
/// Outcome of a non-blocking read from a [`PipeBuffer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadResult {
    /// Data is present; the payload is a byte count (bytes copied out by
    /// `read`, or bytes currently buffered for `read_all`).
    Read(usize),
    /// The buffer is empty but the writing side is still open.
    WouldBlock,
    /// The buffer is empty and the writing side is closed.
    Eof,
}
impl PipeBuffer {
    /// Hard ceiling, in bytes, on how much [`PipeBuffer::write_all`] lets the
    /// buffer hold. Also bounds the up-front allocation made by
    /// [`PipeBuffer::new`].
    const MAX_PIPE_TOTAL: usize = 64 * 1024 * 1024;

    /// Creates an empty pipe, open on both sides, whose non-blocking
    /// [`write`](Self::write) path accepts at most `capacity` buffered bytes.
    ///
    /// Storage is reserved eagerly, but never more than the hard ceiling, so a
    /// very large `capacity` (for example `usize::MAX` used as "unbounded")
    /// neither panics nor allocates gigabytes up front. The logical limit is
    /// still exactly `capacity`.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            data: VecDeque::with_capacity(capacity.min(Self::MAX_PIPE_TOTAL)),
            capacity,
            write_closed: false,
            read_closed: false,
        }
    }

    /// Creates a pipe with a 64 KiB soft limit.
    #[must_use]
    pub fn default_size() -> Self {
        Self::new(64 * 1024)
    }

    /// Appends as much of `buf` as fits under the soft limit, without blocking.
    ///
    /// Returns:
    /// - [`WriteResult::BrokenPipe`] if the reading side is closed (checked
    ///   first, so this also applies to an empty `buf`);
    /// - [`WriteResult::Written`]`(n)` if all `n` bytes of `buf` were stored;
    /// - [`WriteResult::WouldBlock`]`(n)` if only the first `n` bytes fit
    ///   (`n` may be zero when the buffer is already full).
    pub fn write(&mut self, buf: &[u8]) -> WriteResult {
        if self.read_closed {
            return WriteResult::BrokenPipe;
        }
        if buf.is_empty() {
            return WriteResult::Written(0);
        }
        // `write_all` may have pushed the length past `capacity`, hence the
        // saturating subtraction.
        let available = self.capacity.saturating_sub(self.data.len());
        if available == 0 {
            return WriteResult::WouldBlock(0);
        }
        let to_write = buf.len().min(available);
        self.data.extend(&buf[..to_write]);
        if to_write < buf.len() {
            WriteResult::WouldBlock(to_write)
        } else {
            WriteResult::Written(to_write)
        }
    }

    /// Appends `buf` while ignoring the soft limit.
    ///
    /// Only the hard ceiling applies: bytes that would push the buffer past it
    /// are silently dropped. Unlike [`write`](Self::write), this does not check
    /// whether the reading side is closed and reports nothing back.
    pub fn write_all(&mut self, buf: &[u8]) {
        let remaining = Self::MAX_PIPE_TOTAL.saturating_sub(self.data.len());
        let to_write = buf.len().min(remaining);
        self.data.extend(&buf[..to_write]);
    }

    /// Reports what a full read would yield, without consuming anything.
    ///
    /// Returns [`ReadResult::Read`]`(len)` with the number of buffered bytes
    /// when there is data, otherwise [`ReadResult::Eof`] if the writing side is
    /// closed or [`ReadResult::WouldBlock`] if it is still open. Use
    /// [`drain`](Self::drain) to actually take the bytes.
    pub fn read_all(&mut self) -> ReadResult {
        if self.data.is_empty() {
            if self.write_closed {
                return ReadResult::Eof;
            }
            return ReadResult::WouldBlock;
        }
        ReadResult::Read(self.data.len())
    }

    /// Moves up to `buf.len()` bytes from the front of the pipe into `buf`.
    ///
    /// Returns [`ReadResult::Read`]`(n)` with the number of bytes copied into
    /// `buf[..n]`; the rest of `buf` is left untouched. An empty `buf` always
    /// yields `Read(0)`. With no buffered data the result is
    /// [`ReadResult::Eof`] if the writing side is closed and
    /// [`ReadResult::WouldBlock`] otherwise.
    pub fn read(&mut self, buf: &mut [u8]) -> ReadResult {
        if buf.is_empty() {
            return ReadResult::Read(0);
        }
        if self.data.is_empty() {
            if self.write_closed {
                return ReadResult::Eof;
            }
            return ReadResult::WouldBlock;
        }
        let to_read = buf.len().min(self.data.len());
        // A single ranged drain removes exactly the consumed prefix; zipping it
        // with the destination avoids a per-byte `pop_front` and its panic path.
        for (slot, byte) in buf.iter_mut().zip(self.data.drain(..to_read)) {
            *slot = byte;
        }
        ReadResult::Read(to_read)
    }

    /// Removes every buffered byte and returns them in write order.
    pub fn drain(&mut self) -> Vec<u8> {
        self.data.drain(..).collect()
    }

    /// Marks the writing side as closed; an empty pipe then reads as end-of-file.
    pub fn close_write(&mut self) {
        self.write_closed = true;
    }

    /// Marks the reading side as closed; later [`write`](Self::write) calls
    /// report a broken pipe.
    pub fn close_read(&mut self) {
        self.read_closed = true;
    }

    /// Returns `true` once the writing side has been closed.
    #[must_use]
    pub fn is_write_closed(&self) -> bool {
        self.write_closed
    }

    /// Returns `true` once the reading side has been closed.
    #[must_use]
    pub fn is_read_closed(&self) -> bool {
        self.read_closed
    }

    /// Returns `true` if at least one byte is buffered.
    #[must_use]
    pub fn has_data(&self) -> bool {
        !self.data.is_empty()
    }

    /// Returns `true` if the buffer is below its soft limit.
    #[must_use]
    pub fn has_space(&self) -> bool {
        self.data.len() < self.capacity
    }

    /// Returns the number of buffered bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if no bytes are buffered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A write that fits is fully accepted and can be drained back out.
    #[test]
    fn write_and_drain() {
        let mut pipe = PipeBuffer::new(1024);
        assert_eq!(pipe.write(b"hello"), WriteResult::Written(5));
        assert_eq!(pipe.len(), 5);

        let data = pipe.drain();
        assert_eq!(data, b"hello");
        assert!(pipe.is_empty());
    }

    /// Once the soft limit is reached, further writes store nothing.
    #[test]
    fn write_would_block_at_capacity() {
        let mut pipe = PipeBuffer::new(4);
        assert_eq!(pipe.write(b"abcd"), WriteResult::Written(4));
        assert_eq!(pipe.write(b"x"), WriteResult::WouldBlock(0));
    }

    /// An oversized write stores only the prefix that fits.
    #[test]
    fn partial_write() {
        let mut pipe = PipeBuffer::new(4);
        assert_eq!(pipe.write(b"abcdef"), WriteResult::WouldBlock(4));
        assert_eq!(pipe.len(), 4);
    }

    /// Buffered data is still reported after the writer closes; EOF follows
    /// only once the buffer is empty.
    #[test]
    fn read_eof_after_close() {
        let mut pipe = PipeBuffer::new(1024);
        pipe.write_all(b"data");
        pipe.close_write();
        assert_eq!(pipe.read_all(), ReadResult::Read(4));

        pipe.drain();
        assert_eq!(pipe.read_all(), ReadResult::Eof);
    }

    /// An empty pipe with an open writer is not at EOF.
    #[test]
    fn read_would_block_when_empty_and_open() {
        let mut pipe = PipeBuffer::new(1024);
        assert_eq!(pipe.read_all(), ReadResult::WouldBlock);
    }

    /// `write_all` bypasses the soft limit.
    #[test]
    fn write_all_ignores_capacity() {
        let mut pipe = PipeBuffer::new(4);
        pipe.write_all(b"hello world");
        assert_eq!(pipe.len(), 11);
    }

    /// A short read consumes only as many bytes as the destination holds.
    #[test]
    fn incremental_read_drains_prefix_only() {
        let mut pipe = PipeBuffer::new(1024);
        pipe.write_all(b"hello");

        let mut buf = [0u8; 2];
        assert_eq!(pipe.read(&mut buf), ReadResult::Read(2));
        assert_eq!(&buf, b"he");
        assert_eq!(pipe.drain(), b"llo");
    }

    /// Writes after the reader closes report a broken pipe.
    #[test]
    fn close_read_makes_future_writes_broken_pipe() {
        let mut pipe = PipeBuffer::new(1024);
        pipe.close_read();
        assert_eq!(pipe.write(b"hello"), WriteResult::BrokenPipe);
    }

    /// Reading frees space so a previously blocked writer can proceed.
    #[test]
    fn reader_drain_unblocks_writer() {
        let mut pipe = PipeBuffer::new(4);
        assert_eq!(pipe.write(b"abcd"), WriteResult::Written(4));
        assert_eq!(pipe.write(b"e"), WriteResult::WouldBlock(0));

        let mut buf = [0u8; 2];
        assert_eq!(pipe.read(&mut buf), ReadResult::Read(2));
        assert_eq!(&buf, b"ab");

        assert_eq!(pipe.write(b"ef"), WriteResult::Written(2));
        assert_eq!(pipe.drain(), b"cdef");
    }

    /// Two one-byte pipes chained together still pass every byte through in order.
    #[test]
    fn bounded_multi_stage_chain_makes_incremental_progress() {
        let mut first = PipeBuffer::new(1);
        let mut second = PipeBuffer::new(1);
        let mut delivered = Vec::new();

        for byte in *b"abc" {
            // Stage one: accept a byte, then refuse the next while full.
            assert_eq!(first.write(&[byte]), WriteResult::Written(1));
            assert_eq!(first.write(b"!"), WriteResult::WouldBlock(0));

            // Hand the byte over to stage two.
            let mut stage_buf = [0u8; 1];
            assert_eq!(first.read(&mut stage_buf), ReadResult::Read(1));
            assert_eq!(second.write(&stage_buf), WriteResult::Written(1));
            assert_eq!(second.write(b"!"), WriteResult::WouldBlock(0));

            // Final consumer.
            let mut out = [0u8; 1];
            assert_eq!(second.read(&mut out), ReadResult::Read(1));
            delivered.push(out[0]);
        }

        assert_eq!(delivered, b"abc");
        assert!(first.is_empty());
        assert!(second.is_empty());
    }
}