use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_io::{AsyncBufRead, AsyncRead};
use futures_lite::ready;
use crate::{decode, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
pub struct WithSidebands<'a, T, F>
where
T: AsyncRead,
{
state: State<'a, T>,
handle_progress: Option<F>,
pos: usize,
cap: usize,
}
impl<'a, T, F> Drop for WithSidebands<'a, T, F>
where
T: AsyncRead,
{
fn drop(&mut self) {
if let State::Idle { ref mut parent } = self.state {
parent
.as_mut()
.expect("parent is always available if we are idle")
.reset();
}
}
}
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
where
T: AsyncRead,
{
pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
state: State::Idle { parent: Some(parent) },
handle_progress: None,
pos: 0,
cap: 0,
}
}
}
enum State<'a, T> {
Idle {
parent: Option<&'a mut StreamingPeekableIter<T>>,
},
ReadLine {
read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>,
parent_inactive: Option<*mut StreamingPeekableIter<T>>,
},
}
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<'a, T> Send for State<'a, T> where T: Send {}
#[cfg(test)]
mod tests {
use super::*;
fn receiver<T: Send>(_i: T) {}
#[test]
fn streaming_peekable_iter_is_send() {
receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[]));
}
#[test]
fn state_is_send() {
let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
receiver(State::Idle { parent: Some(&mut s) });
}
}
impl<'a, T, F> WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
{
pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
WithSidebands {
state: State::Idle { parent: Some(parent) },
handle_progress: Some(handle_progress),
pos: 0,
cap: 0,
}
}
pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
WithSidebands {
state: State::Idle { parent: Some(parent) },
handle_progress: None,
pos: 0,
cap: 0,
}
}
pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
if let State::Idle { ref mut parent } = self.state {
parent
.as_mut()
.expect("parent is always available if we are idle")
.reset_with(delimiters)
}
}
pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
match self.state {
State::Idle { ref parent } => {
parent
.as_ref()
.expect("parent is always available if we are idle")
.stopped_at
}
_ => None,
}
}
pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
self.handle_progress = handle_progress;
}
pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> {
match self.state {
State::Idle { ref mut parent } => match parent
.as_mut()
.expect("parent is always available if we are idle")
.peek_line()
.await
{
Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
Some(Ok(Err(err))) => Some(Ok(Err(err))),
Some(Err(err)) => Some(Err(err)),
_ => None,
},
_ => None,
}
}
pub fn read_line<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> {
ReadLineFuture { parent: self, buf }
}
pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
match &mut self.state {
State::Idle { parent: Some(parent) } => {
assert_eq!(
self.cap, 0,
"we don't support partial buffers right now - read-line must be used consistently"
);
parent.read_line().await
}
_ => None,
}
}
}
pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
parent: &'b mut WithSidebands<'a, T, F>,
buf: &'b mut Vec<u8>,
}
impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
{
type Output = std::io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert_eq!(
self.parent.cap, 0,
"we don't support partial buffers right now - read-line must be used consistently"
);
let Self { buf, parent } = &mut *self;
let line = ready!(Pin::new(parent).poll_fill_buf(cx))?;
buf.clear();
buf.extend_from_slice(line);
let bytes = line.len();
self.parent.cap = 0;
Poll::Ready(Ok(bytes))
}
}
pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
parent: &'b mut WithSidebands<'a, T, F>,
buf: &'b mut String,
}
impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
{
type Output = std::io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert_eq!(
self.parent.cap, 0,
"we don't support partial buffers right now - read-line must be used consistently"
);
let Self { buf, parent } = &mut *self;
let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
buf.clear();
buf.push_str(line);
let bytes = line.len();
self.parent.cap = 0;
Poll::Ready(Ok(bytes))
}
}
impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
use std::io;
use futures_lite::FutureExt;
{
let this = self.as_mut().get_mut();
if this.pos >= this.cap {
let (ofs, cap) = loop {
match this.state {
State::Idle { ref mut parent } => {
let parent = parent.take().expect("parent to be present here");
let inactive = parent as *mut _;
this.state = State::ReadLine {
read_line: parent.read_line().boxed_local(),
parent_inactive: Some(inactive),
}
}
State::ReadLine {
ref mut read_line,
ref mut parent_inactive,
} => {
let line = ready!(read_line.poll(cx));
this.state = {
let parent = parent_inactive.take().expect("parent pointer always set");
#[allow(unsafe_code)]
let parent = unsafe { &mut *parent };
State::Idle { parent: Some(parent) }
};
let line = match line {
Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
None => break (0, 0),
};
match this.handle_progress.as_mut() {
Some(handle_progress) => {
let band = line
.decode_band()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
const ENCODED_BAND: usize = 1;
match band {
BandRef::Data(d) => {
if d.is_empty() {
continue;
}
break (U16_HEX_BYTES + ENCODED_BAND, d.len());
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
handle_progress(true, text);
}
};
}
None => {
break match line.as_slice() {
Some(d) => (U16_HEX_BYTES, d.len()),
None => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"encountered non-data line in a data-line only context",
)))
}
}
}
}
}
}
};
this.cap = cap + ofs;
this.pos = ofs;
}
}
let range = self.pos..self.cap;
match &self.get_mut().state {
State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])),
State::ReadLine { .. } => unreachable!("at least in theory"),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.pos = std::cmp::min(this.pos + amt, this.cap);
}
}
impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
let nread = {
use std::io::Read;
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
rem.read(buf)?
};
self.consume(nread);
Poll::Ready(Ok(nread))
}
}