Skip to main content

io_imap/rfc2177/
idle.rs

1//! IMAP IDLE coroutine yielding mailbox change events.
2//!
3//! # Example
4//!
5//! ```rust,no_run
6//! use core::sync::atomic::AtomicBool;
7//! use std::{
8//!     io::{Read, Write},
9//!     net::TcpStream,
10//!     sync::Arc,
11//! };
12//!
13//! use io_imap::{
14//!     codec::fragmentizer::Fragmentizer,
15//!     coroutine::{ImapCoroutine, ImapCoroutineState},
16//!     rfc2177::idle::{ImapIdle, ImapIdleOptions, ImapIdleYield},
17//! };
18//!
19//! // Ready stream needed (TCP-connected, TLS-negociated, IMAP-authenticated)
20//! let mut stream = TcpStream::connect("localhost:143").unwrap();
21//!
22//! let mut fragmentizer = Fragmentizer::new(50 * 1024 * 1024);
23//! let mut buf = [0u8; 4096];
24//!
25//! let shutdown = Arc::new(AtomicBool::new(false));
26//! let mut coroutine = ImapIdle::new(shutdown.clone(), ImapIdleOptions::default());
27//! let mut arg = None;
28//!
29//! loop {
30//!     match coroutine.resume(&mut fragmentizer, arg.take()) {
31//!         ImapCoroutineState::Yielded(ImapIdleYield::WantsWrite(bytes)) => {
32//!             stream.write_all(&bytes).unwrap();
33//!         }
34//!         ImapCoroutineState::Yielded(ImapIdleYield::WantsRead) => {
35//!             let n = stream.read(&mut buf).unwrap();
36//!             arg = Some(&buf[..n]);
37//!         }
38//!         ImapCoroutineState::Yielded(ImapIdleYield::Event(event)) => {
39//!             println!("{event:?}");
40//!         }
41//!         ImapCoroutineState::Complete(Ok(())) => break,
42//!         ImapCoroutineState::Complete(Err(err)) => panic!("{err}"),
43//!     }
44//! }
45//! ```
46
47use core::{
48    fmt, mem,
49    sync::atomic::{AtomicBool, Ordering},
50    time::Duration,
51};
52
53use alloc::{boxed::Box, string::String, string::ToString, sync::Arc, vec::Vec};
54
55#[cfg(feature = "client")]
56use std::time::Instant;
57
58use imap_codec::{
59    CommandCodec, IdleDoneCodec, ResponseCodec,
60    fragmentizer::{DecodeMessageError, FragmentInfo, Fragmentizer},
61    imap_types::{
62        IntoStatic,
63        command::{Command, CommandBody},
64        core::TagGenerator,
65        extensions::idle::IdleDone,
66        response::{Bye, Data, Response, Status, StatusBody, StatusKind, Tagged},
67        secret::Secret,
68        utils::escape_byte_string,
69    },
70};
71use log::trace;
72use thiserror::Error;
73
74use crate::{coroutine::*, imap_try, send::*};
75
76/// Refresh interval kept under the 29-minute RFC 2177 ยง3 cap.
77#[cfg(feature = "client")]
78const IDLE_DEFAULT_TIMEOUT: Duration = Duration::from_secs(29);
79
80/// Failure causes during the IMAP IDLE flow.
81#[derive(Clone, Debug, Error)]
82pub enum ImapIdleError {
83    #[error("IMAP IDLE failed: NO {0}")]
84    No(String),
85    #[error("IMAP IDLE failed: BAD {0}")]
86    Bad(String),
87    #[error("IMAP IDLE failed: BYE {0}")]
88    Bye(String),
89
90    #[error("IMAP IDLE failed: server returned a tagged response before the continuation request")]
91    UnexpectedTagged,
92    #[error("IMAP IDLE failed: server did not send the expected continuation request")]
93    ExpectedContinuationRequest,
94    #[error("IMAP IDLE failed: server did not return a tagged response to DONE")]
95    MissingTagged,
96    #[error("IMAP IDLE failed: reached unexpected EOF on stream")]
97    Eof,
98    #[error("IMAP IDLE failed: decode response error")]
99    DecodingFailure(Secret<Box<[u8]>>),
100    #[error("IMAP IDLE failed: parse response error: message is poisoned")]
101    MessageIsPoisoned(Secret<Box<[u8]>>),
102    #[error("IMAP IDLE failed: parse response error: message is too long")]
103    MessageTooLong(Secret<Box<[u8]>>),
104
105    #[error("IMAP IDLE failed: {0}")]
106    Send(#[from] SendImapCommandError),
107}
108
109/// Batch of unilateral untagged responses received during an IDLE.
110#[derive(Debug)]
111pub struct ImapIdleEvent {
112    pub untagged: Vec<StatusBody<'static>>,
113    pub data: Vec<Data<'static>>,
114}
115
116/// Yield variants from the IDLE coroutine.
117#[derive(Debug)]
118pub enum ImapIdleYield {
119    WantsRead,
120    WantsWrite(Vec<u8>),
121    Event(ImapIdleEvent),
122}
123
124impl From<ImapYield> for ImapIdleYield {
125    fn from(y: ImapYield) -> Self {
126        match y {
127            ImapYield::WantsRead => ImapIdleYield::WantsRead,
128            ImapYield::WantsWrite(bytes) => ImapIdleYield::WantsWrite(bytes),
129        }
130    }
131}
132
133/// Options for [`ImapIdle::new`].
134#[derive(Clone, Debug, Default, Eq, PartialEq)]
135pub struct ImapIdleOptions {
136    /// Refresh interval; defaults to [`IDLE_DEFAULT_TIMEOUT`]. Unused
137    /// without the `client` feature.
138    pub timeout: Option<Duration>,
139}
140
141/// I/O-free IMAP IDLE coroutine yielding mailbox change events.
142pub struct ImapIdle {
143    tag: TagGenerator,
144    state: State,
145    wants_read: bool,
146    codec: ResponseCodec,
147    data: Vec<Data<'static>>,
148    untagged: Vec<StatusBody<'static>>,
149    bye: Option<Bye<'static>>,
150    done: Arc<AtomicBool>,
151    #[cfg_attr(not(feature = "client"), allow(dead_code))]
152    opts: ImapIdleOptions,
153    #[cfg(feature = "client")]
154    timer: Option<Instant>,
155}
156
157impl ImapIdle {
158    /// Flip `done` to `true` to wind down with a clean `DONE`.
159    pub fn new(done: Arc<AtomicBool>, opts: ImapIdleOptions) -> Self {
160        let mut tag = TagGenerator::new();
161
162        let command = Command {
163            tag: tag.generate(),
164            body: CommandBody::Idle,
165        };
166
167        trace!("send IMAP command {command:?}");
168
169        let state = State::Idle(SendImapCommand::new(CommandCodec::new(), command));
170
171        Self {
172            tag,
173            state,
174            wants_read: false,
175            codec: ResponseCodec::new(),
176            data: Vec::new(),
177            untagged: Vec::new(),
178            bye: None,
179            done,
180            opts,
181            #[cfg(feature = "client")]
182            timer: None,
183        }
184    }
185
186    #[cfg(feature = "client")]
187    fn timeout(&self) -> Duration {
188        self.opts.timeout.unwrap_or(IDLE_DEFAULT_TIMEOUT)
189    }
190
191    #[cfg(feature = "client")]
192    fn timed_out(&self) -> bool {
193        self.timer
194            .as_ref()
195            .map(|t| t.elapsed() >= self.timeout())
196            .unwrap_or(false)
197    }
198}
199
200impl ImapCoroutine for ImapIdle {
201    type Yield = ImapIdleYield;
202    type Return = Result<(), ImapIdleError>;
203
204    fn resume(
205        &mut self,
206        fragmentizer: &mut Fragmentizer,
207        mut arg: Option<&[u8]>,
208    ) -> ImapCoroutineState<Self::Yield, Self::Return> {
209        #[cfg(feature = "client")]
210        if self.timer.is_none() {
211            self.timer = Some(Instant::now());
212        }
213
214        loop {
215            trace!("idle: {}", self.state);
216
217            if mem::take(&mut self.wants_read) {
218                return ImapCoroutineState::Yielded(ImapIdleYield::WantsRead);
219            }
220
221            match &mut self.state {
222                State::Idle(send) => {
223                    // NOTE: servers may pack untagged responses into the same
224                    // frame as `+ idling`; surface them immediately.
225                    let out = imap_try!(send, fragmentizer, arg.take());
226
227                    if let Some(bye) = out.bye {
228                        let err = ImapIdleError::Bye(bye.text.to_string());
229                        return ImapCoroutineState::Complete(Err(err));
230                    }
231
232                    if let Some(Tagged { body, .. }) = out.tagged {
233                        let err = match body.kind {
234                            StatusKind::Ok => ImapIdleError::UnexpectedTagged,
235                            StatusKind::No => ImapIdleError::No(body.text.to_string()),
236                            StatusKind::Bad => ImapIdleError::Bad(body.text.to_string()),
237                        };
238
239                        return ImapCoroutineState::Complete(Err(err));
240                    }
241
242                    if out.continuation_request.is_none() {
243                        let err = ImapIdleError::ExpectedContinuationRequest;
244                        return ImapCoroutineState::Complete(Err(err));
245                    }
246
247                    self.state = State::Read;
248
249                    if !out.data.is_empty() || !out.untagged.is_empty() {
250                        let event = ImapIdleEvent {
251                            data: out.data,
252                            untagged: out.untagged,
253                        };
254
255                        return ImapCoroutineState::Yielded(ImapIdleYield::Event(event));
256                    }
257                }
258                State::Read => {
259                    let done = self.done.load(Ordering::SeqCst);
260                    #[cfg(feature = "client")]
261                    let timed_out = self.timed_out();
262                    #[cfg(not(feature = "client"))]
263                    let timed_out = false;
264
265                    if done || timed_out {
266                        trace!("idle done: {done}");
267                        trace!("idle timed out: {timed_out}");
268                        let send = SendImapCommand::new(IdleDoneCodec::new(), IdleDone);
269                        self.state = State::Done(send);
270                        continue;
271                    }
272
273                    match arg.take() {
274                        Some(&[]) => {
275                            return ImapCoroutineState::Complete(Err(ImapIdleError::Eof));
276                        }
277                        Some(bytes) => {
278                            trace!("read bytes: {}", escape_byte_string(bytes));
279                            fragmentizer.enqueue_bytes(bytes);
280                        }
281                        None => {
282                            self.wants_read = true;
283                            continue;
284                        }
285                    }
286
287                    loop {
288                        match fragmentizer.progress() {
289                            Some(info @ FragmentInfo::Line { .. }) => {
290                                let bytes = fragmentizer.fragment_bytes(info);
291                                trace!("read line fragment: {}", escape_byte_string(bytes));
292
293                                if !fragmentizer.is_message_complete() {
294                                    continue;
295                                }
296
297                                match fragmentizer.decode_message(&self.codec) {
298                                    Ok(Response::Data(data)) => {
299                                        self.data.push(data.into_static());
300                                    }
301                                    Ok(Response::Status(Status::Untagged(status))) => {
302                                        self.untagged.push(status.into_static());
303                                    }
304                                    Ok(Response::Status(Status::Tagged(_))) => {}
305                                    Ok(Response::Status(Status::Bye(bye))) => {
306                                        self.bye.replace(bye.into_static());
307                                    }
308                                    Ok(Response::CommandContinuationRequest(_)) => {}
309                                    Err(decode_err) => {
310                                        let bytes = fragmentizer.message_bytes();
311                                        let bytes = Secret::new(bytes.into());
312                                        let err = match decode_err {
313                                            DecodeMessageError::DecodingFailure(_)
314                                            | DecodeMessageError::DecodingRemainder { .. } => {
315                                                ImapIdleError::DecodingFailure(bytes)
316                                            }
317                                            DecodeMessageError::MessageTooLong { .. } => {
318                                                ImapIdleError::MessageTooLong(bytes)
319                                            }
320                                            DecodeMessageError::MessagePoisoned { .. } => {
321                                                ImapIdleError::MessageIsPoisoned(bytes)
322                                            }
323                                        };
324                                        return ImapCoroutineState::Complete(Err(err));
325                                    }
326                                }
327                            }
328                            Some(info @ FragmentInfo::Literal { .. }) => {
329                                let bytes = fragmentizer.fragment_bytes(info);
330                                trace!("read literal fragment ({} bytes)", bytes.len());
331                            }
332                            None => {
333                                let event = ImapIdleEvent {
334                                    data: mem::take(&mut self.data),
335                                    untagged: mem::take(&mut self.untagged),
336                                };
337
338                                return ImapCoroutineState::Yielded(ImapIdleYield::Event(event));
339                            }
340                        }
341                    }
342                }
343                State::Done(send) => {
344                    let out = imap_try!(send, fragmentizer, arg.take());
345
346                    if let Some(bye) = out.bye {
347                        let err = ImapIdleError::Bye(bye.text.to_string());
348                        return ImapCoroutineState::Complete(Err(err));
349                    }
350
351                    let Some(Tagged { body, .. }) = out.tagged else {
352                        return ImapCoroutineState::Complete(Err(ImapIdleError::MissingTagged));
353                    };
354
355                    #[cfg(feature = "client")]
356                    let timed_out = self
357                        .timer
358                        .take()
359                        .map(|t| t.elapsed() >= self.timeout())
360                        .unwrap_or(false);
361                    #[cfg(not(feature = "client"))]
362                    let timed_out = false;
363
364                    return match body.kind {
365                        StatusKind::Ok if timed_out => {
366                            trace!("reached timeout, starting a new IDLE command");
367                            let command = Command {
368                                tag: self.tag.generate(),
369                                body: CommandBody::Idle,
370                            };
371                            let send = SendImapCommand::new(CommandCodec::new(), command);
372                            self.state = State::Idle(send);
373                            continue;
374                        }
375                        StatusKind::Ok => ImapCoroutineState::Complete(Ok(())),
376                        StatusKind::No => ImapCoroutineState::Complete(Err(ImapIdleError::No(
377                            body.text.to_string(),
378                        ))),
379                        StatusKind::Bad => ImapCoroutineState::Complete(Err(ImapIdleError::Bad(
380                            body.text.to_string(),
381                        ))),
382                    };
383                }
384            }
385        }
386    }
387}
388
389enum State {
390    Idle(SendImapCommand<CommandCodec>),
391    Read,
392    Done(SendImapCommand<IdleDoneCodec>),
393}
394
395impl fmt::Display for State {
396    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397        match self {
398            Self::Idle(_) => f.write_str("send idle"),
399            Self::Read => f.write_str("read events"),
400            Self::Done(_) => f.write_str("send done"),
401        }
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use core::str;
408
409    use alloc::borrow::ToOwned;
410
411    use super::*;
412
413    #[test]
414    fn shutdown_returns_ok() {
415        let done = Arc::new(AtomicBool::new(false));
416        let mut idle = ImapIdle::new(done.clone(), ImapIdleOptions::default());
417        let mut frag = Fragmentizer::new(50 * 1024 * 1024);
418
419        let bytes = expect_wants_write(&mut idle, &mut frag, None);
420        let line = str::from_utf8(&bytes).expect("utf8 command");
421        let tag = first_word(line).to_owned();
422        assert!(line.trim_end().ends_with("IDLE"));
423
424        expect_wants_read(&mut idle, &mut frag);
425        expect_wants_read_after(&mut idle, &mut frag, b"+ idling\r\n");
426
427        done.store(true, Ordering::SeqCst);
428        let bytes = expect_wants_write(&mut idle, &mut frag, None);
429        assert_eq!(b"DONE\r\n", &*bytes);
430
431        expect_wants_read(&mut idle, &mut frag);
432
433        let reply = format!("{tag} OK IDLE terminated\r\n");
434        expect_complete_ok(&mut idle, &mut frag, reply.as_bytes());
435    }
436
437    #[test]
438    fn unsolicited_during_read_yields_event() {
439        let done = Arc::new(AtomicBool::new(false));
440        let mut idle = ImapIdle::new(done, ImapIdleOptions::default());
441        let mut frag = Fragmentizer::new(50 * 1024 * 1024);
442
443        let _ = expect_wants_write(&mut idle, &mut frag, None);
444        expect_wants_read(&mut idle, &mut frag);
445        expect_wants_read_after(&mut idle, &mut frag, b"+ idling\r\n");
446
447        let event = expect_event(&mut idle, &mut frag, b"* 5 EXISTS\r\n");
448        assert_eq!(1, event.data.len());
449        assert!(event.untagged.is_empty());
450    }
451
452    #[test]
453    fn unsolicited_piggyback_on_continuation_yields_event() {
454        let done = Arc::new(AtomicBool::new(false));
455        let mut idle = ImapIdle::new(done, ImapIdleOptions::default());
456        let mut frag = Fragmentizer::new(50 * 1024 * 1024);
457
458        let _ = expect_wants_write(&mut idle, &mut frag, None);
459        expect_wants_read(&mut idle, &mut frag);
460
461        let event = expect_event(&mut idle, &mut frag, b"+ idling\r\n* 10 EXISTS\r\n");
462        assert_eq!(1, event.data.len());
463    }
464
465    #[test]
466    fn idle_tagged_bad_returns_bad_error() {
467        let done = Arc::new(AtomicBool::new(false));
468        let mut idle = ImapIdle::new(done, ImapIdleOptions::default());
469        let mut frag = Fragmentizer::new(50 * 1024 * 1024);
470
471        let bytes = expect_wants_write(&mut idle, &mut frag, None);
472        let tag = first_word(str::from_utf8(&bytes).expect("utf8 command")).to_owned();
473
474        expect_wants_read(&mut idle, &mut frag);
475
476        let reply = format!("{tag} BAD IDLE not supported\r\n");
477        let err = expect_complete_err(&mut idle, &mut frag, reply.as_bytes());
478        let ImapIdleError::Bad(text) = err else {
479            panic!("expected ImapIdleError::Bad, got {err:?}");
480        };
481        assert_eq!(text, "IDLE not supported");
482    }
483
484    #[test]
485    fn done_tagged_no_returns_no_error() {
486        let done = Arc::new(AtomicBool::new(false));
487        let mut idle = ImapIdle::new(done.clone(), ImapIdleOptions::default());
488        let mut frag = Fragmentizer::new(50 * 1024 * 1024);
489
490        let bytes = expect_wants_write(&mut idle, &mut frag, None);
491        let tag = first_word(str::from_utf8(&bytes).expect("utf8 command")).to_owned();
492
493        expect_wants_read(&mut idle, &mut frag);
494        expect_wants_read_after(&mut idle, &mut frag, b"+ idling\r\n");
495
496        done.store(true, Ordering::SeqCst);
497        let _ = expect_wants_write(&mut idle, &mut frag, None);
498        expect_wants_read(&mut idle, &mut frag);
499
500        let reply = format!("{tag} NO IDLE aborted\r\n");
501        let err = expect_complete_err(&mut idle, &mut frag, reply.as_bytes());
502        let ImapIdleError::No(text) = err else {
503            panic!("expected ImapIdleError::No, got {err:?}");
504        };
505        assert_eq!(text, "IDLE aborted");
506    }
507
508    // --- utils
509
510    fn expect_wants_write(
511        cor: &mut ImapIdle,
512        frag: &mut Fragmentizer,
513        arg: Option<&[u8]>,
514    ) -> Vec<u8> {
515        match cor.resume(frag, arg) {
516            ImapCoroutineState::Yielded(ImapIdleYield::WantsWrite(bytes)) => bytes,
517            state => panic!("expected WantsWrite, got {state:?}"),
518        }
519    }
520
521    fn expect_wants_read(cor: &mut ImapIdle, frag: &mut Fragmentizer) {
522        match cor.resume(frag, None) {
523            ImapCoroutineState::Yielded(ImapIdleYield::WantsRead) => {}
524            state => panic!("expected WantsRead, got {state:?}"),
525        }
526    }
527
528    fn expect_wants_read_after(cor: &mut ImapIdle, frag: &mut Fragmentizer, arg: &[u8]) {
529        match cor.resume(frag, Some(arg)) {
530            ImapCoroutineState::Yielded(ImapIdleYield::WantsRead) => {}
531            state => panic!("expected WantsRead, got {state:?}"),
532        }
533    }
534
535    fn expect_event(cor: &mut ImapIdle, frag: &mut Fragmentizer, arg: &[u8]) -> ImapIdleEvent {
536        match cor.resume(frag, Some(arg)) {
537            ImapCoroutineState::Yielded(ImapIdleYield::Event(event)) => event,
538            state => panic!("expected Event, got {state:?}"),
539        }
540    }
541
542    fn expect_complete_ok(cor: &mut ImapIdle, frag: &mut Fragmentizer, reply: &[u8]) {
543        match cor.resume(frag, Some(reply)) {
544            ImapCoroutineState::Complete(Ok(())) => {}
545            state => panic!("expected Complete(Ok), got {state:?}"),
546        }
547    }
548
549    fn expect_complete_err(
550        cor: &mut ImapIdle,
551        frag: &mut Fragmentizer,
552        reply: &[u8],
553    ) -> ImapIdleError {
554        match cor.resume(frag, Some(reply)) {
555            ImapCoroutineState::Complete(Err(err)) => err,
556            state => panic!("expected Complete(Err), got {state:?}"),
557        }
558    }
559
560    fn first_word(line: &str) -> &str {
561        line.split_whitespace()
562            .next()
563            .expect("first whitespace-separated token")
564    }
565}