async_imap/extensions/
idle.rs

1//! Adds support for the IMAP IDLE command specificed in [RFC2177](https://tools.ietf.org/html/rfc2177).
2
3use std::fmt;
4use std::pin::Pin;
5use std::time::Duration;
6
7#[cfg(feature = "runtime-async-std")]
8use async_std::{
9    future::timeout,
10    io::{Read, Write},
11};
12use futures::prelude::*;
13use futures::task::{Context, Poll};
14use imap_proto::{RequestId, Response, Status};
15use stop_token::prelude::*;
16#[cfg(feature = "runtime-tokio")]
17use tokio::{
18    io::{AsyncRead as Read, AsyncWrite as Write},
19    time::timeout,
20};
21
22use crate::client::Session;
23use crate::error::Result;
24use crate::parse::handle_unilateral;
25use crate::types::ResponseData;
26
27/// `Handle` allows a client to block waiting for changes to the remote mailbox.
28///
29/// The handle blocks using the [`IDLE` command](https://tools.ietf.org/html/rfc2177#section-3)
30/// specificed in [RFC 2177](https://tools.ietf.org/html/rfc2177) until the underlying server state
31/// changes in some way. While idling does inform the client what changes happened on the server,
32/// this implementation will currently just block until _anything_ changes, and then notify the
33///
34/// Note that the server MAY consider a client inactive if it has an IDLE command running, and if
35/// such a server has an inactivity timeout it MAY log the client off implicitly at the end of its
36/// timeout period.  Because of that, clients using IDLE are advised to terminate the IDLE and
37/// re-issue it at least every 29 minutes to avoid being logged off. [`Handle::wait`]
38/// does this. This still allows a client to receive immediate mailbox updates even though it need
39/// only "poll" at half hour intervals.
40///
41/// As long as a [`Handle`] is active, the mailbox cannot be otherwise accessed.
42#[derive(Debug)]
43pub struct Handle<T: Read + Write + Unpin + fmt::Debug> {
44    session: Session<T>,
45    id: Option<RequestId>,
46}
47
48impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Handle<T> {}
49
50impl<T: Read + Write + Unpin + fmt::Debug + Send> Stream for Handle<T> {
51    type Item = std::io::Result<ResponseData>;
52
53    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        self.as_mut().session().get_stream().poll_next(cx)
55    }
56}
57
58/// A stream of server responses after sending `IDLE`.
59#[derive(Debug)]
60#[must_use = "futures do nothing unless polled"]
61pub struct IdleStream<'a, St> {
62    stream: &'a mut St,
63}
64
65impl<St: Unpin> Unpin for IdleStream<'_, St> {}
66
67impl<'a, St: Stream + Unpin> IdleStream<'a, St> {
68    unsafe_pinned!(stream: &'a mut St);
69
70    pub(crate) fn new(stream: &'a mut St) -> Self {
71        IdleStream { stream }
72    }
73}
74
75impl<St: futures::stream::FusedStream + Unpin> futures::stream::FusedStream for IdleStream<'_, St> {
76    fn is_terminated(&self) -> bool {
77        self.stream.is_terminated()
78    }
79}
80
81impl<St: Stream + Unpin> Stream for IdleStream<'_, St> {
82    type Item = St::Item;
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        self.stream().poll_next(cx)
86    }
87}
88
89/// Possible responses that happen on an open idle connection.
90#[derive(Debug, PartialEq, Eq)]
91pub enum IdleResponse {
92    /// The manual interrupt was used to interrupt the idle connection..
93    ManualInterrupt,
94    /// The idle connection timed out, because of the user set timeout.
95    Timeout,
96    /// The server has indicated that some new action has happened.
97    NewData(ResponseData),
98}
99
100// Make it possible to access the inner connection and modify its settings, such as read/write
101// timeouts.
102impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Handle<T> {
103    fn as_mut(&mut self) -> &mut T {
104        self.session.conn.stream.as_mut()
105    }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
109    unsafe_pinned!(session: Session<T>);
110
111    pub(crate) fn new(session: Session<T>) -> Handle<T> {
112        Handle { session, id: None }
113    }
114
115    /// Start listening to the server side responses.
116    /// Must be called after [`Handle::init`].
117    pub fn wait(
118        &mut self,
119    ) -> (
120        impl Future<Output = Result<IdleResponse>> + '_,
121        stop_token::StopSource,
122    ) {
123        self.wait_with_timeout(Duration::from_secs(24 * 60 * 60))
124    }
125
126    /// Start listening to the server side responses.
127    ///
128    /// Stops after the passed in `timeout` without any response from the server.
129    /// Timeout is reset by any response, including `* OK Still here` keepalives.
130    ///
131    /// Must be called after [Handle::init].
132    pub fn wait_with_timeout(
133        &mut self,
134        dur: Duration,
135    ) -> (
136        impl Future<Output = Result<IdleResponse>> + '_,
137        stop_token::StopSource,
138    ) {
139        assert!(
140            self.id.is_some(),
141            "Cannot listen to response without starting IDLE"
142        );
143
144        let sender = self.session.unsolicited_responses_tx.clone();
145
146        let interrupt = stop_token::StopSource::new();
147        let raw_stream = IdleStream::new(self);
148        let mut interruptible_stream = raw_stream.timeout_at(interrupt.token());
149
150        let fut = async move {
151            loop {
152                let Ok(res) = timeout(dur, interruptible_stream.next()).await else {
153                    return Ok(IdleResponse::Timeout);
154                };
155
156                let Some(Ok(resp)) = res else {
157                    return Ok(IdleResponse::ManualInterrupt);
158                };
159
160                let resp = resp?;
161                match resp.parsed() {
162                    Response::Data {
163                        status: Status::Ok, ..
164                    } => {
165                        // all good continue
166                    }
167                    Response::Continue { .. } => {
168                        // continuation, wait for it
169                    }
170                    Response::Done { .. } => {
171                        handle_unilateral(resp, sender.clone());
172                    }
173                    _ => return Ok(IdleResponse::NewData(resp)),
174                }
175            }
176        };
177
178        (fut, interrupt)
179    }
180
181    /// Initialise the idle connection by sending the `IDLE` command to the server.
182    pub async fn init(&mut self) -> Result<()> {
183        let id = self.session.run_command("IDLE").await?;
184        self.id = Some(id);
185        while let Some(res) = self.session.stream.next().await {
186            let res = res?;
187            match res.parsed() {
188                Response::Continue { .. } => {
189                    return Ok(());
190                }
191                Response::Done {
192                    tag,
193                    status,
194                    information,
195                    ..
196                } => {
197                    if tag == self.id.as_ref().unwrap() {
198                        if let Status::Bad = status {
199                            return Err(std::io::Error::new(
200                                std::io::ErrorKind::ConnectionRefused,
201                                information.as_ref().unwrap().to_string(),
202                            )
203                            .into());
204                        }
205                    }
206                    handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
207                }
208                _ => {
209                    handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
210                }
211            }
212        }
213
214        Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "").into())
215    }
216
217    /// Signal that we want to exit the idle connection, by sending the `DONE`
218    /// command to the server.
219    pub async fn done(mut self) -> Result<Session<T>> {
220        assert!(
221            self.id.is_some(),
222            "Cannot call DONE on a non initialized idle connection"
223        );
224        self.session.run_command_untagged("DONE").await?;
225        let sender = self.session.unsolicited_responses_tx.clone();
226        self.session
227            .check_done_ok(&self.id.expect("invalid setup"), Some(sender))
228            .await?;
229
230        Ok(self.session)
231    }
232}