Skip to main content

io_imap/
watch.rs

1//! IMAP single-mailbox watcher: IDLE (RFC 2177) for the wake signal,
2//! SELECT (QRESYNC) (RFC 7162) for UID-keyed deltas.
3//!
4//! QRESYNC: <https://www.rfc-editor.org/rfc/rfc7162>
5//!
6//! ```text
7//! SELECT (CONDSTORE) → FETCH 1:* (UID FLAGS) [seed shadow]
8//!     → IDLE → SELECT (QRESYNC) → emit deltas → IDLE → ...
9//! ```
10//!
11//! Connection is dedicated. Flip the shared [`AtomicBool`] to wind
12//! down cleanly.
13//!
14//! # Example
15//!
16//! ```rust,no_run
17//! use core::sync::atomic::AtomicBool;
18//! use std::{
19//!     io::{Read, Write},
20//!     net::TcpStream,
21//!     sync::Arc,
22//! };
23//!
24//! use io_imap::{
25//!     codec::fragmentizer::Fragmentizer,
26//!     coroutine::{ImapCoroutine, ImapCoroutineState},
27//!     types::response::Capability,
28//!     watch::{ImapMailboxWatch, ImapMailboxWatchYield},
29//! };
30//!
31//! // Ready stream needed (TCP-connected, TLS-negotiated, IMAP-authenticated)
32//! let mut stream = TcpStream::connect("localhost:143").unwrap();
33//!
34//! let mut fragmentizer = Fragmentizer::new(50 * 1024 * 1024);
35//! let mut buf = [0u8; 4096];
36//!
37//! let capability = [Capability::QResync];
38//! let mailbox = "INBOX".try_into().unwrap();
39//! let shutdown = Arc::new(AtomicBool::new(false));
40//! let mut coroutine = ImapMailboxWatch::new(&capability, mailbox, shutdown.clone()).unwrap();
41//! let mut arg = None;
42//!
43//! loop {
44//!     match coroutine.resume(&mut fragmentizer, arg.take()) {
45//!         ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(bytes)) => {
46//!             stream.write_all(&bytes).unwrap();
47//!         }
48//!         ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead) => {
49//!             let n = stream.read(&mut buf).unwrap();
50//!             arg = Some(&buf[..n]);
51//!         }
52//!         ImapCoroutineState::Yielded(ImapMailboxWatchYield::Event(event)) => {
53//!             println!("{event:?}");
54//!         }
55//!         ImapCoroutineState::Complete(Ok(())) => break,
56//!         ImapCoroutineState::Complete(Err(err)) => panic!("{err}"),
57//!     }
58//! }
59//! ```
60
61use core::{
62    mem,
63    num::{NonZeroU32, NonZeroU64},
64    sync::atomic::{AtomicBool, Ordering},
65};
66
67use alloc::{
68    collections::{BTreeMap, VecDeque},
69    string::String,
70    sync::Arc,
71    vec,
72    vec::Vec,
73};
74
75use imap_codec::{
76    fragmentizer::Fragmentizer,
77    imap_types::{
78        command::SelectParameter,
79        core::{Atom, Vec1},
80        extensions::enable::CapabilityEnable,
81        fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
82        flag::{Flag, FlagFetch},
83        mailbox::Mailbox,
84        response::Capability,
85        sequence::SequenceSet,
86    },
87};
88use log::trace;
89use thiserror::Error;
90
91use crate::{
92    coroutine::*,
93    rfc2177::idle::{ImapIdle, ImapIdleError, ImapIdleOptions, ImapIdleYield},
94    rfc3501::{
95        fetch::{ImapMessageFetch, ImapMessageFetchError, ImapMessageFetchOptions},
96        select::{ImapMailboxSelect, ImapMailboxSelectError, ImapMailboxSelectOptions, SelectData},
97    },
98    rfc5161::enable::{ImapExtensionEnable, ImapExtensionEnableError},
99};
100
101/// `FlagsAdded`/`FlagsRemoved` are pre-diffed against the internal
102/// shadow; each `flags` vector lists only the changed flags.
103#[derive(Clone, Debug)]
104pub enum ImapMailboxWatchEvent {
105    EnvelopeAdded {
106        uid: NonZeroU32,
107        items: Vec<MessageDataItem<'static>>,
108    },
109    FlagsAdded {
110        uid: NonZeroU32,
111        flags: Vec<Flag<'static>>,
112    },
113    FlagsRemoved {
114        uid: NonZeroU32,
115        flags: Vec<Flag<'static>>,
116    },
117    EnvelopeRemoved {
118        uid: NonZeroU32,
119    },
120}
121
122/// Failure causes during the mailbox watch flow.
123#[derive(Debug, Error)]
124pub enum ImapMailboxWatchError {
125    #[error("IMAP server does not advertise QRESYNC")]
126    QresyncUnsupported,
127    #[error("IMAP server did not return UIDVALIDITY in SELECT response")]
128    MissingUidValidity,
129    #[error("IMAP server did not return HIGHESTMODSEQ in SELECT response")]
130    MissingHighestModSeq,
131    #[error("Invalid `1:*` sequence set: {0}")]
132    InvalidSequenceSet(String),
133    #[error("IMAP SELECT error")]
134    Select(#[from] ImapMailboxSelectError),
135    #[error("IMAP FETCH error")]
136    Fetch(#[from] ImapMessageFetchError),
137    #[error("IMAP IDLE error")]
138    Idle(#[from] ImapIdleError),
139    #[error("IMAP ENABLE error")]
140    Enable(#[from] ImapExtensionEnableError),
141}
142
143/// Yield variants from the mailbox watcher.
144#[derive(Debug)]
145pub enum ImapMailboxWatchYield {
146    WantsRead,
147    WantsWrite(Vec<u8>),
148    Event(ImapMailboxWatchEvent),
149}
150
151enum State {
152    EnableQresync(ImapExtensionEnable),
153    SelectInitial(ImapMailboxSelect),
154    FetchBaseline(ImapMessageFetch),
155    BeginIdle,
156    Idle(ImapIdle),
157    SelectQresync(ImapMailboxSelect),
158    EmitDeltas,
159    Terminal,
160}
161
162/// I/O-free IDLE+QRESYNC mailbox watcher.
163pub struct ImapMailboxWatch {
164    state: State,
165    shutdown: Arc<AtomicBool>,
166    idle_done: Arc<AtomicBool>,
167    idle_saw_data: bool,
168    mailbox: Mailbox<'static>,
169    uid_validity: Option<NonZeroU32>,
170    highest_mod_seq: u64,
171    shadow: BTreeMap<NonZeroU32, Vec<Flag<'static>>>,
172    pending: VecDeque<ImapMailboxWatchEvent>,
173}
174
175impl ImapMailboxWatch {
176    /// Errors with `QresyncUnsupported` when `capability` lacks QRESYNC.
177    pub fn new(
178        capability: &[Capability<'static>],
179        mailbox: Mailbox<'static>,
180        shutdown: Arc<AtomicBool>,
181    ) -> Result<Self, ImapMailboxWatchError> {
182        if !capability.contains(&Capability::QResync) {
183            return Err(ImapMailboxWatchError::QresyncUnsupported);
184        }
185
186        // NOTE: RFC 7162 §3.1 — QRESYNC implies CONDSTORE, but pass
187        // both since some servers only echo CONDSTORE in ENABLED.
188        let condstore = CapabilityEnable::CondStore;
189        // NOTE: QRESYNC is not in the typed enum, route via Atom.
190        let qresync = CapabilityEnable::from(
191            Atom::try_from("QRESYNC").expect("`QRESYNC` is a syntactically valid IMAP atom"),
192        );
193        let capabilities =
194            Vec1::try_from(vec![condstore, qresync]).expect("two capabilities is non-empty");
195        let enable = ImapExtensionEnable::new(capabilities);
196
197        Ok(Self {
198            state: State::EnableQresync(enable),
199            shutdown,
200            idle_done: Arc::new(AtomicBool::new(false)),
201            idle_saw_data: false,
202            mailbox,
203            uid_validity: None,
204            highest_mod_seq: 0,
205            shadow: BTreeMap::new(),
206            pending: VecDeque::new(),
207        })
208    }
209
210    fn compute_deltas(&mut self, data: &SelectData) {
211        for uid in &data.vanished_earlier {
212            if self.shadow.remove(uid).is_some() {
213                self.pending
214                    .push_back(ImapMailboxWatchEvent::EnvelopeRemoved { uid: *uid });
215            }
216        }
217
218        for fetch in &data.changed {
219            let items_vec: Vec<MessageDataItem<'static>> =
220                fetch.items.clone().into_inner().into_iter().collect();
221            let (uid_opt, new_flags) = extract_uid_flags(&items_vec);
222            let Some(uid) = uid_opt else {
223                continue;
224            };
225
226            match self.shadow.get(&uid).cloned() {
227                None => {
228                    self.shadow.insert(uid, new_flags);
229                    self.pending
230                        .push_back(ImapMailboxWatchEvent::EnvelopeAdded {
231                            uid,
232                            items: items_vec,
233                        });
234                }
235                Some(old_flags) => {
236                    let added: Vec<Flag<'static>> = new_flags
237                        .iter()
238                        .filter(|f| !old_flags.contains(f))
239                        .cloned()
240                        .collect();
241                    let removed: Vec<Flag<'static>> = old_flags
242                        .iter()
243                        .filter(|f| !new_flags.contains(f))
244                        .cloned()
245                        .collect();
246                    self.shadow.insert(uid, new_flags);
247                    if !added.is_empty() {
248                        self.pending
249                            .push_back(ImapMailboxWatchEvent::FlagsAdded { uid, flags: added });
250                    }
251                    if !removed.is_empty() {
252                        self.pending.push_back(ImapMailboxWatchEvent::FlagsRemoved {
253                            uid,
254                            flags: removed,
255                        });
256                    }
257                }
258            }
259        }
260    }
261}
262
263impl ImapCoroutine for ImapMailboxWatch {
264    type Yield = ImapMailboxWatchYield;
265    type Return = Result<(), ImapMailboxWatchError>;
266
267    fn resume(
268        &mut self,
269        fragmentizer: &mut Fragmentizer,
270        mut arg: Option<&[u8]>,
271    ) -> ImapCoroutineState<Self::Yield, Self::Return> {
272        if self.shutdown.load(Ordering::SeqCst) {
273            self.idle_done.store(true, Ordering::SeqCst);
274        }
275
276        loop {
277            let state = mem::replace(&mut self.state, State::Terminal);
278
279            match state {
280                State::EnableQresync(mut enable) => match enable.resume(fragmentizer, arg.take()) {
281                    ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
282                        self.state = State::EnableQresync(enable);
283                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
284                    }
285                    ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
286                        self.state = State::EnableQresync(enable);
287                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
288                            bytes,
289                        ));
290                    }
291                    ImapCoroutineState::Complete(Ok(enabled)) => {
292                        trace!("watch: ENABLE OK ({enabled:?})");
293                        let parameters = vec![SelectParameter::CondStore];
294                        let select = ImapMailboxSelect::new(
295                            self.mailbox.clone(),
296                            ImapMailboxSelectOptions { parameters },
297                        );
298                        self.state = State::SelectInitial(select);
299                    }
300                    ImapCoroutineState::Complete(Err(err)) => {
301                        return ImapCoroutineState::Complete(Err(err.into()));
302                    }
303                },
304
305                State::SelectInitial(mut select) => match select.resume(fragmentizer, arg.take()) {
306                    ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
307                        self.state = State::SelectInitial(select);
308                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
309                    }
310                    ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
311                        self.state = State::SelectInitial(select);
312                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
313                            bytes,
314                        ));
315                    }
316                    ImapCoroutineState::Complete(Ok(data)) => {
317                        let Some(uid_validity) = data.uid_validity else {
318                            return ImapCoroutineState::Complete(Err(
319                                ImapMailboxWatchError::MissingUidValidity,
320                            ));
321                        };
322                        let Some(highest_mod_seq) = data.highest_mod_seq else {
323                            return ImapCoroutineState::Complete(Err(
324                                ImapMailboxWatchError::MissingHighestModSeq,
325                            ));
326                        };
327
328                        self.uid_validity = Some(uid_validity);
329                        self.highest_mod_seq = highest_mod_seq;
330                        trace!(
331                            "watch: SELECT OK uidvalidity={} highestmodseq={}",
332                            uid_validity.get(),
333                            highest_mod_seq,
334                        );
335
336                        let sequence_set: SequenceSet = match "1:*".try_into() {
337                            Ok(s) => s,
338                            Err(_) => {
339                                return ImapCoroutineState::Complete(Err(
340                                    ImapMailboxWatchError::InvalidSequenceSet("1:*".into()),
341                                ));
342                            }
343                        };
344                        let item_names = MacroOrMessageDataItemNames::MessageDataItemNames(vec![
345                            MessageDataItemName::Uid,
346                            MessageDataItemName::Flags,
347                        ]);
348                        let fetch = ImapMessageFetch::new(
349                            sequence_set,
350                            item_names,
351                            ImapMessageFetchOptions::default(),
352                        );
353                        self.state = State::FetchBaseline(fetch);
354                    }
355                    ImapCoroutineState::Complete(Err(err)) => {
356                        return ImapCoroutineState::Complete(Err(err.into()));
357                    }
358                },
359
360                State::FetchBaseline(mut fetch) => match fetch.resume(fragmentizer, arg.take()) {
361                    ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
362                        self.state = State::FetchBaseline(fetch);
363                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
364                    }
365                    ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
366                        self.state = State::FetchBaseline(fetch);
367                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
368                            bytes,
369                        ));
370                    }
371                    ImapCoroutineState::Complete(Ok(data)) => {
372                        for (_seq, items) in data {
373                            let items_vec = items.into_inner();
374                            if let (Some(uid), flags) = extract_uid_flags(&items_vec) {
375                                self.shadow.insert(uid, flags);
376                            }
377                        }
378                        trace!(
379                            "watch: baseline shadow seeded with {} uids",
380                            self.shadow.len(),
381                        );
382                        self.state = State::BeginIdle;
383                    }
384                    ImapCoroutineState::Complete(Err(err)) => {
385                        return ImapCoroutineState::Complete(Err(err.into()));
386                    }
387                },
388
389                State::BeginIdle => {
390                    if self.shutdown.load(Ordering::SeqCst) {
391                        return ImapCoroutineState::Complete(Ok(()));
392                    }
393
394                    self.idle_done.store(false, Ordering::SeqCst);
395                    self.idle_saw_data = false;
396                    let idle = ImapIdle::new(self.idle_done.clone(), ImapIdleOptions::default());
397                    self.state = State::Idle(idle);
398                }
399
400                State::Idle(mut idle) => match idle.resume(fragmentizer, arg.take()) {
401                    ImapCoroutineState::Yielded(ImapIdleYield::Event(_)) => {
402                        trace!("watch: IDLE saw untagged data");
403                        self.idle_saw_data = true;
404                        self.idle_done.store(true, Ordering::SeqCst);
405                        self.state = State::Idle(idle);
406                    }
407                    ImapCoroutineState::Yielded(ImapIdleYield::WantsRead) => {
408                        self.state = State::Idle(idle);
409                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
410                    }
411                    ImapCoroutineState::Yielded(ImapIdleYield::WantsWrite(bytes)) => {
412                        self.state = State::Idle(idle);
413                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
414                            bytes,
415                        ));
416                    }
417                    ImapCoroutineState::Complete(Ok(())) => {
418                        if self.shutdown.load(Ordering::SeqCst) {
419                            return ImapCoroutineState::Complete(Ok(()));
420                        }
421
422                        if self.idle_saw_data {
423                            // SAFETY: uid_validity is set by SelectInitial
424                            let uid_validity = self.uid_validity.unwrap();
425                            let modseq = NonZeroU64::new(self.highest_mod_seq)
426                                .unwrap_or_else(|| NonZeroU64::new(1).expect("1 is non-zero"));
427                            let parameters = vec![SelectParameter::QResync {
428                                uid_validity,
429                                mod_sequence_value: modseq,
430                                known_uids: None,
431                                seq_match_data: None,
432                            }];
433                            let select = ImapMailboxSelect::new(
434                                self.mailbox.clone(),
435                                ImapMailboxSelectOptions { parameters },
436                            );
437                            self.state = State::SelectQresync(select);
438                        } else {
439                            trace!("watch: IDLE timed out with no data, restarting");
440                            self.state = State::BeginIdle;
441                        }
442                    }
443                    ImapCoroutineState::Complete(Err(err)) => {
444                        return ImapCoroutineState::Complete(Err(err.into()));
445                    }
446                },
447
448                State::SelectQresync(mut select) => match select.resume(fragmentizer, arg.take()) {
449                    ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
450                        self.state = State::SelectQresync(select);
451                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
452                    }
453                    ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
454                        self.state = State::SelectQresync(select);
455                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
456                            bytes,
457                        ));
458                    }
459                    ImapCoroutineState::Complete(Ok(data)) => {
460                        self.compute_deltas(&data);
461                        if let Some(new_modseq) = data.highest_mod_seq {
462                            self.highest_mod_seq = new_modseq;
463                        }
464                        self.state = State::EmitDeltas;
465                    }
466                    ImapCoroutineState::Complete(Err(err)) => {
467                        return ImapCoroutineState::Complete(Err(err.into()));
468                    }
469                },
470
471                State::EmitDeltas => {
472                    if let Some(event) = self.pending.pop_front() {
473                        self.state = State::EmitDeltas;
474                        return ImapCoroutineState::Yielded(ImapMailboxWatchYield::Event(event));
475                    }
476                    self.state = State::BeginIdle;
477                }
478
479                State::Terminal => {
480                    self.state = State::Terminal;
481                    return ImapCoroutineState::Complete(Ok(()));
482                }
483            }
484        }
485    }
486}
487
488/// Extract the UID and flag list from a single FETCH; preserves wire
489/// order, drops non-`Flag` variants of [`FlagFetch`].
490fn extract_uid_flags(
491    items: &[MessageDataItem<'static>],
492) -> (Option<NonZeroU32>, Vec<Flag<'static>>) {
493    let mut uid = None;
494    let mut flags = Vec::new();
495    for item in items {
496        match item {
497            MessageDataItem::Uid(u) => uid = Some(*u),
498            MessageDataItem::Flags(fs) => {
499                flags = fs
500                    .iter()
501                    .filter_map(|f| match f {
502                        FlagFetch::Flag(flag) => Some(flag.clone()),
503                        _ => None,
504                    })
505                    .collect();
506            }
507            _ => {}
508        }
509    }
510    (uid, flags)
511}