imap_client/
stream.rs

1use imap_next::{Interrupt, Io, State};
2use thiserror::Error;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tracing::trace;
5
/// Couples an I/O stream with a reusable buffer for the bytes read from it.
pub struct Stream<S> {
    /// The wrapped I/O object; handed back by `into_inner`.
    stream: S,
    /// Scratch space that reads from `stream` are placed into.
    buf: Vec<u8>,
}
10
11impl<S> Stream<S> {
12    pub fn new(stream: S) -> Self {
13        Self {
14            stream,
15            buf: vec![0; 1024].into(),
16        }
17    }
18
19    pub fn into_inner(self) -> S {
20        self.stream
21    }
22}
23
24impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
25    pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
26        let event = loop {
27            // Progress the client/server
28            let result = state.next();
29
30            // Return events immediately without doing IO
31            let interrupt = match result {
32                Err(interrupt) => interrupt,
33                Ok(event) => break event,
34            };
35
36            // Return errors immediately without doing IO
37            let io = match interrupt {
38                Interrupt::Io(io) => io,
39                Interrupt::Error(err) => return Err(Error::State(err)),
40            };
41
42            // Handle the output bytes from the client/server
43            match io {
44                Io::Output(ref bytes) => match self.stream.write(bytes).await? {
45                    0 => return Err(Error::Closed),
46                    n => trace!("wrote {n}/{} bytes", bytes.len()),
47                },
48                Io::NeedMoreInput => {
49                    trace!("more input needed");
50                }
51            }
52
53            match self.stream.read(&mut self.buf).await? {
54                0 => return Err(Error::Closed),
55                n => {
56                    trace!("read {n}/{} bytes", self.buf.len());
57                    state.enqueue_input(&self.buf[..n]);
58                }
59            }
60        };
61
62        Ok(event)
63    }
64}
65
66/// Error during reading into or writing from a stream.
67#[derive(Debug, Error)]
68pub enum Error<E> {
69    /// Operation failed because stream is closed.
70    ///
71    /// We detect this by checking if the read or written byte count is 0. Whether the stream is
72    /// closed indefinitely or temporarily depends on the actual stream implementation.
73    #[error("Stream was closed")]
74    Closed,
75    /// An I/O error occurred in the underlying stream.
76    #[error(transparent)]
77    Io(#[from] std::io::Error),
78    /// An error occurred while progressing the state.
79    #[error(transparent)]
80    State(E),
81}