async_imap/extensions/
idle.rs

//! Adds support for the IMAP IDLE command specified in [RFC2177](https://tools.ietf.org/html/rfc2177).
2
3use std::fmt;
4use std::pin::Pin;
5use std::time::Duration;
6
7#[cfg(feature = "runtime-async-std")]
8use async_std::{
9    future::timeout,
10    io::{Read, Write},
11};
12use futures::prelude::*;
13use futures::task::{Context, Poll};
14use imap_proto::{RequestId, Response, Status};
15use stop_token::prelude::*;
16#[cfg(feature = "runtime-tokio")]
17use tokio::{
18    io::{AsyncRead as Read, AsyncWrite as Write},
19    time::timeout,
20};
21
22use crate::client::Session;
23use crate::error::Result;
24use crate::parse::handle_unilateral;
25use crate::types::ResponseData;
26
27/// `Handle` allows a client to block waiting for changes to the remote mailbox.
28///
29/// The handle blocks using the [`IDLE` command](https://tools.ietf.org/html/rfc2177#section-3)
30/// specificed in [RFC 2177](https://tools.ietf.org/html/rfc2177) until the underlying server state
31/// changes in some way. While idling does inform the client what changes happened on the server,
32/// this implementation will currently just block until _anything_ changes, and then notify the
33///
34/// Note that the server MAY consider a client inactive if it has an IDLE command running, and if
35/// such a server has an inactivity timeout it MAY log the client off implicitly at the end of its
36/// timeout period.  Because of that, clients using IDLE are advised to terminate the IDLE and
37/// re-issue it at least every 29 minutes to avoid being logged off. [`Handle::wait`]
38/// does this. This still allows a client to receive immediate mailbox updates even though it need
39/// only "poll" at half hour intervals.
40///
41/// As long as a [`Handle`] is active, the mailbox cannot be otherwise accessed.
42#[derive(Debug)]
43pub struct Handle<T: Read + Write + Unpin + fmt::Debug> {
44    session: Session<T>,
45    id: Option<RequestId>,
46}
47
48impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Handle<T> {}
49
50impl<T: Read + Write + Unpin + fmt::Debug + Send> Stream for Handle<T> {
51    type Item = std::io::Result<ResponseData>;
52
53    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        self.as_mut().session().get_stream().poll_next(cx)
55    }
56}
57
/// Stream of server responses that arrive after `IDLE` has been sent.
///
/// It only holds a mutable borrow of the stream it reads from.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct IdleStream<'a, St> {
    /// The borrowed stream whose items are passed through.
    stream: &'a mut St,
}
64
65impl<St: Unpin> Unpin for IdleStream<'_, St> {}
66
67impl<'a, St: Stream + Unpin> IdleStream<'a, St> {
68    unsafe_pinned!(stream: &'a mut St);
69
70    pub(crate) fn new(stream: &'a mut St) -> Self {
71        IdleStream { stream }
72    }
73}
74
75impl<St: futures::stream::FusedStream + Unpin> futures::stream::FusedStream for IdleStream<'_, St> {
76    fn is_terminated(&self) -> bool {
77        self.stream.is_terminated()
78    }
79}
80
81impl<St: Stream + Unpin> Stream for IdleStream<'_, St> {
82    type Item = St::Item;
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        self.stream().poll_next(cx)
86    }
87}
88
89/// Possible responses that happen on an open idle connection.
90#[derive(Debug, PartialEq, Eq)]
91pub enum IdleResponse {
92    /// The manual interrupt was used to interrupt the idle connection..
93    ManualInterrupt,
94    /// The idle connection timed out, because of the user set timeout.
95    Timeout,
96    /// The server has indicated that some new action has happened.
97    NewData(ResponseData),
98}
99
100// Make it possible to access the inner connection and modify its settings, such as read/write
101// timeouts.
102impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Handle<T> {
103    fn as_mut(&mut self) -> &mut T {
104        self.session.conn.stream.as_mut()
105    }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
109    unsafe_pinned!(session: Session<T>);
110
111    pub(crate) fn new(session: Session<T>) -> Handle<T> {
112        Handle { session, id: None }
113    }
114
115    /// Start listening to the server side responses.
116    /// Must be called after [`Handle::init`].
117    pub fn wait(
118        &mut self,
119    ) -> (
120        impl Future<Output = Result<IdleResponse>> + '_,
121        stop_token::StopSource,
122    ) {
123        self.wait_with_timeout(Duration::from_secs(24 * 60 * 60))
124    }
125
126    /// Start listening to the server side responses.
127    ///
128    /// Stops after the passed in `timeout` without any response from the server.
129    /// Timeout is reset by any response, including `* OK Still here` keepalives.
130    ///
131    /// Must be called after [Handle::init].
132    pub fn wait_with_timeout(
133        &mut self,
134        dur: Duration,
135    ) -> (
136        impl Future<Output = Result<IdleResponse>> + '_,
137        stop_token::StopSource,
138    ) {
139        assert!(
140            self.id.is_some(),
141            "Cannot listen to response without starting IDLE"
142        );
143
144        let sender = self.session.unsolicited_responses_tx.clone();
145
146        let interrupt = stop_token::StopSource::new();
147        let raw_stream = IdleStream::new(self);
148        let mut interruptible_stream = raw_stream.timeout_at(interrupt.token());
149
150        let fut = async move {
151            loop {
152                let Ok(res) = timeout(dur, interruptible_stream.next()).await else {
153                    return Ok(IdleResponse::Timeout);
154                };
155
156                let Some(Ok(resp)) = res else {
157                    return Ok(IdleResponse::ManualInterrupt);
158                };
159
160                let resp = resp?;
161                match resp.parsed() {
162                    Response::Data {
163                        status: Status::Ok, ..
164                    } => {
165                        // all good continue
166                    }
167                    Response::Continue { .. } => {
168                        // continuation, wait for it
169                    }
170                    Response::Done { .. } => {
171                        handle_unilateral(resp, sender.clone());
172                    }
173                    _ => return Ok(IdleResponse::NewData(resp)),
174                }
175            }
176        };
177
178        (fut, interrupt)
179    }
180
181    /// Initialise the idle connection by sending the `IDLE` command to the server.
182    pub async fn init(&mut self) -> Result<()> {
183        let id = self.session.run_command("IDLE").await?;
184        self.id = Some(id);
185        while let Some(res) = self.session.stream.try_next().await? {
186            match res.parsed() {
187                Response::Continue { .. } => {
188                    return Ok(());
189                }
190                Response::Done {
191                    tag,
192                    status,
193                    information,
194                    ..
195                } => {
196                    if tag == self.id.as_ref().unwrap() {
197                        if let Status::Bad = status {
198                            return Err(std::io::Error::new(
199                                std::io::ErrorKind::ConnectionRefused,
200                                information.as_ref().unwrap().to_string(),
201                            )
202                            .into());
203                        }
204                    }
205                    handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
206                }
207                _ => {
208                    handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
209                }
210            }
211        }
212
213        Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "").into())
214    }
215
216    /// Signal that we want to exit the idle connection, by sending the `DONE`
217    /// command to the server.
218    pub async fn done(mut self) -> Result<Session<T>> {
219        assert!(
220            self.id.is_some(),
221            "Cannot call DONE on a non initialized idle connection"
222        );
223        self.session.run_command_untagged("DONE").await?;
224        let sender = self.session.unsolicited_responses_tx.clone();
225        self.session
226            .check_done_ok(&self.id.expect("invalid setup"), Some(sender))
227            .await?;
228
229        Ok(self.session)
230    }
231}