Skip to main content

io_imap/
client.rs

1//! Blocking IMAP client wrapping a `Read + Write` stream with a
2//! per-connection [`Fragmentizer`] and one method per coroutine.
3//!
4//! Session state is intentionally not cached: callers retain what
5//! they need (capability list, selected mailbox, ...).
6
7use core::{
8    any::Any,
9    fmt,
10    num::{NonZeroU32, NonZeroU64},
11    sync::atomic::{AtomicBool, Ordering},
12    time::Duration,
13};
14
15#[cfg(any(
16    feature = "rustls-aws",
17    feature = "rustls-ring",
18    feature = "native-tls"
19))]
20use alloc::string::ToString;
21use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, string::String, vec::Vec};
22#[cfg(any(
23    feature = "rustls-aws",
24    feature = "rustls-ring",
25    feature = "native-tls"
26))]
27use secrecy::ExposeSecret;
28
29use std::{
30    io::{self, Read, Write},
31    sync::{
32        Arc,
33        mpsc::{self, Receiver, RecvTimeoutError, TryRecvError},
34    },
35    thread::{self, JoinHandle},
36};
37
38use imap_codec::{
39    fragmentizer::Fragmentizer,
40    imap_types::{
41        command::SelectParameter,
42        core::{IString, NString, Vec1},
43        datetime::DateTime,
44        extensions::{
45            binary::LiteralOrLiteral8,
46            enable::CapabilityEnable,
47            sort::SortCriterion,
48            thread::{Thread, ThreadingAlgorithm},
49        },
50        fetch::{MacroOrMessageDataItemNames, MessageDataItem},
51        flag::{Flag, StoreType},
52        mailbox::{ListMailbox, Mailbox},
53        response::Capability,
54        search::SearchKey,
55        sequence::SequenceSet,
56        status::{StatusDataItem, StatusDataItemName},
57    },
58};
59#[cfg(feature = "scram")]
60#[cfg(any(
61    feature = "rustls-aws",
62    feature = "rustls-ring",
63    feature = "native-tls"
64))]
65use pimalaya_stream::sasl::SaslScramSha256;
66#[cfg(any(
67    feature = "rustls-aws",
68    feature = "rustls-ring",
69    feature = "native-tls"
70))]
71use pimalaya_stream::{
72    sasl::{Sasl, SaslAnonymous, SaslLogin, SaslOauthbearer, SaslPlain, SaslXoauth2},
73    std::stream::StreamStd,
74    tls::Tls,
75};
76use thiserror::Error;
77#[cfg(any(
78    feature = "rustls-aws",
79    feature = "rustls-ring",
80    feature = "native-tls"
81))]
82use url::Url;
83
84#[cfg(feature = "scram")]
85use crate::rfc7677::auth_scram_sha_256::*;
86use crate::{
87    coroutine::*,
88    rfc2971::id::*,
89    rfc3501::{
90        append::*, capability::*, check::*, close::*, copy::*, create::*, delete::*, examine::*,
91        expunge::*, fetch::*, greeting::*, list::*, login::*, logout::*, lsub::*, noop::*,
92        rename::*, search::*, select::*, starttls::*, status::*, store::*, subscribe::*,
93        unsubscribe::*,
94    },
95    rfc3691::unselect::*,
96    rfc5161::enable::*,
97    rfc5256::{sort::*, thread::*},
98    rfc6851::r#move::*,
99    rfc7628::auth_oauthbearer::*,
100    sasl::{auth_anonymous::*, auth_login::*, auth_plain::*, auth_xoauth2::*},
101    watch::*,
102};
103
/// Failure causes returned by [`ImapClientStd`].
///
/// Most variants transparently wrap the error type of one coroutine
/// (via `#[from]`), which is what lets [`ImapClientStd::run`] convert
/// any coroutine error with `.into()`. The remaining variants cover
/// transport failures and argument validation done by the client
/// itself.
#[derive(Debug, Error)]
pub enum ImapClientStdError {
    // ---- Session lifecycle / authentication -------------------------------
    #[error(transparent)]
    Greeting(#[from] ImapGreetingGetError),
    #[error(transparent)]
    Login(#[from] ImapLoginError),
    #[error(transparent)]
    AuthLogin(#[from] ImapAuthLoginError),
    #[error(transparent)]
    AuthPlain(#[from] ImapAuthPlainError),
    #[error(transparent)]
    AuthAnonymous(#[from] ImapAuthAnonymousError),
    #[error(transparent)]
    AuthOAuthBearer(#[from] ImapAuthOauthbearerError),
    #[error(transparent)]
    AuthXOAuth2(#[from] ImapAuthXoauth2Error),
    #[cfg(feature = "scram")]
    #[error(transparent)]
    AuthScramSha256(#[from] ImapAuthScramSha256Error),
    // Only exists when a TLS backend is enabled but `scram` is not:
    // returned by `connect` when SCRAM-SHA-256 is requested anyway.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[cfg(not(feature = "scram"))]
    #[error("SCRAM-SHA-256 SASL mechanism requires the `scram` cargo feature")]
    ScramSha256NotEnabled,
    #[error(transparent)]
    Logout(#[from] ImapLogoutError),

    // ---- State / introspection --------------------------------------------
    #[error(transparent)]
    Capability(#[from] ImapCapabilityGetError),
    #[error(transparent)]
    Noop(#[from] ImapNoopError),
    #[error(transparent)]
    ServerId(#[from] ImapServerIdError),
    #[error(transparent)]
    ExtensionEnable(#[from] ImapExtensionEnableError),

    // ---- Mailbox structure and selection ----------------------------------
    #[error(transparent)]
    MailboxList(#[from] ImapMailboxListError),
    #[error(transparent)]
    MailboxLsub(#[from] ImapMailboxLsubError),
    #[error(transparent)]
    MailboxStatus(#[from] ImapMailboxStatusError),
    #[error(transparent)]
    MailboxCreate(#[from] ImapMailboxCreateError),
    #[error(transparent)]
    MailboxDelete(#[from] ImapMailboxDeleteError),
    #[error(transparent)]
    MailboxRename(#[from] ImapMailboxRenameError),
    #[error(transparent)]
    MailboxSubscribe(#[from] ImapMailboxSubscribeError),
    #[error(transparent)]
    MailboxUnsubscribe(#[from] ImapMailboxUnsubscribeError),
    #[error(transparent)]
    MailboxSelect(#[from] ImapMailboxSelectError),
    #[error(transparent)]
    MailboxExamine(#[from] ImapMailboxExamineError),
    #[error(transparent)]
    MailboxWatch(#[from] ImapMailboxWatchError),
    #[error(transparent)]
    MailboxClose(#[from] ImapMailboxCloseError),
    #[error(transparent)]
    MailboxUnselect(#[from] ImapMailboxUnselectError),
    #[error(transparent)]
    MailboxCheck(#[from] ImapMailboxCheckError),
    #[error(transparent)]
    MailboxExpunge(#[from] ImapMailboxExpungeError),
    #[error(transparent)]
    MailboxSort(#[from] ImapMailboxSortError),

    // ---- Messages ---------------------------------------------------------
    #[error(transparent)]
    MessageFetch(#[from] ImapMessageFetchError),
    #[error(transparent)]
    MessageSearch(#[from] ImapMessageSearchError),
    #[error(transparent)]
    MessageStore(#[from] ImapMessageStoreError),
    #[error(transparent)]
    MessageCopy(#[from] ImapMessageCopyError),
    #[error(transparent)]
    MessageMove(#[from] ImapMessageMoveError),
    #[error(transparent)]
    MessageAppend(#[from] ImapMessageAppendError),
    #[error(transparent)]
    MessageThread(#[from] ImapMessageThreadError),

    // ---- Transport / connection setup -------------------------------------
    // Any read or write failure on the underlying stream.
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error(transparent)]
    StartTls(#[from] ImapStartTlsError),
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error(transparent)]
    Tls(#[from] anyhow::Error),
    // Payload: the offending URL, rendered as a string.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("IMAP URL `{0}` has no host")]
    UrlMissingHost(String),
    // Payload: (URL, scheme), both rendered as strings.
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("IMAP URL `{0}` has unsupported scheme `{1}` (expected `imap` or `imaps`)")]
    UrlUnsupportedScheme(String, String),
    #[cfg(any(
        feature = "rustls-aws",
        feature = "rustls-ring",
        feature = "native-tls"
    ))]
    #[error("STARTTLS requested on an `imaps://` URL: TLS is already active")]
    StartTlsOverTls,
    // Raised by `login` when building the LOGIN coroutine fails validation.
    #[error("Invalid IMAP LOGIN credentials")]
    InvalidLoginCredentials(#[from] imap_codec::imap_types::error::ValidationError),

    // ---- Argument validation done by `select_qresync` ---------------------
    #[error("IMAP server does not advertise QRESYNC capability")]
    QresyncNotSupported,
    #[error("Invalid mod-sequence value: 0")]
    InvalidModSeq,
}
232
/// Size in bytes (16 KiB) of the scratch buffer filled by each
/// `stream.read` call in the coroutine driver loops of this file.
const READ_BUFFER_SIZE: usize = 16 * 1024;
/// Limit (100 MiB) passed to `Fragmentizer::new` for every
/// fragmentizer created in this file.
const FRAGMENTIZER_MAX_MESSAGE_SIZE: u32 = 100 * 1024 * 1024;
235
/// Default ALPN identifier for IMAP TLS (RFC 7595).
///
/// Returns a freshly allocated list holding the single protocol
/// name `"imap"`.
pub fn default_alpn() -> Vec<String> {
    let mut protocols = Vec::with_capacity(1);
    protocols.push(String::from("imap"));
    protocols
}
240
/// Blocking IMAP client: a boxed stream plus the [`Fragmentizer`]
/// that is handed to every coroutine driven over it.
///
/// `auto_id` is consumed by every auth_*/login: `None` skips,
/// `Some(empty)` sends `ID NIL`, `Some(params)` sends `ID (k v ...)`.
/// Required by a few providers (mail.qq.com, fastmail).
///
/// NOTE(review): in this file only `connect` moves `auto_id` out of
/// the client (into the options of the auth method it runs); callers
/// invoking the auth methods directly pass it through `opts`.
pub struct ImapClientStd {
    /// Underlying transport, type-erased behind [`ImapStream`].
    pub stream: Box<dyn ImapStream>,
    /// Passed by mutable reference to each coroutine `resume` call.
    pub fragmentizer: Fragmentizer,
    /// Optional `ID` parameters, see the type-level documentation.
    pub auto_id: Option<Vec<(IString<'static>, NString<'static>)>>,
}
249
250impl ImapClientStd {
251    /// Caller is responsible for opening the connection (TCP, TLS,
252    /// STARTTLS).
253    pub fn new<S: Read + Write + Send + 'static>(stream: S) -> Self {
254        Self {
255            stream: Box::new(stream),
256            fragmentizer: Fragmentizer::new(FRAGMENTIZER_MAX_MESSAGE_SIZE),
257            auto_id: None,
258        }
259    }
260
261    /// Useful after a STARTTLS upgrade or on reconnection.
262    pub fn set_stream<S: Read + Write + Send + 'static>(&mut self, stream: S) {
263        self.stream = Box::new(stream);
264    }
265
266    /// Drives a standard-shape coroutine to completion. Richer yields (IDLE
267    /// events, watch deltas) need their own per-method loops.
268    pub fn run<C, T, E>(&mut self, mut coroutine: C) -> Result<T, ImapClientStdError>
269    where
270        C: ImapCoroutine<Yield = ImapYield, Return = Result<T, E>>,
271        ImapClientStdError: From<E>,
272    {
273        let mut buf = [0u8; READ_BUFFER_SIZE];
274        let mut arg: Option<&[u8]> = None;
275
276        loop {
277            match coroutine.resume(&mut self.fragmentizer, arg.take()) {
278                ImapCoroutineState::Complete(Ok(out)) => return Ok(out),
279                ImapCoroutineState::Complete(Err(err)) => return Err(err.into()),
280                ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
281                    let n = self.stream.read(&mut buf)?;
282                    arg = Some(&buf[..n]);
283                }
284                ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
285                    self.stream.write_all(&bytes)?;
286                    arg = None;
287                }
288            }
289        }
290    }
291
292    // ---- Session lifecycle ------------------------------------------------
293
294    /// Consumes the greeting and returns the advertised capabilities
295    /// (forcing a CAPABILITY round-trip if the greeting carried none).
296    pub fn greeting(&mut self) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
297        Ok(self
298            .run(ImapGreetingGet::new(ImapGreetingGetOptions {
299                ensure_capabilities: true,
300            }))?
301            .capability)
302    }
303
304    /// `LOGIN`. Channel must be TLS-protected. Consumes `auto_id`.
305    pub fn login(
306        &mut self,
307        user: impl AsRef<str>,
308        password: impl AsRef<str>,
309        opts: ImapLoginOptions,
310    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
311        self.run(ImapLogin::new(user, password, opts)?)
312    }
313
314    /// `STARTTLS`. Caller still has to upgrade the socket and refresh
315    /// capabilities. Returns any bytes pre-read past the tagged
316    /// response (a non-empty return is a STARTTLS-injection signal:
317    /// refuse the upgrade).
318    pub fn starttls(&mut self) -> Result<Vec<u8>, ImapClientStdError> {
319        self.run(ImapStartTls::new())
320    }
321
322    /// SASL `AUTHENTICATE ANONYMOUS`. Consumes `auto_id`.
323    pub fn auth_anonymous(
324        &mut self,
325        message: Option<impl AsRef<str>>,
326        opts: ImapAuthAnonymousOptions,
327    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
328        self.run(ImapAuthAnonymous::new(message, opts))
329    }
330
331    /// SASL `AUTHENTICATE LOGIN` (legacy). Prefer auth_plain or
332    /// auth_scram_sha256 when supported. Consumes `auto_id`.
333    pub fn auth_login(
334        &mut self,
335        user: impl AsRef<str>,
336        password: impl AsRef<str>,
337        opts: ImapAuthLoginOptions,
338    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
339        self.run(ImapAuthLogin::new(user, password, opts))
340    }
341
342    /// SASL `AUTHENTICATE PLAIN`. Consumes `auto_id`.
343    pub fn auth_plain(
344        &mut self,
345        authzid: Option<impl AsRef<str>>,
346        authcid: impl AsRef<str>,
347        password: impl AsRef<str>,
348        opts: ImapAuthPlainOptions,
349    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
350        self.run(ImapAuthPlain::new(authzid, authcid, password, opts))
351    }
352
353    /// SASL `AUTHENTICATE OAUTHBEARER`. Channel must be
354    /// TLS-protected. Consumes `auto_id`.
355    pub fn auth_oauthbearer(
356        &mut self,
357        user: impl AsRef<str>,
358        host: impl AsRef<str>,
359        port: u16,
360        token: impl AsRef<str>,
361        opts: ImapAuthOauthbearerOptions,
362    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
363        self.run(ImapAuthOauthbearer::new(user, host, port, token, opts))
364    }
365
366    /// SASL `AUTHENTICATE XOAUTH2` (Google's pre-standard mechanism).
367    /// Prefer auth_oauthbearer when supported. Consumes `auto_id`.
368    pub fn auth_xoauth2(
369        &mut self,
370        user: impl AsRef<str>,
371        token: impl AsRef<str>,
372        opts: ImapAuthXoauth2Options,
373    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
374        self.run(ImapAuthXoauth2::new(user, token, opts))
375    }
376
377    /// SASL `AUTHENTICATE SCRAM-SHA-256`. Consumes `auto_id`.
378    #[cfg(feature = "scram")]
379    pub fn auth_scram_sha256(
380        &mut self,
381        user: impl AsRef<str>,
382        password: impl AsRef<str>,
383        opts: ImapAuthScramSha256Options,
384    ) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
385        self.run(ImapAuthScramSha256::new(user, password, opts))
386    }
387
388    pub fn logout(&mut self) -> Result<(), ImapClientStdError> {
389        self.run(ImapLogout::new())
390    }
391
392    // ---- State / introspection -------------------------------------------
393
394    pub fn capability(&mut self) -> Result<Vec<Capability<'static>>, ImapClientStdError> {
395        self.run(ImapCapabilityGet::new())
396    }
397
398    pub fn noop(&mut self) -> Result<(), ImapClientStdError> {
399        self.run(ImapNoop::new())
400    }
401
402    /// `ID`. `None` sends `ID NIL`.
403    pub fn id(
404        &mut self,
405        parameters: Option<Vec<(IString<'static>, NString<'static>)>>,
406    ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ImapClientStdError> {
407        self.run(ImapServerId::new(ImapServerIdOptions { parameters }))
408    }
409
410    pub fn enable(
411        &mut self,
412        capabilities: Vec1<CapabilityEnable<'static>>,
413    ) -> Result<Option<Vec<CapabilityEnable<'static>>>, ImapClientStdError> {
414        self.run(ImapExtensionEnable::new(capabilities))
415    }
416
417    // ---- Mailbox structure -----------------------------------------------
418
419    pub fn list(
420        &mut self,
421        reference: Mailbox<'static>,
422        pattern: ListMailbox<'static>,
423    ) -> Result<ImapMailboxListing, ImapClientStdError> {
424        self.run(ImapMailboxList::new(reference, pattern))
425    }
426
427    pub fn lsub(
428        &mut self,
429        reference: Mailbox<'static>,
430        pattern: ListMailbox<'static>,
431    ) -> Result<ImapMailboxListing, ImapClientStdError> {
432        self.run(ImapMailboxLsub::new(reference, pattern))
433    }
434
435    pub fn status(
436        &mut self,
437        mailbox: Mailbox<'static>,
438        item_names: impl Into<Cow<'static, [StatusDataItemName]>>,
439    ) -> Result<Vec<StatusDataItem>, ImapClientStdError> {
440        self.run(ImapMailboxStatus::new(mailbox, item_names))
441    }
442
443    pub fn create(&mut self, mailbox: Mailbox<'static>) -> Result<(), ImapClientStdError> {
444        self.run(ImapMailboxCreate::new(mailbox))
445    }
446
447    pub fn delete(&mut self, mailbox: Mailbox<'static>) -> Result<(), ImapClientStdError> {
448        self.run(ImapMailboxDelete::new(mailbox))
449    }
450
451    pub fn rename(
452        &mut self,
453        from: Mailbox<'static>,
454        to: Mailbox<'static>,
455    ) -> Result<(), ImapClientStdError> {
456        self.run(ImapMailboxRename::new(from, to))
457    }
458
459    pub fn subscribe(&mut self, mailbox: Mailbox<'static>) -> Result<(), ImapClientStdError> {
460        self.run(ImapMailboxSubscribe::new(mailbox))
461    }
462
463    pub fn unsubscribe(&mut self, mailbox: Mailbox<'static>) -> Result<(), ImapClientStdError> {
464        self.run(ImapMailboxUnsubscribe::new(mailbox))
465    }
466
467    // ---- Mailbox selection -----------------------------------------------
468
469    pub fn select(&mut self, mailbox: Mailbox<'static>) -> Result<SelectData, ImapClientStdError> {
470        self.run(ImapMailboxSelect::new(
471            mailbox,
472            ImapMailboxSelectOptions::default(),
473        ))
474    }
475
476    pub fn examine(&mut self, mailbox: Mailbox<'static>) -> Result<SelectData, ImapClientStdError> {
477        self.run(ImapMailboxExamine::new(
478            mailbox,
479            ImapMailboxExamineOptions::default(),
480        ))
481    }
482
483    /// `SELECT <mailbox> (QRESYNC ...)`. Errors with
484    /// `QresyncNotSupported` when `capability` lacks QRESYNC, with
485    /// `InvalidModSeq` when `highest_mod_seq` is 0.
486    pub fn select_qresync(
487        &mut self,
488        mailbox: Mailbox<'static>,
489        uid_validity: NonZeroU32,
490        highest_mod_seq: u64,
491        capability: &[Capability<'static>],
492    ) -> Result<SelectData, ImapClientStdError> {
493        if !capability.contains(&Capability::QResync) {
494            return Err(ImapClientStdError::QresyncNotSupported);
495        }
496
497        let Some(highest_mod_seq) = NonZeroU64::new(highest_mod_seq) else {
498            return Err(ImapClientStdError::InvalidModSeq);
499        };
500
501        let parameters = vec![SelectParameter::QResync {
502            uid_validity,
503            mod_sequence_value: highest_mod_seq,
504            known_uids: None,
505            seq_match_data: None,
506        }];
507
508        self.run(ImapMailboxSelect::new(
509            mailbox,
510            ImapMailboxSelectOptions { parameters },
511        ))
512    }
513
514    pub fn close(&mut self) -> Result<(), ImapClientStdError> {
515        self.run(ImapMailboxClose::new())
516    }
517
518    pub fn unselect(&mut self) -> Result<(), ImapClientStdError> {
519        self.run(ImapMailboxUnselect::new())
520    }
521
522    pub fn check(&mut self) -> Result<(), ImapClientStdError> {
523        self.run(ImapMailboxCheck::new())
524    }
525
526    /// `EXPUNGE`; returns the expunged sequence numbers.
527    pub fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ImapClientStdError> {
528        self.run(ImapMailboxExpunge::new())
529    }
530
531    /// Consumes the client into a background watcher. Drop the
532    /// returned stream (or call its `close`) to wind down. Errors when
533    /// `capability` lacks QRESYNC.
534    pub fn watch_mailbox(
535        self,
536        mailbox: Mailbox<'static>,
537        capability: &[Capability<'static>],
538    ) -> Result<ImapMailboxWatchStream, ImapClientStdError> {
539        let shutdown = Arc::new(AtomicBool::new(false));
540        let mut watcher = ImapMailboxWatch::new(capability, mailbox, shutdown.clone())?;
541        let mut fragmentizer = self.fragmentizer;
542        let mut stream = self.stream;
543
544        let (tx, rx) = mpsc::sync_channel::<Result<ImapMailboxWatchEvent, ImapClientStdError>>(256);
545        let shutdown_handle = shutdown.clone();
546        let handle = thread::spawn(move || {
547            let mut buf = [0u8; READ_BUFFER_SIZE];
548            let mut arg: Option<Vec<u8>> = None;
549
550            loop {
551                match watcher.resume(&mut fragmentizer, arg.as_deref()) {
552                    ImapCoroutineState::Yielded(ImapMailboxWatchYield::Event(e)) => {
553                        arg = None;
554                        if tx.send(Ok(e)).is_err() {
555                            return;
556                        }
557                    }
558                    ImapCoroutineState::Complete(Ok(())) => return,
559                    ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead) => {
560                        match stream.read(&mut buf) {
561                            Ok(0) => {
562                                let eof = io::ErrorKind::UnexpectedEof;
563                                let err = "IMAP server closed the connection during watch";
564                                tx.send(Err(io::Error::new(eof, err).into())).ok();
565                                return;
566                            }
567                            Ok(n) => arg = Some(buf[..n].to_vec()),
568                            Err(err) => {
569                                tx.send(Err(err.into())).ok();
570                                return;
571                            }
572                        }
573                    }
574                    ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(bytes)) => {
575                        if let Err(err) = stream.write_all(&bytes) {
576                            tx.send(Err(err.into())).ok();
577                            return;
578                        }
579                        arg = None;
580                    }
581                    ImapCoroutineState::Complete(Err(err)) => {
582                        tx.send(Err(err.into())).ok();
583                        return;
584                    }
585                }
586            }
587        });
588
589        Ok(ImapMailboxWatchStream {
590            rx,
591            handle: Some(handle),
592            shutdown: shutdown_handle,
593        })
594    }
595
596    // ---- Messages --------------------------------------------------------
597
598    pub fn fetch(
599        &mut self,
600        sequence_set: SequenceSet,
601        items: MacroOrMessageDataItemNames<'static>,
602        uid: bool,
603    ) -> Result<BTreeMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ImapClientStdError> {
604        self.run(ImapMessageFetch::new(
605            sequence_set,
606            items,
607            ImapMessageFetchOptions {
608                uid,
609                ..Default::default()
610            },
611        ))
612    }
613
614    pub fn search(
615        &mut self,
616        criteria: Vec1<SearchKey<'static>>,
617        uid: bool,
618    ) -> Result<Vec<NonZeroU32>, ImapClientStdError> {
619        self.run(ImapMessageSearch::new(
620            criteria,
621            ImapMessageSearchOptions { uid },
622        ))
623    }
624
625    /// `STORE` (echo variant); returns the server-reported FETCH echoes.
626    pub fn store(
627        &mut self,
628        sequence_set: SequenceSet,
629        kind: StoreType,
630        flags: Vec<Flag<'static>>,
631        uid: bool,
632    ) -> Result<BTreeMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ImapClientStdError> {
633        self.run(ImapMessageStore::new(
634            sequence_set,
635            kind,
636            flags,
637            ImapMessageStoreOptions { uid },
638        ))
639    }
640
641    pub fn copy(
642        &mut self,
643        sequence_set: SequenceSet,
644        mailbox: Mailbox<'static>,
645        uid: bool,
646    ) -> Result<ImapCopyUid, ImapClientStdError> {
647        self.run(ImapMessageCopy::new(
648            sequence_set,
649            mailbox,
650            ImapMessageCopyOptions { uid },
651        ))
652    }
653
654    pub fn r#move(
655        &mut self,
656        sequence_set: SequenceSet,
657        mailbox: Mailbox<'static>,
658        uid: bool,
659    ) -> Result<ImapCopyUid, ImapClientStdError> {
660        self.run(ImapMessageMove::new(
661            sequence_set,
662            mailbox,
663            ImapMessageMoveOptions { uid },
664        ))
665    }
666
667    /// `APPEND`; returns the optional EXISTS count and APPENDUID pair.
668    pub fn append(
669        &mut self,
670        mailbox: Mailbox<'static>,
671        flags: Vec<Flag<'static>>,
672        date: Option<DateTime>,
673        message: LiteralOrLiteral8<'static>,
674    ) -> Result<ImapAppendOutput, ImapClientStdError> {
675        self.run(ImapMessageAppend::new(
676            mailbox,
677            message,
678            ImapMessageAppendOptions { flags, date },
679        ))
680    }
681
682    // ---- RFC 5256: SORT / THREAD ------------------------------------------
683
684    pub fn sort(
685        &mut self,
686        sort_criteria: Vec1<SortCriterion>,
687        search_criteria: Vec1<SearchKey<'static>>,
688        uid: bool,
689    ) -> Result<Vec<NonZeroU32>, ImapClientStdError> {
690        self.run(ImapMailboxSort::new(
691            sort_criteria,
692            search_criteria,
693            ImapMailboxSortOptions { uid },
694        ))
695    }
696
697    pub fn thread(
698        &mut self,
699        algorithm: ThreadingAlgorithm<'static>,
700        search_criteria: Vec1<SearchKey<'static>>,
701        uid: bool,
702    ) -> Result<Vec<Thread>, ImapClientStdError> {
703        self.run(ImapMessageThread::new(
704            algorithm,
705            search_criteria,
706            ImapMessageThreadOptions { uid },
707        ))
708    }
709}
710
711impl fmt::Debug for ImapClientStd {
712    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
713        f.debug_struct("ImapClientStd")
714            .field("fragmentizer", &self.fragmentizer)
715            .finish_non_exhaustive()
716    }
717}
718
/// Background-worker watch stream; drop or [`Self::close`] to wind down.
///
/// Created by `ImapClientStd::watch_mailbox`. Events can be pulled
/// through the `Iterator` impl (blocking) or through `try_recv` /
/// `recv_timeout`.
pub struct ImapMailboxWatchStream {
    /// Receiving end of the channel the worker thread sends into.
    rx: Receiver<Result<ImapMailboxWatchEvent, ImapClientStdError>>,
    /// Worker thread handle; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
    /// Flag set to `true` on `close`/drop; the same `Arc` was handed
    /// to the watch coroutine at construction.
    shutdown: Arc<AtomicBool>,
}
725
726impl ImapMailboxWatchStream {
727    /// Non-blocking probe for the next event.
728    pub fn try_recv(
729        &self,
730    ) -> Result<Result<ImapMailboxWatchEvent, ImapClientStdError>, TryRecvError> {
731        self.rx.try_recv()
732    }
733
734    /// Waits up to `timeout` for the next event.
735    pub fn recv_timeout(
736        &self,
737        timeout: Duration,
738    ) -> Result<Result<ImapMailboxWatchEvent, ImapClientStdError>, RecvTimeoutError> {
739        self.rx.recv_timeout(timeout)
740    }
741
742    /// Signals shutdown and joins the worker.
743    pub fn close(mut self) -> Result<(), ImapClientStdError> {
744        self.shutdown.store(true, Ordering::SeqCst);
745        if let Some(handle) = self.handle.take() {
746            handle
747                .join()
748                .map_err(|_| io::Error::other("IMAP watch worker panicked"))?;
749        }
750        Ok(())
751    }
752}
753
754impl Iterator for ImapMailboxWatchStream {
755    type Item = Result<ImapMailboxWatchEvent, ImapClientStdError>;
756
757    fn next(&mut self) -> Option<Self::Item> {
758        self.rx.recv().ok()
759    }
760}
761
762impl Drop for ImapMailboxWatchStream {
763    fn drop(&mut self) {
764        self.shutdown.store(true, Ordering::SeqCst);
765
766        if let Some(handle) = self.handle.take() {
767            handle.join().ok();
768        }
769    }
770}
771
#[cfg(any(
    feature = "rustls-aws",
    feature = "rustls-ring",
    feature = "native-tls"
))]
impl ImapClientStd {
    /// End-to-end connect: TCP/TLS, optional STARTTLS, greeting,
    /// optional SASL. `imap://` is plain TCP (143), `imaps://` is
    /// implicit TLS (993). `starttls = true` is only valid on
    /// `imap://`. Pass `None` as `sasl` to skip auth.
    ///
    /// Returns the client together with the most recent capability
    /// list: the one obtained after authentication when `sasl` is
    /// given, otherwise the one from the greeting (or from the
    /// CAPABILITY round-trip that follows a STARTTLS upgrade).
    ///
    /// `auto_id` is stored on the client and moved into the options
    /// of whichever authentication method runs.
    ///
    /// # Errors
    ///
    /// The URL and the `starttls` flag are validated before any
    /// connection is opened (`UrlMissingHost`, `UrlUnsupportedScheme`,
    /// `StartTlsOverTls`). Connection, TLS, greeting and
    /// authentication failures are reported afterwards through their
    /// respective variants.
    pub fn connect(
        url: &Url,
        tls: &Tls,
        starttls: bool,
        sasl: Option<impl Into<Sasl>>,
        auto_id: Option<Vec<(IString<'static>, NString<'static>)>>,
    ) -> Result<(Self, Vec<Capability<'static>>), ImapClientStdError> {
        let Some(host) = url.host_str() else {
            return Err(ImapClientStdError::UrlMissingHost(url.to_string()));
        };

        let scheme = url.scheme();
        let is_tls = if scheme.eq_ignore_ascii_case("imap") {
            false
        } else if scheme.eq_ignore_ascii_case("imaps") {
            true
        } else {
            let url = url.to_string();
            let scheme = scheme.to_string();
            return Err(ImapClientStdError::UrlUnsupportedScheme(url, scheme));
        };

        // Reject the contradictory configuration before touching the
        // network instead of opening a TLS session just to drop it.
        if starttls && is_tls {
            return Err(ImapClientStdError::StartTlsOverTls);
        }

        let stream = if is_tls {
            StreamStd::connect_tls(host, url.port().unwrap_or(993), tls)?
        } else {
            StreamStd::connect_tcp(host, url.port().unwrap_or(143))?
        };

        // NOTE: STARTTLS needs the concrete StreamStd for upgrade_tls,
        // so run it inline before boxing the stream.
        let stream = if starttls {
            let mut stream = stream;
            let mut fragmentizer = Fragmentizer::new(FRAGMENTIZER_MAX_MESSAGE_SIZE);
            run_starttls(&mut stream, &mut fragmentizer)?;
            stream.upgrade_tls(tls)?
        } else {
            stream
        };

        // NOTE: 5s per-read timeout lets watch_mailbox poll shutdown
        // during a silent IDLE; long FETCHes are unaffected.
        stream.set_read_timeout(Some(Duration::from_secs(5)))?;

        let mut client = Self::new(stream);
        client.auto_id = auto_id;

        // After a STARTTLS upgrade there is no greeting left to read,
        // so capabilities are fetched with an explicit round-trip.
        let mut capability = if starttls {
            client.capability()?
        } else {
            client.greeting()?
        };

        if let Some(sasl) = sasl.map(Into::into) {
            // Initial responses are only used when the server
            // advertises SASL-IR.
            let ir = capability.contains(&Capability::SaslIr);

            capability = match sasl {
                Sasl::Anonymous(SaslAnonymous { message }) => {
                    let opts = ImapAuthAnonymousOptions {
                        initial_request: ir,
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.auth_anonymous(message, opts)?
                }
                // NOTE: `Sasl::Login` is served by the plain `LOGIN`
                // command (`login`), not by `AUTHENTICATE LOGIN`.
                Sasl::Login(SaslLogin { username, password }) => {
                    let opts = ImapLoginOptions {
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.login(username, password.expose_secret(), opts)?
                }
                Sasl::Plain(SaslPlain {
                    authzid,
                    authcid,
                    passwd,
                }) => {
                    let opts = ImapAuthPlainOptions {
                        initial_request: ir,
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.auth_plain(authzid, authcid, passwd.expose_secret(), opts)?
                }
                Sasl::Oauthbearer(SaslOauthbearer {
                    username,
                    host,
                    port,
                    token,
                }) => {
                    let opts = ImapAuthOauthbearerOptions {
                        initial_request: ir,
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.auth_oauthbearer(username, host, port, token.expose_secret(), opts)?
                }
                Sasl::Xoauth2(SaslXoauth2 { username, token }) => {
                    let opts = ImapAuthXoauth2Options {
                        initial_request: ir,
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.auth_xoauth2(username, token.expose_secret(), opts)?
                }
                #[cfg(feature = "scram")]
                Sasl::ScramSha256(SaslScramSha256 { username, password }) => {
                    let opts = ImapAuthScramSha256Options {
                        initial_request: ir,
                        ensure_capabilities: true,
                        auto_id: client.auto_id.take(),
                    };

                    client.auth_scram_sha256(username, password.expose_secret(), opts)?
                }
                #[cfg(not(feature = "scram"))]
                Sasl::ScramSha256(_) => {
                    return Err(ImapClientStdError::ScramSha256NotEnabled);
                }
            };
        }

        Ok((client, capability))
    }
}
914
/// Inline STARTTLS driver: keeps the concrete `StreamStd` so that
/// `upgrade_tls` can swap the underlying socket afterwards.
///
/// Drives [`ImapStartTls`] over the still-plaintext stream. On
/// success the caller is expected to perform the TLS upgrade
/// immediately.
///
/// # Errors
///
/// Besides coroutine and I/O failures, returns an
/// `io::ErrorKind::InvalidData` error when the coroutine reports
/// bytes read past the tagged STARTTLS response. Plaintext data
/// arriving before the handshake is the STARTTLS-injection signal
/// described on [`ImapClientStd::starttls`], so the upgrade is
/// refused instead of silently dropping those bytes.
#[cfg(any(
    feature = "rustls-aws",
    feature = "rustls-ring",
    feature = "native-tls"
))]
fn run_starttls(
    stream: &mut StreamStd,
    fragmentizer: &mut Fragmentizer,
) -> Result<(), ImapClientStdError> {
    let mut coroutine = ImapStartTls::new();
    let mut buf = [0u8; READ_BUFFER_SIZE];
    let mut arg: Option<&[u8]> = None;

    loop {
        match coroutine.resume(fragmentizer, arg.take()) {
            ImapCoroutineState::Complete(Ok(leftover)) => {
                if leftover.is_empty() {
                    return Ok(());
                }

                let kind = io::ErrorKind::InvalidData;
                let err = "IMAP server sent plaintext data after the STARTTLS response";
                return Err(io::Error::new(kind, err).into());
            }
            ImapCoroutineState::Complete(Err(err)) => return Err(err.into()),
            ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
                let n = stream.read(&mut buf)?;
                arg = Some(&buf[..n]);
            }
            ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
                stream.write_all(&bytes)?;
            }
        }
    }
}
944
/// Object-safe alias for the streams [`ImapClientStd`] can own.
///
/// Auto-implemented for `Read + Write + Send + 'static`. `as_any_mut` supports
/// downcasting back to the concrete stream when needed (e.g. for
/// `set_read_timeout`).
pub trait ImapStream: Read + Write + Send + Any {
    /// Exposes `self` as `&mut dyn Any` so callers can
    /// `downcast_mut` to the concrete stream type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

/// Blanket implementation: every eligible stream is an `ImapStream`.
impl<T> ImapStream for T
where
    T: Read + Write + Send + Any,
{
    fn as_any_mut(&mut self) -> &mut dyn Any {
        let erased: &mut dyn Any = self;
        erased
    }
}