use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::driver::buffers::ProvidedBuf;
use crate::driver::{self, Action};
/// Byte count used as the initial capacity of the write staging buffer and
/// passed as the size argument to `Action::read` for every read submission.
const DEFAULT_BUF_SIZE: usize = 4096;
/// Pairs an I/O object of type `T` with the buffers and pending-operation
/// state (`Inner`) used to read from and write to it.
pub(crate) struct Stream<T> {
/// Read/write buffers plus the state of any operation currently in progress.
inner: Inner,
/// The wrapped I/O object; its raw file descriptor is handed to `inner` on
/// every poll.
io: T,
}
impl<T> Stream<T> {
    /// Borrows the wrapped I/O object.
    pub fn get_ref(&self) -> &T {
        let Self { io, .. } = self;
        io
    }
}
impl<T: AsRawFd> Stream<T> {
    /// Wraps `io` with an empty read buffer, a write buffer pre-allocated to
    /// `DEFAULT_BUF_SIZE` bytes, and both directions idle.
    pub(crate) fn new(io: T) -> Stream<T> {
        let inner = Inner {
            read_pos: 0,
            rd: ProvidedBuf::default(),
            read: Read::Idle,
            wr: Vec::with_capacity(DEFAULT_BUF_SIZE),
            write: Write::Idle,
        };
        Self { inner, io }
    }

    /// Copies as many buffered bytes as fit into `buf` and marks them as
    /// consumed.
    ///
    /// Returns the number of bytes copied, `Poll::Pending` while the
    /// underlying read has not completed, or the error it produced.
    pub(crate) fn poll_read(
        &mut self,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let fd = self.io.as_raw_fd();
        let available = match self.inner.poll_fill_buf(cx, fd) {
            Poll::Ready(Ok(bytes)) => bytes,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        };

        // Copy no more than the smaller of the two buffers can hold.
        let count = available.len().min(buf.len());
        let (dst, _) = buf.split_at_mut(count);
        dst.copy_from_slice(&available[..count]);

        self.inner.consume(count);
        Poll::Ready(Ok(count))
    }

    /// Returns the bytes that are buffered but not yet consumed, delegating
    /// to the inner state with this stream's file descriptor.
    pub(crate) fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let fd = self.io.as_raw_fd();
        self.inner.poll_fill_buf(cx, fd)
    }

    /// Marks `amt` buffered bytes as consumed.
    pub(crate) fn consume(&mut self, amt: usize) {
        self.inner.consume(amt);
    }

    /// Writes bytes from `buf` to this stream's file descriptor, returning
    /// how many bytes the write reported.
    pub(crate) fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        let fd = self.io.as_raw_fd();
        self.inner.poll_write(cx, buf, fd)
    }
}
/// Buffers and pending-operation state for both directions of a stream.
struct Inner {
/// Buffer holding the result of the most recently completed read.
rd: ProvidedBuf,
/// Offset into `rd` of the first byte that has not been consumed yet.
read_pos: usize,
/// Whether a read action is currently held.
read: Read,
/// Staging buffer that caller bytes are copied into before a write action
/// is created from it.
wr: Vec<u8>,
/// Whether a write action is currently held.
write: Write,
}
/// State of the write direction.
enum Write {
/// No write action is held; the next `poll_write` creates one.
Idle,
/// A write action has been created and is polled for its result.
Writing(Action<driver::Write>),
}
/// State of the read direction.
enum Read {
/// No read action is held; buffered bytes, if any, can be handed out.
Idle,
/// A read action has been created and is polled for its resulting buffer.
Reading(Action<driver::Read>),
}
impl Inner {
    /// Stages up to `wr.capacity()` bytes of `buf`, creates a write action
    /// for them on `fd`, and polls that action to completion.
    ///
    /// Returns the byte count reported by the completed write. Whether the
    /// write succeeds or fails, the state returns to `Write::Idle` and the
    /// staging buffer is emptied, so a later call always starts cleanly.
    fn poll_write(&mut self, cx: &mut Context, buf: &[u8], fd: RawFd) -> Poll<io::Result<usize>> {
        loop {
            match &mut self.write {
                Write::Idle => {
                    // A previous attempt may have failed in `Action::write`
                    // after staging its bytes; discard them so this attempt
                    // does not submit stale data.
                    self.wr.clear();
                    let size = buf.len().min(self.wr.capacity());
                    self.wr.extend_from_slice(&buf[..size]);
                    let action = Action::write(fd, &self.wr[..size])?;
                    self.write = Write::Writing(action);
                }
                Write::Writing(action) => {
                    let outcome = ready!(Pin::new(action).poll_write(cx));
                    // The action is finished: leave `Writing` before the
                    // result is inspected so an error cannot strand the
                    // state machine on a completed action.
                    self.write = Write::Idle;
                    self.wr.clear();
                    let n = outcome?;
                    return Poll::Ready(Ok(n));
                }
            }
        }
    }

    /// Returns the buffered bytes that have not been consumed, starting a
    /// read of `DEFAULT_BUF_SIZE` bytes on `fd` when there are none.
    ///
    /// An empty slice means the read completed without producing any data.
    /// Once a read action completes the state is always reset to
    /// `Read::Idle`, including on error and on an empty result, so the
    /// finished action is never polled again.
    fn poll_fill_buf(&mut self, cx: &mut Context, fd: RawFd) -> Poll<io::Result<&[u8]>> {
        loop {
            match &mut self.read {
                Read::Idle => {
                    if !self.rd[self.read_pos..].is_empty() {
                        return Poll::Ready(Ok(&self.rd[self.read_pos..]));
                    }
                    // Everything was consumed: release the old buffer and
                    // request a fresh one.
                    self.read_pos = 0;
                    self.rd = ProvidedBuf::default();
                    let action = Action::read(fd, DEFAULT_BUF_SIZE as u32)?;
                    self.read = Read::Reading(action);
                }
                Read::Reading(action) => {
                    let outcome = ready!(Pin::new(action).poll_read(cx));
                    // Reset the state first; both the error path and the
                    // empty-result path below return early.
                    self.read = Read::Idle;
                    self.rd = outcome?;
                    if self.rd.is_empty() {
                        return Poll::Ready(Ok(&self.rd[self.read_pos..]));
                    }
                }
            }
        }
    }

    /// Marks `amt` buffered bytes as consumed.
    ///
    /// `amt` is clamped to the number of bytes still unconsumed, so
    /// `read_pos` never moves past the end of `rd` and later slicing in
    /// `poll_fill_buf` cannot panic.
    fn consume(&mut self, amt: usize) {
        let remaining = self.rd[self.read_pos..].len();
        self.read_pos += amt.min(remaining);
    }
}