daaki_imap/connection/uid_ops.rs
1#![allow(clippy::wildcard_imports)]
2use super::*;
3use crate::types::validated::ParsedUidSet;
4
5impl ImapConnection {
6 // -----------------------------------------------------------------------
7 // Message operations (UID variants)
8 // -----------------------------------------------------------------------
9
10 /// UID FETCH (RFC 3501 Section 6.4.5).
11 ///
12 /// Thin wrapper around [`uid_fetch_streaming`](Self::uid_fetch_streaming)
13 /// that collects all responses into a `Vec`. Logs a warning when the
14 /// buffered data exceeds 10 MB to nudge callers toward the streaming
15 /// variant for large result sets.
16 pub async fn uid_fetch(
17 &self,
18 sequence_set: &SequenceSet,
19 items: &[FetchAttr],
20 timeout: Duration,
21 ) -> Result<Vec<FetchResponse>, Error> {
22 let (tx, mut rx) = tokio::sync::mpsc::channel(256);
23 let fetch_fut = self.uid_fetch_streaming(sequence_set, items, tx, timeout);
24 let collect_fut = async {
25 let mut out = Vec::new();
26 let mut total_bytes: usize = 0;
27 let mut warned = false;
28 while let Some(resp) = rx.recv().await {
29 let resp = resp?;
30 total_bytes += dispatch::estimate_fetch_response_bytes(&resp);
31 if !warned && total_bytes > dispatch::DEFAULT_FETCH_WARN_BYTES {
32 tracing::warn!(
33 estimated_bytes = total_bytes,
34 threshold = dispatch::DEFAULT_FETCH_WARN_BYTES,
35 "uid_fetch buffered >{} MB — consider uid_fetch_streaming",
36 dispatch::DEFAULT_FETCH_WARN_BYTES / (1024 * 1024),
37 );
38 warned = true;
39 }
40 out.push(resp);
41 }
42 Ok::<_, Error>(out)
43 };
44 let (fetch_result, collect_result) = tokio::join!(fetch_fut, collect_fut);
45 fetch_result?;
46 collect_result
47 }
48
49 /// UID FETCH with CHANGEDSINCE modifier (RFC 7162 Section 3.1.4).
50 ///
51 /// Returns only messages whose mod-sequence is greater than `mod_seq`.
52 /// Requires the server to support CONDSTORE (RFC 7162).
53 pub async fn uid_fetch_changed_since(
54 &self,
55 sequence_set: &SequenceSet,
56 items: &[FetchAttr],
57 mod_seq: u64,
58 timeout: Duration,
59 ) -> Result<Vec<FetchResponse>, Error> {
60 self.validate_requested_fetch_items(items)?;
61 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
62 if sequence_set.as_str().contains('$') {
63 self.require_searchres()?;
64 }
65 self.fetch_impl(
66 Command::UidFetch {
67 sequence_set: sequence_set.clone(),
68 items: format_fetch_attrs(items),
69 changed_since: Some(mod_seq),
70 vanished: false,
71 },
72 Some(mod_seq),
73 timeout,
74 )
75 .await
76 }
77
78 /// Shared implementation for FETCH and UID FETCH (RFC 3501 Section 6.4.5).
79 ///
80 /// When `changed_since` is `Some`, validates that the server supports
81 /// CONDSTORE (RFC 7162 Section 3.1.4) before issuing the command.
82 pub(super) async fn fetch_impl(
83 &self,
84 cmd: Command,
85 changed_since: Option<u64>,
86 timeout: Duration,
87 ) -> Result<Vec<FetchResponse>, Error> {
88 self.require_state(&[SessionState::Selected])?;
89 if changed_since.is_some() {
90 self.require_condstore()?;
91 }
92 tokio::time::timeout(
93 timeout,
94 self.submit_regular(cmd, dispatch::FetchConsumer::new()),
95 )
96 .await
97 .map_err(|_| Error::Timeout)?
98 }
99
100 /// UID FETCH with CHANGEDSINCE and VANISHED modifiers (RFC 7162 Section 3.2.6).
101 ///
102 /// Issues `UID FETCH <set> (<items>) (CHANGEDSINCE <mod_seq> VANISHED)`.
103 /// The server returns `VANISHED (EARLIER)` responses for UIDs in the
104 /// requested set that have been expunged since `mod_seq`, plus regular
105 /// FETCH responses for messages whose flags changed.
106 ///
107 /// `VANISHED (EARLIER)` UIDs are defensively filtered to only include
108 /// UIDs within the requested `sequence_set` — non-conformant servers
109 /// may return UIDs outside the set (RFC 7162 Section 3.2.6). When the
110 /// sequence set contains `$` (RFC 5182 search result reference),
111 /// filtering is best-effort (skipped, since `$` cannot be resolved
112 /// client-side).
113 ///
114 /// Requires:
115 /// - QRESYNC must have been `ENABLE`d (RFC 7162 Section 3.2.6).
116 /// - The mailbox must be selected.
117 pub async fn uid_fetch_vanished(
118 &self,
119 sequence_set: &SequenceSet,
120 items: &[FetchAttr],
121 mod_seq: u64,
122 timeout: Duration,
123 ) -> Result<(Vec<FetchResponse>, Vec<UidRange>), Error> {
124 self.require_state(&[SessionState::Selected])?;
125 self.validate_requested_fetch_items(items)?;
126 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
127 if sequence_set.as_str().contains('$') {
128 self.require_searchres()?;
129 }
130 // RFC 7162 Section 3.2.6: VANISHED modifier requires QRESYNC to be ENABLEd.
131 {
132 let snap = self.state_rx.borrow();
133 if !snap.enabled.iter().any(|e| e == "QRESYNC") {
134 return Err(Error::MissingCapability("QRESYNC (not ENABLEd)".into()));
135 }
136 }
137 // Parse the requested set for defensive filtering of VANISHED (EARLIER)
138 // UIDs. Returns None when the set contains `$` (RFC 5182), in which case
139 // filtering is skipped inside the consumer.
140 let parsed_set = ParsedUidSet::new(sequence_set);
141 let cmd = Command::UidFetch {
142 sequence_set: sequence_set.clone(),
143 items: format_fetch_attrs(items),
144 changed_since: Some(mod_seq),
145 vanished: true,
146 };
147 tokio::time::timeout(
148 timeout,
149 self.submit_regular(cmd, dispatch::FetchVanishedConsumer::new(parsed_set)),
150 )
151 .await
152 .map_err(|_| Error::Timeout)?
153 }
154
155 /// UID FETCH streaming (RFC 3501 Section 6.4.5).
156 ///
157 /// Pushes each [`FetchResponse`] through the provided `tx` channel as it
158 /// arrives from the server, rather than buffering the entire result set
159 /// in memory. The channel is closed when the tagged OK is received
160 /// (the consumer drops `tx` in [`finalize`]).
161 ///
162 /// Callers should read from the corresponding `rx` concurrently or drain
163 /// it after this method returns — responses are sent via `try_send`, so
164 /// the channel buffer must be large enough to hold in-flight data.
165 ///
166 /// Prefer this over [`uid_fetch`](Self::uid_fetch) when the result set
167 /// may be large enough to cause memory pressure.
168 pub async fn uid_fetch_streaming(
169 &self,
170 sequence_set: &SequenceSet,
171 items: &[FetchAttr],
172 tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>,
173 timeout: Duration,
174 ) -> Result<(), Error> {
175 self.validate_requested_fetch_items(items)?;
176 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
177 if sequence_set.as_str().contains('$') {
178 self.require_searchres()?;
179 }
180 self.fetch_streaming_impl(
181 Command::UidFetch {
182 sequence_set: sequence_set.clone(),
183 items: format_fetch_attrs(items),
184 changed_since: None,
185 vanished: false,
186 },
187 tx,
188 timeout,
189 )
190 .await
191 }
192
193 /// Shared implementation for streaming FETCH and UID FETCH
194 /// (RFC 3501 Section 6.4.5).
195 ///
196 /// Validates session state and dispatches the command with a
197 /// [`StreamingFetchConsumer`](dispatch::StreamingFetchConsumer) that
198 /// pushes each response through `tx`.
199 pub(super) async fn fetch_streaming_impl(
200 &self,
201 cmd: Command,
202 tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>,
203 timeout: Duration,
204 ) -> Result<(), Error> {
205 self.require_state(&[SessionState::Selected])?;
206 tokio::time::timeout(
207 timeout,
208 self.submit_regular(cmd, dispatch::StreamingFetchConsumer::new(tx)),
209 )
210 .await
211 .map_err(|_| Error::Timeout)?
212 }
213
214 /// UID SEARCH (RFC 3501 Section 6.4.4).
215 ///
216 /// `criteria` is the raw IMAP search criteria string, e.g. `"UNSEEN"` or
217 /// `"FROM \"alice\" SINCE 1-Mar-2026"`.
218 /// Returns a [`SearchResult`] containing matching UIDs and an optional
219 /// MODSEQ value (RFC 7162 Section 3.1.5).
220 /// Handles both SEARCH and ESEARCH responses. For ESEARCH, the ALL uid-set
221 /// is expanded into individual UIDs for backward compatibility.
222 ///
223 /// Accepts anything that implements `AsRef<str>`, including `&str`,
224 /// `String`, and [`SearchCriteria`](crate::types::SearchCriteria).
225 pub async fn uid_search(
226 &self,
227 criteria: impl AsRef<str>,
228 timeout: Duration,
229 ) -> Result<SearchResult, Error> {
230 let criteria = criteria.as_ref();
231 self.search_impl(
232 criteria,
233 Command::UidSearch {
234 criteria: criteria.to_owned(),
235 },
236 "UID SEARCH",
237 timeout,
238 )
239 .await
240 }
241
242 /// Shared implementation for SEARCH and UID SEARCH (RFC 3501 Section 6.4.4).
243 pub(super) async fn search_impl(
244 &self,
245 criteria: &str,
246 cmd: Command,
247 _label: &str,
248 timeout: Duration,
249 ) -> Result<SearchResult, Error> {
250 self.require_state(&[SessionState::Selected])?;
251 // RFC 6855 Section 6: SEARCH criteria are free-form strings, so a
252 // UTF8=ONLY server requires ENABLE UTF8=ACCEPT before SEARCH.
253 self.check_utf8_only_enforced()?;
254 self.validate_search_criteria_capabilities(criteria)?;
255 tokio::time::timeout(
256 timeout,
257 self.submit_regular(cmd, dispatch::SearchConsumer::new()),
258 )
259 .await
260 .map_err(|_| Error::Timeout)??
261 }
262
263 /// UID SEARCH with RETURN options (RFC 4731 Section 3.2).
264 ///
265 /// Returns the full [`EsearchResponse`] with MIN, MAX, COUNT, and ALL fields.
266 /// Requires the server to advertise the `ESEARCH` capability.
267 ///
268 /// RFC 4731 Section 3.2: an extended SEARCH with RETURN options causes the
269 /// server to return a single ESEARCH response instead of a SEARCH response.
270 ///
271 /// Accepts anything that implements `AsRef<str>`, including `&str`,
272 /// `String`, and [`SearchCriteria`](crate::types::SearchCriteria).
273 pub async fn uid_search_esearch(
274 &self,
275 criteria: impl AsRef<str>,
276 return_opts: &[&str],
277 timeout: Duration,
278 ) -> Result<EsearchResponse, Error> {
279 let criteria = criteria.as_ref();
280 self.search_esearch_impl(
281 criteria,
282 Command::UidSearchReturn {
283 criteria: criteria.to_owned(),
284 return_opts: return_opts.iter().map(|s| (*s).to_owned()).collect(),
285 },
286 "UID SEARCH RETURN",
287 timeout,
288 )
289 .await
290 }
291
292 /// Shared implementation for SEARCH RETURN and UID SEARCH RETURN
293 /// (RFC 4731 Section 3.2).
294 ///
295 /// Requires the server to advertise the `ESEARCH` capability, or to be
296 /// running `IMAP4rev2` (RFC 9051 Section 6.4.4).
297 pub(super) async fn search_esearch_impl(
298 &self,
299 criteria: &str,
300 cmd: Command,
301 _label: &str,
302 timeout: Duration,
303 ) -> Result<EsearchResponse, Error> {
304 self.require_state(&[SessionState::Selected])?;
305 // RFC 6855 Section 6: ESEARCH is an extended SEARCH form and uses the
306 // same free-form criteria strings, so it must honor UTF8=ONLY too.
307 self.check_utf8_only_enforced()?;
308 self.validate_search_criteria_capabilities(criteria)?;
309 {
310 let snap = self.state_rx.borrow();
311 // ESEARCH is a base feature in IMAP4rev2 (RFC 9051 Section 6.4.4).
312 if !snap.capabilities.contains(&Capability::Esearch)
313 && !super::auth::is_rev2_from_snapshot(&snap)
314 {
315 return Err(Error::MissingCapability("ESEARCH".into()));
316 }
317 }
318 // RFC 5182 Section 2 extends SEARCH RETURN with the SAVE result option.
319 // Generic ESEARCH callers can request SAVE through return_opts, so
320 // SEARCHRES remains mandatory unless IMAP4rev2 folds it into the base
321 // protocol (RFC 9051 Appendix E).
322 if Self::search_return_requests_save(&cmd) {
323 self.require_searchres()?;
324 }
325 tokio::time::timeout(
326 timeout,
327 self.submit_regular(cmd, dispatch::EsearchConsumer::new()),
328 )
329 .await
330 .map_err(|_| Error::Timeout)??
331 }
332
333 /// UID STORE (RFC 3501 Section 6.4.6).
334 ///
335 /// If `unchanged_since` is `Some`, uses CONDSTORE (RFC 7162 Section 3.1.3)
336 /// to avoid overwriting concurrent flag changes.
337 ///
338 /// `\Recent` and `\*` are silently filtered per RFC 3501 Section 2.3.2
339 /// (they are not valid in STORE flag lists).
340 pub async fn uid_store(
341 &self,
342 sequence_set: &SequenceSet,
343 operation: StoreOperation,
344 flags: &[Flag],
345 unchanged_since: Option<u64>,
346 timeout: Duration,
347 ) -> Result<StoreResult, Error> {
348 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
349 if sequence_set.as_str().contains('$') {
350 self.require_searchres()?;
351 }
352 // RFC 3501 Section 2.3.2 / Section 9: \Recent is server-only and \*
353 // is not valid in STORE flag lists. Filter them out before encoding,
354 // consistent with append() (RFC 3501 Section 6.3.11).
355 let filtered_flags = filter_store_flags(flags);
356 self.store_impl(
357 Command::UidStore {
358 sequence_set: sequence_set.clone(),
359 operation,
360 flags: filtered_flags,
361 unchanged_since,
362 },
363 unchanged_since,
364 timeout,
365 )
366 .await
367 }
368
369 /// Shared implementation for STORE and UID STORE (RFC 3501 Section 6.4.6).
370 ///
371 /// Validates session state, optionally checks CONDSTORE, executes the
372 /// command, and collects the resulting FETCH responses and response code.
373 pub(super) async fn store_impl(
374 &self,
375 cmd: Command,
376 unchanged_since: Option<u64>,
377 timeout: Duration,
378 ) -> Result<StoreResult, Error> {
379 self.require_state(&[SessionState::Selected])?;
380 if unchanged_since.is_some() {
381 self.require_condstore()?;
382 }
383 tokio::time::timeout(
384 timeout,
385 self.submit_regular(cmd, dispatch::StoreConsumer::new()),
386 )
387 .await
388 .map_err(|_| Error::Timeout)?
389 }
390
391 /// UID MOVE (RFC 6851 Section 3, RFC 9051 Appendix E item 2).
392 ///
393 /// Falls back to COPY + Store `\Deleted` + UID EXPUNGE (RFC 6851 Section 4)
394 /// when the server doesn't advertise the MOVE capability and isn't `IMAP4rev2`.
395 /// The fallback requires UIDPLUS (RFC 4315) — plain EXPUNGE is never used
396 /// because it can delete unrelated `\Deleted` messages (data-loss risk).
397 ///
398 /// Returns a [`MoveResult`] containing:
399 /// - `code`: the server's response code, typically `[COPYUID ...]` per
400 /// RFC 6851 Section 4.3 / RFC 4315 Section 3.
401 /// - `expunged`: the EXPUNGE or VANISHED responses (RFC 6851 Section 3).
402 /// When QRESYNC is enabled (RFC 7162 Section 3.2.10), the server sends
403 /// VANISHED instead of EXPUNGE.
404 pub async fn uid_move_messages(
405 &self,
406 sequence_set: &SequenceSet,
407 mailbox: &str,
408 timeout: Duration,
409 ) -> Result<MoveResult, Error> {
410 self.require_state(&[SessionState::Selected])?;
411 self.check_utf8_only_enforced()?;
412 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
413 if sequence_set.as_str().contains('$') {
414 self.require_searchres()?;
415 }
416
417 // Read capabilities from snapshot to decide which path to take.
418 let (has_move, has_uidplus, is_rev2) = {
419 let snap = self.state_rx.borrow();
420 (
421 snap.capabilities.contains(&Capability::Move),
422 snap.capabilities.contains(&Capability::UidPlus),
423 super::auth::is_rev2_from_snapshot(&snap),
424 )
425 };
426
427 // MOVE is a base feature in IMAP4rev2 (RFC 9051 Appendix E item 2).
428 if has_move || is_rev2 {
429 let mbox = MailboxName::new(mailbox)?;
430 self.move_native_impl(
431 Command::UidMove {
432 sequence_set: sequence_set.clone(),
433 mailbox: mbox,
434 },
435 timeout,
436 )
437 .await
438 } else if has_uidplus {
439 // Fallback: COPY + STORE +FLAGS.SILENT \Deleted + UID EXPUNGE
440 // per RFC 6851 Section 3.3. The `.SILENT` modifier is required
441 // to suppress the implicit untagged FETCH responses that a plain
442 // `+FLAGS` would trigger (RFC 3501 Section 6.4.6). Only safe
443 // with UID EXPUNGE which targets specific UIDs.
444 // Capture the COPYUID response code from uid_copy
445 // (RFC 6851 Section 4.3, RFC 4315 Section 3).
446 let copy_result = self.uid_copy(sequence_set, mailbox, timeout).await?;
447 self.uid_store(
448 sequence_set,
449 StoreOperation::AddSilent,
450 &[Flag::Deleted],
451 Option::None,
452 timeout,
453 )
454 .await?;
455 let expunged = self.uid_expunge(sequence_set, timeout).await?;
456 Ok(MoveResult {
457 code: copy_result.code,
458 expunged,
459 })
460 } else {
461 // No MOVE, no UIDPLUS — cannot safely move messages.
462 // Plain EXPUNGE would delete ALL \Deleted messages, not just the
463 // requested set (RFC 9051 Section 6.4.3).
464 Err(Error::MissingCapability(
465 "MOVE or UIDPLUS (plain EXPUNGE is unsafe for move fallback)".into(),
466 ))
467 }
468 }
469
470 /// Shared implementation for the native MOVE path used by both
471 /// `move_messages()` and `uid_move_messages()` (RFC 6851 Section 3).
472 ///
473 /// Executes the MOVE command and collects EXPUNGE/VANISHED responses.
474 pub(super) async fn move_native_impl(
475 &self,
476 cmd: Command,
477 timeout: Duration,
478 ) -> Result<MoveResult, Error> {
479 tokio::time::timeout(
480 timeout,
481 self.submit_regular(cmd, dispatch::MoveConsumer::new()),
482 )
483 .await
484 .map_err(|_| Error::Timeout)?
485 }
486
487 /// UID COPY (RFC 3501 Section 6.4.7, RFC 4315 Section 3).
488 ///
489 /// On success, returns the server's response code, which SHOULD be
490 /// `[COPYUID uid-validity source-uids dest-uids]` per RFC 4315 Section 3.
491 pub async fn uid_copy(
492 &self,
493 sequence_set: &SequenceSet,
494 mailbox: &str,
495 timeout: Duration,
496 ) -> Result<CopyResult, Error> {
497 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
498 if sequence_set.as_str().contains('$') {
499 self.require_searchres()?;
500 }
501 let mbox = MailboxName::new(mailbox)?;
502 self.copy_impl(
503 Command::UidCopy {
504 sequence_set: sequence_set.clone(),
505 mailbox: mbox,
506 },
507 timeout,
508 )
509 .await
510 }
511
512 /// Shared implementation for COPY and UID COPY (RFC 3501 Section 6.4.7,
513 /// RFC 4315 Section 3).
514 ///
515 /// On success, returns a [`CopyResult`] containing the server's response
516 /// code, which SHOULD be `[COPYUID uid-validity source-uids dest-uids]`
517 /// per RFC 4315 Section 3.
518 pub(super) async fn copy_impl(
519 &self,
520 cmd: Command,
521 timeout: Duration,
522 ) -> Result<CopyResult, Error> {
523 self.require_state(&[SessionState::Selected])?;
524 self.check_utf8_only_enforced()?;
525 tokio::time::timeout(
526 timeout,
527 self.submit_regular(cmd, dispatch::CopyConsumer::new()),
528 )
529 .await
530 .map_err(|_| Error::Timeout)?
531 }
532
533 /// UID EXPUNGE (RFC 4315 UIDPLUS / RFC 9051 Section 6.4.9).
534 pub async fn uid_expunge(
535 &self,
536 sequence_set: &SequenceSet,
537 timeout: Duration,
538 ) -> Result<ExpungeResult, Error> {
539 self.require_state(&[SessionState::Selected])?;
540 // RFC 5182 Section 2: `$` references saved search results and requires SEARCHRES.
541 if sequence_set.as_str().contains('$') {
542 self.require_searchres()?;
543 }
544 {
545 let snap = self.state_rx.borrow();
546 // RFC 4315 Section 2: UID EXPUNGE requires the UIDPLUS extension.
547 // RFC 9051 Appendix E item 3: IMAP4rev2 incorporates UIDPLUS into
548 // the base command set, so rev2 servers implicitly support it.
549 if !snap.capabilities.contains(&Capability::UidPlus)
550 && !super::auth::is_rev2_from_snapshot(&snap)
551 {
552 return Err(Error::MissingCapability("UIDPLUS".into()));
553 }
554 }
555 let cmd = Command::UidExpunge {
556 sequence_set: sequence_set.clone(),
557 };
558 tokio::time::timeout(
559 timeout,
560 self.submit_regular(cmd, dispatch::ExpungeConsumer::new()),
561 )
562 .await
563 .map_err(|_| Error::Timeout)?
564 }
565
566 /// EXPUNGE (RFC 3501 Section 6.4.3 / RFC 7162 Section 3.2.10).
567 ///
568 /// Without QRESYNC, returns `ExpungeResult::Expunged` with the sequence
569 /// numbers of removed messages (`* n EXPUNGE`, RFC 3501 Section 7.4.1).
570 ///
571 /// After `ENABLE QRESYNC` (RFC 7162 Section 3.2.10), the server sends
572 /// `VANISHED` responses instead of `EXPUNGE`, and this method returns
573 /// `ExpungeResult::Vanished` with UID ranges.
574 pub async fn expunge(&self, timeout: Duration) -> Result<ExpungeResult, Error> {
575 self.require_state(&[SessionState::Selected])?;
576 tokio::time::timeout(
577 timeout,
578 self.submit_regular(Command::Expunge, dispatch::ExpungeConsumer::new()),
579 )
580 .await
581 .map_err(|_| Error::Timeout)?
582 }
583}