Skip to main content

daaki_imap/connection/
uid_ops.rs

1#![allow(clippy::wildcard_imports)]
2use super::*;
3use crate::types::validated::ParsedUidSet;
4
5impl ImapConnection {
6    // -----------------------------------------------------------------------
7    // Message operations (UID variants)
8    // -----------------------------------------------------------------------
9
10    /// UID FETCH (RFC 3501 Section 6.4.5).
11    ///
12    /// Thin wrapper around [`uid_fetch_streaming`](Self::uid_fetch_streaming)
13    /// that collects all responses into a `Vec`. Logs a warning when the
14    /// buffered data exceeds 10 MB to nudge callers toward the streaming
15    /// variant for large result sets.
16    pub async fn uid_fetch(
17        &self,
18        sequence_set: &SequenceSet,
19        items: &[FetchAttr],
20        timeout: Duration,
21    ) -> Result<Vec<FetchResponse>, Error> {
22        let (tx, mut rx) = tokio::sync::mpsc::channel(256);
23        let fetch_fut = self.uid_fetch_streaming(sequence_set, items, tx, timeout);
24        let collect_fut = async {
25            let mut out = Vec::new();
26            let mut total_bytes: usize = 0;
27            let mut warned = false;
28            while let Some(resp) = rx.recv().await {
29                let resp = resp?;
30                total_bytes += dispatch::estimate_fetch_response_bytes(&resp);
31                if !warned && total_bytes > dispatch::DEFAULT_FETCH_WARN_BYTES {
32                    tracing::warn!(
33                        estimated_bytes = total_bytes,
34                        threshold = dispatch::DEFAULT_FETCH_WARN_BYTES,
35                        "uid_fetch buffered >{} MB — consider uid_fetch_streaming",
36                        dispatch::DEFAULT_FETCH_WARN_BYTES / (1024 * 1024),
37                    );
38                    warned = true;
39                }
40                out.push(resp);
41            }
42            Ok::<_, Error>(out)
43        };
44        let (fetch_result, collect_result) = tokio::join!(fetch_fut, collect_fut);
45        fetch_result?;
46        collect_result
47    }
48
49    /// UID FETCH with CHANGEDSINCE modifier (RFC 7162 Section 3.1.4).
50    ///
51    /// Returns only messages whose mod-sequence is greater than `mod_seq`.
52    /// Requires the server to support CONDSTORE (RFC 7162).
53    pub async fn uid_fetch_changed_since(
54        &self,
55        sequence_set: &SequenceSet,
56        items: &[FetchAttr],
57        mod_seq: u64,
58        timeout: Duration,
59    ) -> Result<Vec<FetchResponse>, Error> {
60        self.validate_requested_fetch_items(items)?;
61        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
62        if sequence_set.as_str().contains('$') {
63            self.require_searchres()?;
64        }
65        self.fetch_impl(
66            Command::UidFetch {
67                sequence_set: sequence_set.clone(),
68                items: format_fetch_attrs(items),
69                changed_since: Some(mod_seq),
70                vanished: false,
71            },
72            Some(mod_seq),
73            timeout,
74        )
75        .await
76    }
77
78    /// Shared implementation for FETCH and UID FETCH (RFC 3501 Section 6.4.5).
79    ///
80    /// When `changed_since` is `Some`, validates that the server supports
81    /// CONDSTORE (RFC 7162 Section 3.1.4) before issuing the command.
82    pub(super) async fn fetch_impl(
83        &self,
84        cmd: Command,
85        changed_since: Option<u64>,
86        timeout: Duration,
87    ) -> Result<Vec<FetchResponse>, Error> {
88        self.require_state(&[SessionState::Selected])?;
89        if changed_since.is_some() {
90            self.require_condstore()?;
91        }
92        tokio::time::timeout(
93            timeout,
94            self.submit_regular(cmd, dispatch::FetchConsumer::new()),
95        )
96        .await
97        .map_err(|_| Error::Timeout)?
98    }
99
100    /// UID FETCH with CHANGEDSINCE and VANISHED modifiers (RFC 7162 Section 3.2.6).
101    ///
102    /// Issues `UID FETCH <set> (<items>) (CHANGEDSINCE <mod_seq> VANISHED)`.
103    /// The server returns `VANISHED (EARLIER)` responses for UIDs in the
104    /// requested set that have been expunged since `mod_seq`, plus regular
105    /// FETCH responses for messages whose flags changed.
106    ///
107    /// `VANISHED (EARLIER)` UIDs are defensively filtered to only include
108    /// UIDs within the requested `sequence_set` — non-conformant servers
109    /// may return UIDs outside the set (RFC 7162 Section 3.2.6). When the
110    /// sequence set contains `$` (RFC 5182 search result reference),
111    /// filtering is best-effort (skipped, since `$` cannot be resolved
112    /// client-side).
113    ///
114    /// Requires:
115    /// - QRESYNC must have been `ENABLE`d (RFC 7162 Section 3.2.6).
116    /// - The mailbox must be selected.
117    pub async fn uid_fetch_vanished(
118        &self,
119        sequence_set: &SequenceSet,
120        items: &[FetchAttr],
121        mod_seq: u64,
122        timeout: Duration,
123    ) -> Result<(Vec<FetchResponse>, Vec<UidRange>), Error> {
124        self.require_state(&[SessionState::Selected])?;
125        self.validate_requested_fetch_items(items)?;
126        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
127        if sequence_set.as_str().contains('$') {
128            self.require_searchres()?;
129        }
130        // RFC 7162 Section 3.2.6: VANISHED modifier requires QRESYNC to be ENABLEd.
131        {
132            let snap = self.state_rx.borrow();
133            if !snap.enabled.iter().any(|e| e == "QRESYNC") {
134                return Err(Error::MissingCapability("QRESYNC (not ENABLEd)".into()));
135            }
136        }
137        // Parse the requested set for defensive filtering of VANISHED (EARLIER)
138        // UIDs. Returns None when the set contains `$` (RFC 5182), in which case
139        // filtering is skipped inside the consumer.
140        let parsed_set = ParsedUidSet::new(sequence_set);
141        let cmd = Command::UidFetch {
142            sequence_set: sequence_set.clone(),
143            items: format_fetch_attrs(items),
144            changed_since: Some(mod_seq),
145            vanished: true,
146        };
147        tokio::time::timeout(
148            timeout,
149            self.submit_regular(cmd, dispatch::FetchVanishedConsumer::new(parsed_set)),
150        )
151        .await
152        .map_err(|_| Error::Timeout)?
153    }
154
155    /// UID FETCH streaming (RFC 3501 Section 6.4.5).
156    ///
157    /// Pushes each [`FetchResponse`] through the provided `tx` channel as it
158    /// arrives from the server, rather than buffering the entire result set
159    /// in memory. The channel is closed when the tagged OK is received
160    /// (the consumer drops `tx` in [`finalize`]).
161    ///
162    /// Callers should read from the corresponding `rx` concurrently or drain
163    /// it after this method returns — responses are sent via `try_send`, so
164    /// the channel buffer must be large enough to hold in-flight data.
165    ///
166    /// Prefer this over [`uid_fetch`](Self::uid_fetch) when the result set
167    /// may be large enough to cause memory pressure.
168    pub async fn uid_fetch_streaming(
169        &self,
170        sequence_set: &SequenceSet,
171        items: &[FetchAttr],
172        tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>,
173        timeout: Duration,
174    ) -> Result<(), Error> {
175        self.validate_requested_fetch_items(items)?;
176        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
177        if sequence_set.as_str().contains('$') {
178            self.require_searchres()?;
179        }
180        self.fetch_streaming_impl(
181            Command::UidFetch {
182                sequence_set: sequence_set.clone(),
183                items: format_fetch_attrs(items),
184                changed_since: None,
185                vanished: false,
186            },
187            tx,
188            timeout,
189        )
190        .await
191    }
192
193    /// Shared implementation for streaming FETCH and UID FETCH
194    /// (RFC 3501 Section 6.4.5).
195    ///
196    /// Validates session state and dispatches the command with a
197    /// [`StreamingFetchConsumer`](dispatch::StreamingFetchConsumer) that
198    /// pushes each response through `tx`.
199    pub(super) async fn fetch_streaming_impl(
200        &self,
201        cmd: Command,
202        tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>,
203        timeout: Duration,
204    ) -> Result<(), Error> {
205        self.require_state(&[SessionState::Selected])?;
206        tokio::time::timeout(
207            timeout,
208            self.submit_regular(cmd, dispatch::StreamingFetchConsumer::new(tx)),
209        )
210        .await
211        .map_err(|_| Error::Timeout)?
212    }
213
214    /// UID SEARCH (RFC 3501 Section 6.4.4).
215    ///
216    /// `criteria` is the raw IMAP search criteria string, e.g. `"UNSEEN"` or
217    /// `"FROM \"alice\" SINCE 1-Mar-2026"`.
218    /// Returns a [`SearchResult`] containing matching UIDs and an optional
219    /// MODSEQ value (RFC 7162 Section 3.1.5).
220    /// Handles both SEARCH and ESEARCH responses. For ESEARCH, the ALL uid-set
221    /// is expanded into individual UIDs for backward compatibility.
222    ///
223    /// Accepts anything that implements `AsRef<str>`, including `&str`,
224    /// `String`, and [`SearchCriteria`](crate::types::SearchCriteria).
225    pub async fn uid_search(
226        &self,
227        criteria: impl AsRef<str>,
228        timeout: Duration,
229    ) -> Result<SearchResult, Error> {
230        let criteria = criteria.as_ref();
231        self.search_impl(
232            criteria,
233            Command::UidSearch {
234                criteria: criteria.to_owned(),
235            },
236            "UID SEARCH",
237            timeout,
238        )
239        .await
240    }
241
242    /// Shared implementation for SEARCH and UID SEARCH (RFC 3501 Section 6.4.4).
243    pub(super) async fn search_impl(
244        &self,
245        criteria: &str,
246        cmd: Command,
247        _label: &str,
248        timeout: Duration,
249    ) -> Result<SearchResult, Error> {
250        self.require_state(&[SessionState::Selected])?;
251        // RFC 6855 Section 6: SEARCH criteria are free-form strings, so a
252        // UTF8=ONLY server requires ENABLE UTF8=ACCEPT before SEARCH.
253        self.check_utf8_only_enforced()?;
254        self.validate_search_criteria_capabilities(criteria)?;
255        tokio::time::timeout(
256            timeout,
257            self.submit_regular(cmd, dispatch::SearchConsumer::new()),
258        )
259        .await
260        .map_err(|_| Error::Timeout)??
261    }
262
263    /// UID SEARCH with RETURN options (RFC 4731 Section 3.2).
264    ///
265    /// Returns the full [`EsearchResponse`] with MIN, MAX, COUNT, and ALL fields.
266    /// Requires the server to advertise the `ESEARCH` capability.
267    ///
268    /// RFC 4731 Section 3.2: an extended SEARCH with RETURN options causes the
269    /// server to return a single ESEARCH response instead of a SEARCH response.
270    ///
271    /// Accepts anything that implements `AsRef<str>`, including `&str`,
272    /// `String`, and [`SearchCriteria`](crate::types::SearchCriteria).
273    pub async fn uid_search_esearch(
274        &self,
275        criteria: impl AsRef<str>,
276        return_opts: &[&str],
277        timeout: Duration,
278    ) -> Result<EsearchResponse, Error> {
279        let criteria = criteria.as_ref();
280        self.search_esearch_impl(
281            criteria,
282            Command::UidSearchReturn {
283                criteria: criteria.to_owned(),
284                return_opts: return_opts.iter().map(|s| (*s).to_owned()).collect(),
285            },
286            "UID SEARCH RETURN",
287            timeout,
288        )
289        .await
290    }
291
292    /// Shared implementation for SEARCH RETURN and UID SEARCH RETURN
293    /// (RFC 4731 Section 3.2).
294    ///
295    /// Requires the server to advertise the `ESEARCH` capability, or to be
296    /// running `IMAP4rev2` (RFC 9051 Section 6.4.4).
297    pub(super) async fn search_esearch_impl(
298        &self,
299        criteria: &str,
300        cmd: Command,
301        _label: &str,
302        timeout: Duration,
303    ) -> Result<EsearchResponse, Error> {
304        self.require_state(&[SessionState::Selected])?;
305        // RFC 6855 Section 6: ESEARCH is an extended SEARCH form and uses the
306        // same free-form criteria strings, so it must honor UTF8=ONLY too.
307        self.check_utf8_only_enforced()?;
308        self.validate_search_criteria_capabilities(criteria)?;
309        {
310            let snap = self.state_rx.borrow();
311            // ESEARCH is a base feature in IMAP4rev2 (RFC 9051 Section 6.4.4).
312            if !snap.capabilities.contains(&Capability::Esearch)
313                && !super::auth::is_rev2_from_snapshot(&snap)
314            {
315                return Err(Error::MissingCapability("ESEARCH".into()));
316            }
317        }
318        // RFC 5182 Section 2 extends SEARCH RETURN with the SAVE result option.
319        // Generic ESEARCH callers can request SAVE through return_opts, so
320        // SEARCHRES remains mandatory unless IMAP4rev2 folds it into the base
321        // protocol (RFC 9051 Appendix E).
322        if Self::search_return_requests_save(&cmd) {
323            self.require_searchres()?;
324        }
325        tokio::time::timeout(
326            timeout,
327            self.submit_regular(cmd, dispatch::EsearchConsumer::new()),
328        )
329        .await
330        .map_err(|_| Error::Timeout)??
331    }
332
333    /// UID STORE (RFC 3501 Section 6.4.6).
334    ///
335    /// If `unchanged_since` is `Some`, uses CONDSTORE (RFC 7162 Section 3.1.3)
336    /// to avoid overwriting concurrent flag changes.
337    ///
338    /// `\Recent` and `\*` are silently filtered per RFC 3501 Section 2.3.2
339    /// (they are not valid in STORE flag lists).
340    pub async fn uid_store(
341        &self,
342        sequence_set: &SequenceSet,
343        operation: StoreOperation,
344        flags: &[Flag],
345        unchanged_since: Option<u64>,
346        timeout: Duration,
347    ) -> Result<StoreResult, Error> {
348        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
349        if sequence_set.as_str().contains('$') {
350            self.require_searchres()?;
351        }
352        // RFC 3501 Section 2.3.2 / Section 9: \Recent is server-only and \*
353        // is not valid in STORE flag lists. Filter them out before encoding,
354        // consistent with append() (RFC 3501 Section 6.3.11).
355        let filtered_flags = filter_store_flags(flags);
356        self.store_impl(
357            Command::UidStore {
358                sequence_set: sequence_set.clone(),
359                operation,
360                flags: filtered_flags,
361                unchanged_since,
362            },
363            unchanged_since,
364            timeout,
365        )
366        .await
367    }
368
369    /// Shared implementation for STORE and UID STORE (RFC 3501 Section 6.4.6).
370    ///
371    /// Validates session state, optionally checks CONDSTORE, executes the
372    /// command, and collects the resulting FETCH responses and response code.
373    pub(super) async fn store_impl(
374        &self,
375        cmd: Command,
376        unchanged_since: Option<u64>,
377        timeout: Duration,
378    ) -> Result<StoreResult, Error> {
379        self.require_state(&[SessionState::Selected])?;
380        if unchanged_since.is_some() {
381            self.require_condstore()?;
382        }
383        tokio::time::timeout(
384            timeout,
385            self.submit_regular(cmd, dispatch::StoreConsumer::new()),
386        )
387        .await
388        .map_err(|_| Error::Timeout)?
389    }
390
391    /// UID MOVE (RFC 6851 Section 3, RFC 9051 Appendix E item 2).
392    ///
393    /// Falls back to COPY + Store `\Deleted` + UID EXPUNGE (RFC 6851 Section 4)
394    /// when the server doesn't advertise the MOVE capability and isn't `IMAP4rev2`.
395    /// The fallback requires UIDPLUS (RFC 4315) — plain EXPUNGE is never used
396    /// because it can delete unrelated `\Deleted` messages (data-loss risk).
397    ///
398    /// Returns a [`MoveResult`] containing:
399    /// - `code`: the server's response code, typically `[COPYUID ...]` per
400    ///   RFC 6851 Section 4.3 / RFC 4315 Section 3.
401    /// - `expunged`: the EXPUNGE or VANISHED responses (RFC 6851 Section 3).
402    ///   When QRESYNC is enabled (RFC 7162 Section 3.2.10), the server sends
403    ///   VANISHED instead of EXPUNGE.
404    pub async fn uid_move_messages(
405        &self,
406        sequence_set: &SequenceSet,
407        mailbox: &str,
408        timeout: Duration,
409    ) -> Result<MoveResult, Error> {
410        self.require_state(&[SessionState::Selected])?;
411        self.check_utf8_only_enforced()?;
412        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
413        if sequence_set.as_str().contains('$') {
414            self.require_searchres()?;
415        }
416
417        // Read capabilities from snapshot to decide which path to take.
418        let (has_move, has_uidplus, is_rev2) = {
419            let snap = self.state_rx.borrow();
420            (
421                snap.capabilities.contains(&Capability::Move),
422                snap.capabilities.contains(&Capability::UidPlus),
423                super::auth::is_rev2_from_snapshot(&snap),
424            )
425        };
426
427        // MOVE is a base feature in IMAP4rev2 (RFC 9051 Appendix E item 2).
428        if has_move || is_rev2 {
429            let mbox = MailboxName::new(mailbox)?;
430            self.move_native_impl(
431                Command::UidMove {
432                    sequence_set: sequence_set.clone(),
433                    mailbox: mbox,
434                },
435                timeout,
436            )
437            .await
438        } else if has_uidplus {
439            // Fallback: COPY + STORE +FLAGS.SILENT \Deleted + UID EXPUNGE
440            // per RFC 6851 Section 3.3.  The `.SILENT` modifier is required
441            // to suppress the implicit untagged FETCH responses that a plain
442            // `+FLAGS` would trigger (RFC 3501 Section 6.4.6).  Only safe
443            // with UID EXPUNGE which targets specific UIDs.
444            // Capture the COPYUID response code from uid_copy
445            // (RFC 6851 Section 4.3, RFC 4315 Section 3).
446            let copy_result = self.uid_copy(sequence_set, mailbox, timeout).await?;
447            self.uid_store(
448                sequence_set,
449                StoreOperation::AddSilent,
450                &[Flag::Deleted],
451                Option::None,
452                timeout,
453            )
454            .await?;
455            let expunged = self.uid_expunge(sequence_set, timeout).await?;
456            Ok(MoveResult {
457                code: copy_result.code,
458                expunged,
459            })
460        } else {
461            // No MOVE, no UIDPLUS — cannot safely move messages.
462            // Plain EXPUNGE would delete ALL \Deleted messages, not just the
463            // requested set (RFC 9051 Section 6.4.3).
464            Err(Error::MissingCapability(
465                "MOVE or UIDPLUS (plain EXPUNGE is unsafe for move fallback)".into(),
466            ))
467        }
468    }
469
470    /// Shared implementation for the native MOVE path used by both
471    /// `move_messages()` and `uid_move_messages()` (RFC 6851 Section 3).
472    ///
473    /// Executes the MOVE command and collects EXPUNGE/VANISHED responses.
474    pub(super) async fn move_native_impl(
475        &self,
476        cmd: Command,
477        timeout: Duration,
478    ) -> Result<MoveResult, Error> {
479        tokio::time::timeout(
480            timeout,
481            self.submit_regular(cmd, dispatch::MoveConsumer::new()),
482        )
483        .await
484        .map_err(|_| Error::Timeout)?
485    }
486
487    /// UID COPY (RFC 3501 Section 6.4.7, RFC 4315 Section 3).
488    ///
489    /// On success, returns the server's response code, which SHOULD be
490    /// `[COPYUID uid-validity source-uids dest-uids]` per RFC 4315 Section 3.
491    pub async fn uid_copy(
492        &self,
493        sequence_set: &SequenceSet,
494        mailbox: &str,
495        timeout: Duration,
496    ) -> Result<CopyResult, Error> {
497        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
498        if sequence_set.as_str().contains('$') {
499            self.require_searchres()?;
500        }
501        let mbox = MailboxName::new(mailbox)?;
502        self.copy_impl(
503            Command::UidCopy {
504                sequence_set: sequence_set.clone(),
505                mailbox: mbox,
506            },
507            timeout,
508        )
509        .await
510    }
511
512    /// Shared implementation for COPY and UID COPY (RFC 3501 Section 6.4.7,
513    /// RFC 4315 Section 3).
514    ///
515    /// On success, returns a [`CopyResult`] containing the server's response
516    /// code, which SHOULD be `[COPYUID uid-validity source-uids dest-uids]`
517    /// per RFC 4315 Section 3.
518    pub(super) async fn copy_impl(
519        &self,
520        cmd: Command,
521        timeout: Duration,
522    ) -> Result<CopyResult, Error> {
523        self.require_state(&[SessionState::Selected])?;
524        self.check_utf8_only_enforced()?;
525        tokio::time::timeout(
526            timeout,
527            self.submit_regular(cmd, dispatch::CopyConsumer::new()),
528        )
529        .await
530        .map_err(|_| Error::Timeout)?
531    }
532
533    /// UID EXPUNGE (RFC 4315 UIDPLUS / RFC 9051 Section 6.4.9).
534    pub async fn uid_expunge(
535        &self,
536        sequence_set: &SequenceSet,
537        timeout: Duration,
538    ) -> Result<ExpungeResult, Error> {
539        self.require_state(&[SessionState::Selected])?;
540        // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
541        if sequence_set.as_str().contains('$') {
542            self.require_searchres()?;
543        }
544        {
545            let snap = self.state_rx.borrow();
546            // RFC 4315 Section 2: UID EXPUNGE requires the UIDPLUS extension.
547            // RFC 9051 Appendix E item 3: IMAP4rev2 incorporates UIDPLUS into
548            // the base command set, so rev2 servers implicitly support it.
549            if !snap.capabilities.contains(&Capability::UidPlus)
550                && !super::auth::is_rev2_from_snapshot(&snap)
551            {
552                return Err(Error::MissingCapability("UIDPLUS".into()));
553            }
554        }
555        let cmd = Command::UidExpunge {
556            sequence_set: sequence_set.clone(),
557        };
558        tokio::time::timeout(
559            timeout,
560            self.submit_regular(cmd, dispatch::ExpungeConsumer::new()),
561        )
562        .await
563        .map_err(|_| Error::Timeout)?
564    }
565
566    /// EXPUNGE (RFC 3501 Section 6.4.3 / RFC 7162 Section 3.2.10).
567    ///
568    /// Without QRESYNC, returns `ExpungeResult::Expunged` with the sequence
569    /// numbers of removed messages (`* n EXPUNGE`, RFC 3501 Section 7.4.1).
570    ///
571    /// After `ENABLE QRESYNC` (RFC 7162 Section 3.2.10), the server sends
572    /// `VANISHED` responses instead of `EXPUNGE`, and this method returns
573    /// `ExpungeResult::Vanished` with UID ranges.
574    pub async fn expunge(&self, timeout: Duration) -> Result<ExpungeResult, Error> {
575        self.require_state(&[SessionState::Selected])?;
576        tokio::time::timeout(
577            timeout,
578            self.submit_regular(Command::Expunge, dispatch::ExpungeConsumer::new()),
579        )
580        .await
581        .map_err(|_| Error::Timeout)?
582    }
583}