1use std::io::ErrorKind;
2
3use imap_next::{Interrupt, Io, State};
4use thiserror::Error;
5use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
6use tracing::trace;
7
/// Pairs an I/O object with a reusable read buffer.
///
/// The buffer is allocated once by [`Stream::new`] and then reused for every
/// read performed on the wrapped stream, so no allocation happens per read.
pub struct Stream<S> {
    /// The wrapped I/O object.
    stream: S,
    /// Scratch space that receives the bytes read from `stream`.
    buf: Vec<u8>,
}

impl<S> Stream<S> {
    /// Size, in bytes, of the read buffer allocated by [`Stream::new`].
    const READ_BUFFER_SIZE: usize = 1024;

    /// Wraps `stream` and allocates a zero-filled read buffer of
    /// `READ_BUFFER_SIZE` bytes.
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            // `vec!` already produces a `Vec<u8>`, so no conversion is needed.
            buf: vec![0; Self::READ_BUFFER_SIZE],
        }
    }

    /// Consumes the wrapper and hands back the underlying stream.
    ///
    /// The read buffer is dropped.
    pub fn into_inner(self) -> S {
        self.stream
    }
}
25
26impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
27 pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
28 let event = loop {
29 let result = state.next();
31
32 let interrupt = match result {
34 Err(interrupt) => interrupt,
35 Ok(event) => break event,
36 };
37
38 let io = match interrupt {
40 Interrupt::Io(io) => io,
41 Interrupt::Error(err) => return Err(Error::State(err)),
42 };
43
44 match io {
46 Io::Output(ref bytes) => {
47 match self.stream.write_all(bytes).await {
48 Ok(()) => trace!("wrote {} bytes", bytes.len()),
49 Err(e) if e.kind() == ErrorKind::WriteZero => return Err(Error::Closed),
50 Err(e) => return Err(e.into()),
51 }
52 self.stream.flush().await?;
53 }
54 Io::NeedMoreInput => {
55 trace!("more input needed");
56 }
57 }
58
59 match self.stream.read(&mut self.buf).await? {
60 0 => return Err(Error::Closed),
61 n => {
62 trace!("read {n}/{} bytes", self.buf.len());
63 state.enqueue_input(&self.buf[..n]);
64 }
65 }
66 };
67
68 Ok(event)
69 }
70}
71
72#[derive(Debug, Error)]
74pub enum Error<E> {
75 #[error("Stream was closed")]
80 Closed,
81 #[error(transparent)]
83 Io(#[from] std::io::Error),
84 #[error(transparent)]
86 State(E),
87}