use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use tokio::io::{AsyncRead, BufReader, ReadBuf};
use tokio::net::UnixStream;
use tokio::net::unix::OwnedReadHalf;
use crate::enums::BufferChunkSize;
pub struct UdsByteStream {
reader: BufReader<OwnedReadHalf>,
eof: bool,
chunk_size: usize,
}
impl UdsByteStream {
pub async fn connect(path: impl AsRef<Path>) -> io::Result<Self> {
let stream = UnixStream::connect(path).await?;
let (read_half, _write_half) = stream.into_split();
Ok(Self::from_read_half(read_half, BufferChunkSize::Http))
}
pub fn from_read_half(read_half: OwnedReadHalf, size: BufferChunkSize) -> Self {
let chunk_size = size.chunk_size();
Self {
reader: BufReader::with_capacity(chunk_size, read_half),
eof: false,
chunk_size,
}
}
}
impl Stream for UdsByteStream {
type Item = Result<Vec<u8>, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.get_mut();
if me.eof {
return Poll::Ready(None);
}
let mut buf = vec![0u8; me.chunk_size];
let mut read_buf = ReadBuf::new(&mut buf);
match Pin::new(&mut me.reader).poll_read(cx, &mut read_buf) {
Poll::Ready(Ok(())) => {
let n = read_buf.filled().len();
if n == 0 {
me.eof = true;
Poll::Ready(None)
} else {
buf.truncate(n);
Poll::Ready(Some(Ok(buf)))
}
}
Poll::Ready(Err(e)) => {
me.eof = true;
Poll::Ready(Some(Err(e)))
}
Poll::Pending => Poll::Pending,
}
}
}
impl AsyncRead for UdsByteStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
Pin::new(&mut me.reader).poll_read(cx, buf)
}
}