1use imap_next::{Interrupt, Io, State};
2use thiserror::Error;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tracing::trace;
5
6pub struct Stream<S> {
7 stream: S,
8 buf: Vec<u8>,
9}
10
11impl<S> Stream<S> {
12 pub fn new(stream: S) -> Self {
13 Self {
14 stream,
15 buf: vec![0; 1024].into(),
16 }
17 }
18
19 pub fn into_inner(self) -> S {
20 self.stream
21 }
22}
23
24impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
25 pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
26 let event = loop {
27 let result = state.next();
29
30 let interrupt = match result {
32 Err(interrupt) => interrupt,
33 Ok(event) => break event,
34 };
35
36 let io = match interrupt {
38 Interrupt::Io(io) => io,
39 Interrupt::Error(err) => return Err(Error::State(err)),
40 };
41
42 match io {
44 Io::Output(ref bytes) => match self.stream.write(bytes).await? {
45 0 => return Err(Error::Closed),
46 n => trace!("wrote {n}/{} bytes", bytes.len()),
47 },
48 Io::NeedMoreInput => {
49 trace!("more input needed");
50 }
51 }
52
53 match self.stream.read(&mut self.buf).await? {
54 0 => return Err(Error::Closed),
55 n => {
56 trace!("read {n}/{} bytes", self.buf.len());
57 state.enqueue_input(&self.buf[..n]);
58 }
59 }
60 };
61
62 Ok(event)
63 }
64}
65
66#[derive(Debug, Error)]
68pub enum Error<E> {
69 #[error("Stream was closed")]
74 Closed,
75 #[error(transparent)]
77 Io(#[from] std::io::Error),
78 #[error(transparent)]
80 State(E),
81}