Skip to main content

imap_client/
stream.rs

1use std::io::ErrorKind;
2
3use imap_next::{Interrupt, Io, State};
4use thiserror::Error;
5use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
6use tracing::trace;
7
/// An I/O stream of type `S` paired with a reusable scratch buffer.
///
/// The buffer receives the bytes of each read before they are handed on to the
/// protocol state, so no allocation is needed per read.
pub struct Stream<S> {
    /// The wrapped stream that bytes are written to and read from.
    stream: S,
    /// Scratch space that incoming bytes are read into.
    buf: Vec<u8>,
}
12
13impl<S> Stream<S> {
14    pub fn new(stream: S) -> Self {
15        Self {
16            stream,
17            buf: vec![0; 1024].into(),
18        }
19    }
20
21    pub fn into_inner(self) -> S {
22        self.stream
23    }
24}
25
26impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
27    pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
28        let event = loop {
29            // Progress the client/server
30            let result = state.next();
31
32            // Return events immediately without doing IO
33            let interrupt = match result {
34                Err(interrupt) => interrupt,
35                Ok(event) => break event,
36            };
37
38            // Return errors immediately without doing IO
39            let io = match interrupt {
40                Interrupt::Io(io) => io,
41                Interrupt::Error(err) => return Err(Error::State(err)),
42            };
43
44            // Handle the output bytes from the client/server
45            match io {
46                Io::Output(ref bytes) => {
47                    match self.stream.write_all(bytes).await {
48                        Ok(()) => trace!("wrote {} bytes", bytes.len()),
49                        Err(e) if e.kind() == ErrorKind::WriteZero => return Err(Error::Closed),
50                        Err(e) => return Err(e.into()),
51                    }
52                    self.stream.flush().await?;
53                }
54                Io::NeedMoreInput => {
55                    trace!("more input needed");
56                }
57            }
58
59            match self.stream.read(&mut self.buf).await? {
60                0 => return Err(Error::Closed),
61                n => {
62                    trace!("read {n}/{} bytes", self.buf.len());
63                    state.enqueue_input(&self.buf[..n]);
64                }
65            }
66        };
67
68        Ok(event)
69    }
70}
71
72/// Error during reading into or writing from a stream.
73#[derive(Debug, Error)]
74pub enum Error<E> {
75    /// Operation failed because stream is closed.
76    ///
77    /// We detect this by checking if the read or written byte count is 0. Whether the stream is
78    /// closed indefinitely or temporarily depends on the actual stream implementation.
79    #[error("Stream was closed")]
80    Closed,
81    /// An I/O error occurred in the underlying stream.
82    #[error(transparent)]
83    Io(#[from] std::io::Error),
84    /// An error occurred while progressing the state.
85    #[error(transparent)]
86    State(E),
87}