use crate::{AsyncStreamRead, AsyncStreamWrite};
use rustix_uring::Errno;
use std::collections::VecDeque;
/// Builds the `WouldBlock` I/O error used to signal that no data is
/// available to read yet.
fn would_block() -> std::io::Error {
    use std::io::{Error, ErrorKind};

    let kind = ErrorKind::WouldBlock;
    Error::new(kind, "SyncBufferedStream lacks data.")
}
/// In-memory adapter that exposes synchronous `std::io::Read` /
/// `std::io::Write` on top of two byte queues.
///
/// The queues are filled and drained separately by the async helpers
/// `fill_read_buff` and `flush_write_buff`.
#[derive(Debug, Default)]
pub struct SyncBufferedStream {
    /// Bytes received from the underlying stream and not yet consumed by
    /// `Read::read`.
    read_buff: VecDeque<u8>,
    /// Bytes accepted by `Write::write` and not yet sent to the underlying
    /// stream.
    write_buff: VecDeque<u8>,
    /// Set once the underlying stream has reported a zero-length read.
    eof: bool,
}

impl SyncBufferedStream {
    /// Creates a stream with empty read and write buffers and no
    /// end-of-stream recorded. Equivalent to `Self::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl std::io::Read for SyncBufferedStream {
    /// Copies buffered bytes into `buf` and removes them from the read
    /// buffer.
    ///
    /// # Returns
    /// * `Ok(n)` with `n > 0` when at least one buffered byte was copied.
    /// * `Ok(0)` when `buf` is empty, or when the read buffer is empty and
    ///   end-of-stream has been recorded.
    ///
    /// # Errors
    /// Returns an error of kind `WouldBlock` when the read buffer is empty
    /// and end-of-stream has not been recorded.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // The `Read` contract allows a zero-length destination; report that
        // nothing was read instead of panicking.
        if buf.is_empty() {
            return Ok(0);
        }

        // End-of-stream is only reported once every buffered byte has been
        // handed out; otherwise data that arrived before EOF would be lost.
        if self.read_buff.is_empty() {
            return if self.eof {
                Ok(0)
            } else {
                Err(would_block())
            };
        }

        // The deque may be split in two by ring-buffer wrap-around, so copy
        // from both halves to fill as much of `buf` as possible.
        let (front, back) = self.read_buff.as_slices();
        let from_front = front.len().min(buf.len());
        buf[..from_front].copy_from_slice(&front[..from_front]);
        let from_back = back.len().min(buf.len() - from_front);
        buf[from_front..from_front + from_back].copy_from_slice(&back[..from_back]);

        let copied = from_front + from_back;
        self.read_buff.drain(..copied);
        Ok(copied)
    }
}
impl std::io::Write for SyncBufferedStream {
    /// Appends all of `buf` to the write buffer and reports every byte as
    /// accepted. Nothing is sent anywhere by this call.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.write_buff.extend(buf.iter().copied());
        Ok(accepted)
    }

    /// Does nothing and always succeeds; the write buffer is left untouched.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl SyncBufferedStream {
    /// Performs one `try_read` on `s` and stores the outcome.
    ///
    /// Up to 1024 bytes are requested. A zero-length read sets the `eof`
    /// flag; otherwise the received bytes are appended to the read buffer.
    ///
    /// `deadline` is forwarded unchanged to `s.try_read`.
    ///
    /// # Errors
    /// Propagates any `Errno` returned by `s.try_read`; the read buffer and
    /// the `eof` flag are left untouched in that case.
    pub async fn fill_read_buff(
        &mut self,
        s: &mut impl AsyncStreamRead,
        deadline: Option<std::time::Instant>,
    ) -> Result<(), Errno> {
        let mut buf = [0u8; 1024];
        let bytes_read = s.try_read(&mut buf, deadline).await?;
        if bytes_read == 0 {
            self.eof = true;
        } else {
            self.read_buff.extend(&buf[..bytes_read]);
        }
        Ok(())
    }

    /// Sends everything in the write buffer to `s` and then empties it.
    ///
    /// Returns the number of bytes that were buffered (`0` if there was
    /// nothing to send, in which case `s` is not touched).
    ///
    /// `deadline` is forwarded unchanged to `s.write`.
    ///
    /// # Errors
    /// Propagates any `Errno` returned by `s.write`; the write buffer is
    /// kept intact in that case.
    pub async fn flush_write_buff(
        &mut self,
        s: &mut impl AsyncStreamWrite,
        deadline: Option<std::time::Instant>,
    ) -> Result<usize, Errno> {
        let bytes_to_write = self.write_buff.len();
        if bytes_to_write == 0 {
            return Ok(0);
        }
        // Rearrange the ring buffer into one slice rather than asserting it
        // already is one: whether a `VecDeque` wraps is an implementation
        // detail and must not be able to trigger a panic here.
        let pending: &[u8] = self.write_buff.make_contiguous();
        // NOTE(review): assumes `s.write` sends the whole slice before
        // returning `Ok` — confirm against `AsyncStreamWrite`.
        s.write(pending, deadline).await?;
        self.write_buff.clear();
        Ok(bytes_to_write)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BufferStream;
    use std::io::{Read, Write};

    // Exercises the read path (fill, read, then WouldBlock once drained) and
    // the write path (write_all, flush, fill, read back) against an
    // in-memory `BufferStream`.
    #[crate::test]
    async fn test_sync_buffered_stream() {
        let first_message = b"Hello, world!";
        let second_message = b"Hello, world!2";
        let mut backing = BufferStream::new();
        let mut stream = SyncBufferedStream::new();
        let mut scratch = [0u8; 1024];

        // Data written to the backing stream becomes readable after a fill.
        backing.write(first_message, None).await.unwrap();
        stream.fill_read_buff(&mut backing, None).await.unwrap();
        let received = stream.read(&mut scratch).unwrap();
        assert_eq!(received, first_message.len());
        assert_eq!(&scratch[..received], first_message);

        // With the read buffer drained, a further read must report WouldBlock.
        let err = stream.read(&mut scratch).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::WouldBlock);

        // Round-trip progressively longer prefixes of the second message.
        for prefix_len in 1..second_message.len() {
            let prefix = &second_message[..prefix_len];
            stream.write_all(prefix).unwrap();
            stream.flush_write_buff(&mut backing, None).await.unwrap();
            stream.fill_read_buff(&mut backing, None).await.unwrap();
            let received = stream.read(&mut scratch).unwrap();
            assert_eq!(received, prefix.len());
            assert_eq!(&scratch[..received], prefix);
        }
    }
}