Skip to main content

imap_client/client/
tokio.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    io,
5    num::NonZeroU32,
6    path::PathBuf,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11
12use imap_next::{
13    client::{Error as NextError, Event, Options as ClientOptions},
14    imap_types::{
15        command::{Command, CommandBody, FetchModifier},
16        core::{AString, IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1},
17        error::ValidationError,
18        extensions::{
19            binary::{Literal8, LiteralOrLiteral8},
20            enable::CapabilityEnable,
21            sort::{SortCriterion, SortKey},
22            thread::{Thread, ThreadingAlgorithm},
23        },
24        fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
25        flag::{Flag, FlagNameAttribute, StoreType},
26        mailbox::{ListMailbox, Mailbox},
27        response::{Code, Status, Tagged},
28        search::SearchKey,
29        secret::Secret,
30        sequence::SequenceSet,
31        IntoStatic,
32    },
33};
34use rip_starttls::imap::tokio::RipStarttls;
35use rustls_platform_verifier::ConfigVerifierExt;
36use thiserror::Error;
37use tokio::{
38    fs,
39    io::{AsyncRead, AsyncWrite, ReadBuf},
40    net::TcpStream,
41    time::timeout,
42};
43use tokio_rustls::{
44    rustls::{
45        pki_types::{pem::PemObject, CertificateDer, ServerName},
46        ClientConfig,
47    },
48    TlsConnector,
49};
50use tracing::{debug, trace};
51
52use crate::{
53    client::verifier::FingerprintVerifier,
54    stream::{self, Stream},
55    tasks::{
56        tasks::{
57            append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask},
58            appenduid::AppendUidTask,
59            authenticate::AuthenticateTask,
60            capability::CapabilityTask,
61            check::CheckTask,
62            copy::CopyTask,
63            create::CreateTask,
64            delete::DeleteTask,
65            enable::EnableTask,
66            expunge::ExpungeTask,
67            fetch::{FetchFirstTask, FetchTask},
68            id::IdTask,
69            list::ListTask,
70            login::LoginTask,
71            noop::NoOpTask,
72            r#move::MoveTask,
73            search::SearchTask,
74            select::{SelectDataUnvalidated, SelectTask},
75            sort::SortTask,
76            store::StoreTask,
77            thread::ThreadTask,
78            TaskError,
79        },
80        SchedulerError, SchedulerEvent, Task,
81    },
82};
83
84static MAX_SEQUENCE_SIZE: u8 = u8::MAX; // 255
85
86#[derive(Debug, Error)]
87pub enum ClientError {
88    #[error("cannot upgrade client to TLS: client is already in TLS state")]
89    ClientAlreadyTlsError,
90    #[error("cannot do STARTTLS prefix")]
91    DoStarttlsPrefixError(#[from] io::Error),
92    #[error("stream error")]
93    Stream(#[from] stream::Error<SchedulerError>),
94    #[error("validation error")]
95    Validation(#[from] ValidationError),
96    #[error("cannot connect to TCP stream")]
97    ConnectToTcpStreamError(#[source] io::Error),
98    #[error("cannot connect to TLS stream")]
99    ConnectToTlsStreamError(#[source] io::Error),
100    #[error("cannot create TLS client config")]
101    TlsConfig(#[source] tokio_rustls::rustls::Error),
102
103    #[error("cannot receive greeting from server")]
104    ReceiveGreeting(#[source] stream::Error<SchedulerError>),
105    #[error("cannot resolve IMAP task")]
106    ResolveTask(#[from] TaskError),
107    #[error("{0} should contain at least one valid certificate")]
108    EmptyCert(PathBuf),
109    #[error("{1} does not contain a valid certificate")]
110    InvalidCert(tokio_rustls::rustls::pki_types::pem::Error, PathBuf),
111}
112
113pub enum MaybeTlsStream {
114    Plain(TcpStream),
115    Rustls(tokio_rustls::client::TlsStream<TcpStream>),
116}
117
118impl AsyncRead for MaybeTlsStream {
119    fn poll_read(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122        buf: &mut ReadBuf<'_>,
123    ) -> Poll<io::Result<()>> {
124        match self.get_mut() {
125            Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
126            Self::Rustls(s) => Pin::new(s).poll_read(cx, buf),
127        }
128    }
129}
130
131impl AsyncWrite for MaybeTlsStream {
132    fn poll_write(
133        self: Pin<&mut Self>,
134        cx: &mut Context<'_>,
135        buf: &[u8],
136    ) -> Poll<io::Result<usize>> {
137        match self.get_mut() {
138            Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
139            Self::Rustls(s) => Pin::new(s).poll_write(cx, buf),
140        }
141    }
142
143    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
144        match self.get_mut() {
145            Self::Plain(s) => Pin::new(s).poll_flush(cx),
146            Self::Rustls(s) => Pin::new(s).poll_flush(cx),
147        }
148    }
149
150    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
151        match self.get_mut() {
152            Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
153            Self::Rustls(s) => Pin::new(s).poll_shutdown(cx),
154        }
155    }
156}
157
158pub struct Client {
159    host: String,
160    pub state: super::Client,
161    pub stream: Stream<MaybeTlsStream>,
162}
163
164/// Client constructors.
165///
166/// This section defines 3 public constructors for [`Client`]:
167/// `insecure`, `tls` and `starttls`.
168impl Client {
169    /// Creates an insecure client, using TCP.
170    ///
171    /// This constructor creates a client based on an raw
172    /// [`TcpStream`], receives greeting then saves server
173    /// capabilities.
174    pub async fn insecure(host: impl ToString, port: u16) -> Result<Self, ClientError> {
175        let mut client = Self::tcp(host, port, false).await?;
176
177        if !client.receive_greeting().await? {
178            client.refresh_capabilities().await?;
179        }
180
181        Ok(client)
182    }
183
184    /// Creates a secure client, using SSL/TLS or STARTTLS.
185    ///
186    /// This constructor creates an client based on a secure
187    /// [`TcpStream`] wrapped into a [`TlsStream`], receives greeting
188    /// then saves server capabilities.
189    pub async fn rustls(
190        host: impl ToString,
191        port: u16,
192        starttls: bool,
193        cert: Option<PathBuf>,
194    ) -> Result<Self, ClientError> {
195        let tcp = Self::tcp(host, port, starttls).await?;
196        Self::upgrade_rustls(tcp, starttls, cert).await
197    }
198
199    /// Creates an insecure client based on a raw [`TcpStream`].
200    ///
201    /// This function is internally used by public constructors
202    /// `insecure`, `tls` and `starttls`.
203    async fn tcp(
204        host: impl ToString,
205        port: u16,
206        discard_greeting: bool,
207    ) -> Result<Self, ClientError> {
208        let host = host.to_string();
209
210        let tcp_stream = TcpStream::connect((host.as_str(), port))
211            .await
212            .map_err(ClientError::ConnectToTcpStreamError)?;
213
214        let stream = Stream::new(MaybeTlsStream::Plain(tcp_stream));
215
216        let mut opts = ClientOptions::default();
217        opts.crlf_relaxed = true;
218        opts.discard_greeting = discard_greeting;
219
220        let state = super::Client::new(opts);
221
222        Ok(Self {
223            host,
224            stream,
225            state,
226        })
227    }
228
229    /// Turns an insecure client into a secure one.
230    ///
231    /// The flow changes depending on the `starttls` parameter:
232    ///
233    /// If `true`: receives greeting, sends STARTTLS command, upgrades
234    /// to TLS then force-refreshes server capabilities.
235    ///
236    /// If `false`: upgrades straight to TLS, receives greeting then
237    /// refreshes server capabilities if needed.
238    async fn upgrade_rustls(
239        mut self,
240        starttls: bool,
241        cert: Option<PathBuf>,
242    ) -> Result<Self, ClientError> {
243        let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
244            return Err(ClientError::ClientAlreadyTlsError);
245        };
246
247        if starttls {
248            tcp_stream = RipStarttls::default()
249                .do_starttls_prefix(tcp_stream)
250                .await
251                .map_err(ClientError::DoStarttlsPrefixError)?;
252        }
253
254        let mut config = if let Some(pem_path) = cert {
255            let pem = fs::read(&pem_path).await?;
256
257            let Some(cert) = CertificateDer::pem_slice_iter(&pem).next() else {
258                return Err(ClientError::EmptyCert(pem_path));
259            };
260
261            let cert = match cert {
262                Ok(cert) => cert,
263                Err(err) => return Err(ClientError::InvalidCert(err, pem_path)),
264            };
265
266            let verifier = FingerprintVerifier::new(&cert);
267
268            ClientConfig::builder()
269                .dangerous()
270                .with_custom_certificate_verifier(Arc::new(verifier))
271                .with_no_client_auth()
272        } else {
273            ClientConfig::with_platform_verifier().map_err(ClientError::TlsConfig)?
274        };
275
276        // See <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>
277        config.alpn_protocols = vec![b"imap".to_vec()];
278
279        let connector = TlsConnector::from(Arc::new(config));
280        let dnsname = ServerName::try_from(self.host.clone()).unwrap();
281
282        let tls_stream = connector
283            .connect(dnsname, tcp_stream)
284            .await
285            .map_err(ClientError::ConnectToTlsStreamError)?;
286
287        self.stream = Stream::new(MaybeTlsStream::Rustls(tls_stream));
288
289        if starttls || !self.receive_greeting().await? {
290            self.refresh_capabilities().await?;
291        }
292
293        Ok(self)
294    }
295
296    /// Receives server greeting.
297    ///
298    /// Returns `true` if server capabilities were found in the
299    /// greeting, otherwise `false`. This boolean is internally used
300    /// to determine if server capabilities need to be explicitly
301    /// requested or not.
302    async fn receive_greeting(&mut self) -> Result<bool, ClientError> {
303        let evt = self
304            .stream
305            .next(&mut self.state.resolver)
306            .await
307            .map_err(ClientError::ReceiveGreeting)?;
308
309        if let SchedulerEvent::GreetingReceived(greeting) = evt {
310            if let Some(Code::Capability(capabilities)) = greeting.code {
311                self.state.capabilities = capabilities;
312                return Ok(true);
313            }
314        }
315
316        Ok(false)
317    }
318}
319
320/// Client low-level API.
321///
322/// This section defines the low-level API of the client, by exposing
323/// convenient wrappers around [`Task`]s. They do not contain any
324/// logic.
325impl Client {
326    /// Resolves the given [`Task`].
327    pub async fn resolve<T: Task>(&mut self, task: T) -> Result<T::Output, ClientError> {
328        Ok(self.stream.next(self.state.resolver.resolve(task)).await?)
329    }
330
331    /// Enables the given capabilities.
332    pub async fn enable(
333        &mut self,
334        capabilities: impl IntoIterator<Item = CapabilityEnable<'_>>,
335    ) -> Result<Option<Vec<CapabilityEnable<'_>>>, ClientError> {
336        if !self.state.ext_enable_supported() {
337            debug!("IMAP ENABLE extension not supported, skipping");
338            return Ok(None);
339        }
340
341        let capabilities: Vec<_> = capabilities
342            .into_iter()
343            .map(IntoStatic::into_static)
344            .collect();
345
346        if capabilities.is_empty() {
347            return Ok(None);
348        }
349
350        let capabilities = Vec1::try_from(capabilities).unwrap();
351
352        Ok(self.resolve(EnableTask::new(capabilities)).await??)
353    }
354
355    /// Enables the CONDSTORE extension if supported by the server.
356    pub async fn enable_condstore_if_supported(&mut self) -> Result<bool, ClientError> {
357        if !self.state.ext_condstore_supported() || !self.state.ext_enable_supported() {
358            return Ok(false);
359        }
360
361        let enable = self.enable(vec![CapabilityEnable::CondStore]);
362
363        let enabled = (enable.await?)
364            .map(|capabilities| capabilities.contains(&CapabilityEnable::CondStore))
365            .unwrap_or(false);
366        self.state.condstore_enabled = enabled;
367        Ok(enabled)
368    }
369
370    /// Creates a new mailbox.
371    pub async fn create(
372        &mut self,
373        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
374    ) -> Result<(), ClientError> {
375        let mbox = mailbox.try_into()?.into_static();
376        Ok(self.resolve(CreateTask::new(mbox)).await??)
377    }
378
379    /// Lists mailboxes.
380    pub async fn list(
381        &mut self,
382        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
383        mailbox_wildcard: impl TryInto<ListMailbox<'_>, Error = ValidationError>,
384    ) -> Result<
385        Vec<(
386            Mailbox<'static>,
387            Option<QuotedChar>,
388            Vec<FlagNameAttribute<'static>>,
389        )>,
390        ClientError,
391    > {
392        let mbox = mailbox.try_into()?.into_static();
393        let mbox_wcard = mailbox_wildcard.try_into()?.into_static();
394        Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??)
395    }
396
397    /// Selects the given mailbox.
398    pub async fn select(
399        &mut self,
400        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
401    ) -> Result<SelectDataUnvalidated, ClientError> {
402        let mbox = mailbox.try_into()?.into_static();
403        let task = SelectTask::new(mbox).with_condstore(self.state.condstore_enabled());
404        Ok(self.resolve(task).await??)
405    }
406
407    /// Selects the given mailbox in read-only mode.
408    pub async fn examine(
409        &mut self,
410        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
411    ) -> Result<SelectDataUnvalidated, ClientError> {
412        let mbox = mailbox.try_into()?.into_static();
413        let task = SelectTask::read_only(mbox).with_condstore(self.state.condstore_enabled());
414        Ok(self.resolve(task).await??)
415    }
416
417    /// Expunges the selected mailbox.
418    ///
419    /// A mailbox needs to be selected before, otherwise this function
420    /// will fail.
421    pub async fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ClientError> {
422        Ok(self.resolve(ExpungeTask::new()).await??)
423    }
424
425    /// Deletes the given mailbox.
426    pub async fn delete(
427        &mut self,
428        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
429    ) -> Result<(), ClientError> {
430        let mbox = mailbox.try_into()?.into_static();
431        Ok(self.resolve(DeleteTask::new(mbox)).await??)
432    }
433
434    /// Searches messages matching the given criteria.
435    async fn _search(
436        &mut self,
437        criteria: impl IntoIterator<Item = SearchKey<'_>>,
438        uid: bool,
439    ) -> Result<Vec<NonZeroU32>, ClientError> {
440        let criteria: Vec<_> = criteria.into_iter().map(IntoStatic::into_static).collect();
441
442        let criteria = if criteria.is_empty() {
443            Vec1::from(SearchKey::All)
444        } else {
445            Vec1::try_from(criteria).unwrap()
446        };
447
448        Ok(self
449            .resolve(SearchTask::new(criteria).with_uid(uid))
450            .await??)
451    }
452
453    /// Searches messages matching the given criteria.
454    ///
455    /// This function returns sequence numbers, if you need UID see
456    /// [`Client::uid_search`].
457    pub async fn search(
458        &mut self,
459        criteria: impl IntoIterator<Item = SearchKey<'_>>,
460    ) -> Result<Vec<NonZeroU32>, ClientError> {
461        self._search(criteria, false).await
462    }
463
464    /// Searches messages matching the given criteria.
465    ///
466    /// This function returns UIDs, if you need sequence numbers see
467    /// [`Client::search`].
468    pub async fn uid_search(
469        &mut self,
470        criteria: impl IntoIterator<Item = SearchKey<'_>>,
471    ) -> Result<Vec<NonZeroU32>, ClientError> {
472        self._search(criteria, true).await
473    }
474
475    /// Searches messages matching the given search criteria, sorted
476    /// by the given sort criteria.
477    async fn _sort(
478        &mut self,
479        sort_criteria: impl IntoIterator<Item = SortCriterion>,
480        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
481        uid: bool,
482    ) -> Result<Vec<NonZeroU32>, ClientError> {
483        let sort: Vec<_> = sort_criteria.into_iter().collect();
484        let sort = if sort.is_empty() {
485            Vec1::from(SortCriterion {
486                reverse: true,
487                key: SortKey::Date,
488            })
489        } else {
490            Vec1::try_from(sort).unwrap()
491        };
492
493        let search: Vec<_> = search_criteria
494            .into_iter()
495            .map(IntoStatic::into_static)
496            .collect();
497        let search = if search.is_empty() {
498            Vec1::from(SearchKey::All)
499        } else {
500            Vec1::try_from(search).unwrap()
501        };
502
503        Ok(self
504            .resolve(SortTask::new(sort, search).with_uid(uid))
505            .await??)
506    }
507
508    /// Searches messages matching the given search criteria, sorted
509    /// by the given sort criteria.
510    ///
511    /// This function returns sequence numbers, if you need UID see
512    /// [`Client::uid_sort`].
513    pub async fn sort(
514        &mut self,
515        sort_criteria: impl IntoIterator<Item = SortCriterion>,
516        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
517    ) -> Result<Vec<NonZeroU32>, ClientError> {
518        self._sort(sort_criteria, search_criteria, false).await
519    }
520
521    /// Searches messages matching the given search criteria, sorted
522    /// by the given sort criteria.
523    ///
524    /// This function returns UIDs, if you need sequence numbers see
525    /// [`Client::sort`].
526    pub async fn uid_sort(
527        &mut self,
528        sort_criteria: impl IntoIterator<Item = SortCriterion>,
529        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
530    ) -> Result<Vec<NonZeroU32>, ClientError> {
531        self._sort(sort_criteria, search_criteria, true).await
532    }
533
534    async fn _thread(
535        &mut self,
536        algorithm: ThreadingAlgorithm<'_>,
537        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
538        uid: bool,
539    ) -> Result<Vec<Thread>, ClientError> {
540        let alg = algorithm.into_static();
541
542        let search: Vec<_> = search_criteria
543            .into_iter()
544            .map(IntoStatic::into_static)
545            .collect();
546        let search = if search.is_empty() {
547            Vec1::from(SearchKey::All)
548        } else {
549            Vec1::try_from(search).unwrap()
550        };
551
552        Ok(self
553            .resolve(ThreadTask::new(alg, search).with_uid(uid))
554            .await??)
555    }
556
557    pub async fn thread(
558        &mut self,
559        algorithm: ThreadingAlgorithm<'_>,
560        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
561    ) -> Result<Vec<Thread>, ClientError> {
562        self._thread(algorithm, search_criteria, false).await
563    }
564
565    pub async fn uid_thread(
566        &mut self,
567        algorithm: ThreadingAlgorithm<'_>,
568        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
569    ) -> Result<Vec<Thread>, ClientError> {
570        self._thread(algorithm, search_criteria, true).await
571    }
572
573    async fn _store(
574        &mut self,
575        sequence_set: SequenceSet,
576        kind: StoreType,
577        flags: impl IntoIterator<Item = Flag<'_>>,
578        uid: bool,
579    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
580        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
581
582        Ok(self
583            .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid))
584            .await??)
585    }
586
587    pub async fn store(
588        &mut self,
589        sequence_set: SequenceSet,
590        kind: StoreType,
591        flags: impl IntoIterator<Item = Flag<'_>>,
592    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
593        self._store(sequence_set, kind, flags, false).await
594    }
595
596    pub async fn uid_store(
597        &mut self,
598        sequence_set: SequenceSet,
599        kind: StoreType,
600        flags: impl IntoIterator<Item = Flag<'_>>,
601    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
602        self._store(sequence_set, kind, flags, true).await
603    }
604
605    async fn _silent_store(
606        &mut self,
607        sequence_set: SequenceSet,
608        kind: StoreType,
609        flags: impl IntoIterator<Item = Flag<'_>>,
610        uid: bool,
611    ) -> Result<(), ClientError> {
612        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
613
614        let task = StoreTask::new(sequence_set, kind, flags)
615            .with_uid(uid)
616            .silent();
617
618        Ok(self.resolve(task).await??)
619    }
620
621    pub async fn silent_store(
622        &mut self,
623        sequence_set: SequenceSet,
624        kind: StoreType,
625        flags: impl IntoIterator<Item = Flag<'_>>,
626    ) -> Result<(), ClientError> {
627        self._silent_store(sequence_set, kind, flags, false).await
628    }
629
630    pub async fn uid_silent_store(
631        &mut self,
632        sequence_set: SequenceSet,
633        kind: StoreType,
634        flags: impl IntoIterator<Item = Flag<'_>>,
635    ) -> Result<(), ClientError> {
636        self._silent_store(sequence_set, kind, flags, true).await
637    }
638
639    pub async fn post_append_noop(&mut self) -> Result<Option<u32>, ClientError> {
640        Ok(self.resolve(PostAppendNoOpTask::new()).await??)
641    }
642
643    pub async fn post_append_check(&mut self) -> Result<Option<u32>, ClientError> {
644        Ok(self.resolve(PostAppendCheckTask::new()).await??)
645    }
646
647    async fn _fetch_first(
648        &mut self,
649        id: NonZeroU32,
650        items: MacroOrMessageDataItemNames<'_>,
651        uid: bool,
652    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
653        let items = items.into_static();
654
655        Ok(self
656            .resolve(FetchFirstTask::new(id, items).with_uid(uid))
657            .await??)
658    }
659
660    pub async fn fetch_first(
661        &mut self,
662        id: NonZeroU32,
663        items: MacroOrMessageDataItemNames<'_>,
664    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
665        self._fetch_first(id, items, false).await
666    }
667
668    pub async fn uid_fetch_first(
669        &mut self,
670        id: NonZeroU32,
671        items: MacroOrMessageDataItemNames<'_>,
672    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
673        self._fetch_first(id, items, true).await
674    }
675
676    async fn _copy(
677        &mut self,
678        sequence_set: SequenceSet,
679        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
680        uid: bool,
681    ) -> Result<(), ClientError> {
682        let mbox = mailbox.try_into()?.into_static();
683
684        Ok(self
685            .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid))
686            .await??)
687    }
688
689    pub async fn copy(
690        &mut self,
691        sequence_set: SequenceSet,
692        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
693    ) -> Result<(), ClientError> {
694        self._copy(sequence_set, mailbox, false).await
695    }
696
697    pub async fn uid_copy(
698        &mut self,
699        sequence_set: SequenceSet,
700        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
701    ) -> Result<(), ClientError> {
702        self._copy(sequence_set, mailbox, true).await
703    }
704
705    async fn _move(
706        &mut self,
707        sequence_set: SequenceSet,
708        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
709        uid: bool,
710    ) -> Result<(), ClientError> {
711        let mbox = mailbox.try_into()?.into_static();
712
713        Ok(self
714            .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid))
715            .await??)
716    }
717
718    pub async fn r#move(
719        &mut self,
720        sequence_set: SequenceSet,
721        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
722    ) -> Result<(), ClientError> {
723        self._move(sequence_set, mailbox, false).await
724    }
725
726    pub async fn uid_move(
727        &mut self,
728        sequence_set: SequenceSet,
729        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
730    ) -> Result<(), ClientError> {
731        self._move(sequence_set, mailbox, true).await
732    }
733
734    /// Executes the `CHECK` command.
735    pub async fn check(&mut self) -> Result<(), ClientError> {
736        Ok(self.resolve(CheckTask::new()).await??)
737    }
738
739    /// Executes the `NOOP` command.
740    pub async fn noop(&mut self) -> Result<(), ClientError> {
741        Ok(self.resolve(NoOpTask::new()).await??)
742    }
743}
744
745/// Client medium-level API.
746///
747/// This section defines the medium-level API of the client (based on
748/// the low-level one), by exposing helpers that update client state
749/// and use a small amount of logic (mostly conditional code depending
750/// on available server capabilities).
751impl Client {
752    /// Fetches server capabilities, then saves them.
753    pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
754        self.state.capabilities = self.resolve(CapabilityTask::new()).await??;
755
756        Ok(())
757    }
758
759    /// Identifies the user using the given username and password.
760    pub async fn login(
761        &mut self,
762        username: impl TryInto<AString<'_>, Error = ValidationError>,
763        password: impl TryInto<AString<'_>, Error = ValidationError>,
764    ) -> Result<(), ClientError> {
765        let username = username.try_into()?.into_static();
766        let password = password.try_into()?.into_static();
767        let login = self.resolve(LoginTask::new(username, Secret::new(password)));
768
769        match login.await?? {
770            Some(capabilities) => {
771                self.state.capabilities = capabilities;
772            }
773            None => {
774                self.refresh_capabilities().await?;
775            }
776        };
777
778        Ok(())
779    }
780
781    /// Authenticates the user using the given [`AuthenticateTask`].
782    ///
783    /// This function also refreshes capabilities (either from the
784    /// task output or from explicit request).
785    async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> {
786        match self.resolve(task).await?? {
787            Some(capabilities) => {
788                self.state.capabilities = capabilities;
789            }
790            None => {
791                self.refresh_capabilities().await?;
792            }
793        };
794
795        Ok(())
796    }
797
798    /// Authenticates the user using the `PLAIN` mechanism.
799    pub async fn authenticate_plain(
800        &mut self,
801        login: impl AsRef<str>,
802        password: impl AsRef<str>,
803    ) -> Result<(), ClientError> {
804        self.authenticate(AuthenticateTask::plain(
805            login.as_ref(),
806            password.as_ref(),
807            self.state.ext_sasl_ir_supported(),
808        ))
809        .await
810    }
811
812    /// Authenticates the user using the `XOAUTH2` mechanism.
813    pub async fn authenticate_xoauth2(
814        &mut self,
815        login: impl AsRef<str>,
816        token: impl AsRef<str>,
817    ) -> Result<(), ClientError> {
818        self.authenticate(AuthenticateTask::xoauth2(
819            login.as_ref(),
820            token.as_ref(),
821            self.state.ext_sasl_ir_supported(),
822        ))
823        .await
824    }
825
826    /// Authenticates the user using the `OAUTHBEARER` mechanism.
827    pub async fn authenticate_oauthbearer(
828        &mut self,
829        user: impl AsRef<str>,
830        host: impl AsRef<str>,
831        port: u16,
832        token: impl AsRef<str>,
833    ) -> Result<(), ClientError> {
834        self.authenticate(AuthenticateTask::oauthbearer(
835            user.as_ref(),
836            host.as_ref(),
837            port,
838            token.as_ref(),
839            self.state.ext_sasl_ir_supported(),
840        ))
841        .await
842    }
843
844    /// Exchanges client/server ids.
845    ///
846    /// If the server does not support the `ID` extension, this
847    /// function has no effect.
848    pub async fn id(
849        &mut self,
850        params: Option<Vec<(IString<'static>, NString<'static>)>>,
851    ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ClientError> {
852        Ok(if self.state.ext_id_supported() {
853            self.resolve(IdTask::new(params)).await??
854        } else {
855            debug!("IMAP ID extension not supported, skipping");
856            None
857        })
858    }
859
860    pub async fn append(
861        &mut self,
862        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
863        flags: impl IntoIterator<Item = Flag<'_>>,
864        message: impl AsRef<[u8]>,
865    ) -> Result<Option<u32>, ClientError> {
866        let mbox = mailbox.try_into()?.into_static();
867
868        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
869
870        let msg = to_static_literal(message, self.state.ext_binary_supported())?;
871
872        Ok(self
873            .resolve(AppendTask::new(mbox, msg).with_flags(flags))
874            .await??)
875    }
876
877    pub async fn appenduid(
878        &mut self,
879        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
880        flags: impl IntoIterator<Item = Flag<'_>>,
881        message: impl AsRef<[u8]>,
882    ) -> Result<Option<(NonZeroU32, NonZeroU32)>, ClientError> {
883        let mbox = mailbox.try_into()?.into_static();
884
885        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
886
887        let msg = to_static_literal(message, self.state.ext_binary_supported())?;
888
889        Ok(self
890            .resolve(AppendUidTask::new(mbox, msg).with_flags(flags))
891            .await??)
892    }
893}
894
895/// Client high-level API.
896///
897/// This section defines the high-level API of the client (based on
898/// the low and medium ones), by exposing opinionated helpers. They
899/// contain more logic, and make use of fallbacks depending on
900/// available server capabilities.
901impl Client {
902    async fn _fetch(
903        &mut self,
904        sequence_set: SequenceSet,
905        items: MacroOrMessageDataItemNames<'_>,
906        uid: bool,
907        modifiers: Option<Vec<FetchModifier>>,
908    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
909        let mut items = match items {
910            MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
911            MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(),
912        };
913
914        if uid {
915            items.push(MessageDataItemName::Uid);
916        }
917
918        let mut task = FetchTask::new(sequence_set, items.into()).with_uid(uid);
919        modifiers.map(|m| task.set_modifiers(m));
920
921        let seq_map = self.resolve(task).await??;
922
923        if uid {
924            let mut uid_map = HashMap::new();
925
926            for (seq, items) in seq_map {
927                let uid = items.as_ref().iter().find_map(|item| {
928                    if let MessageDataItem::Uid(uid) = item {
929                        Some(*uid)
930                    } else {
931                        None
932                    }
933                });
934
935                match uid {
936                    Some(uid) => {
937                        uid_map.insert(uid, items);
938                    }
939                    None => {
940                        debug!(?seq, "cannot get message uid, skipping it");
941                    }
942                }
943            }
944
945            Ok(uid_map)
946        } else {
947            Ok(seq_map)
948        }
949    }
950
951    pub async fn fetch(
952        &mut self,
953        sequence_set: SequenceSet,
954        items: MacroOrMessageDataItemNames<'_>,
955    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
956        self._fetch(sequence_set, items, false, None).await
957    }
958
959    pub async fn fetch_with_modifiers(
960        &mut self,
961        sequence_set: SequenceSet,
962        items: MacroOrMessageDataItemNames<'_>,
963        modifiers: Vec<FetchModifier>,
964    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
965        self._fetch(sequence_set, items, false, Some(modifiers))
966            .await
967    }
968
969    pub async fn uid_fetch(
970        &mut self,
971        sequence_set: SequenceSet,
972        items: MacroOrMessageDataItemNames<'_>,
973    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
974        self._fetch(sequence_set, items, true, None).await
975    }
976
977    pub async fn uid_fetch_with_modifiers(
978        &mut self,
979        sequence_set: SequenceSet,
980        items: MacroOrMessageDataItemNames<'_>,
981        modifiers: Vec<FetchModifier>,
982    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
983        self._fetch(sequence_set, items, true, Some(modifiers))
984            .await
985    }
986
987    async fn _sort_or_fallback(
988        &mut self,
989        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
990        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
991        fetch_items: MacroOrMessageDataItemNames<'_>,
992        uid: bool,
993    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
994        let mut fetch_items = match fetch_items {
995            MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
996            MacroOrMessageDataItemNames::MessageDataItemNames(items) => items,
997        };
998
999        if uid && !fetch_items.contains(&MessageDataItemName::Uid) {
1000            fetch_items.push(MessageDataItemName::Uid);
1001        }
1002
1003        let mut fetches = HashMap::new();
1004
1005        if self.state.ext_sort_supported() {
1006            let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items);
1007            let ids = self._sort(sort_criteria, search_criteria, uid).await?;
1008            let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1009            let ids_chunks_len = ids_chunks.len();
1010
1011            for (n, ids) in ids_chunks.enumerate() {
1012                debug!(?ids, "fetching sort envelopes {}/{ids_chunks_len}", n + 1);
1013                let ids = SequenceSet::try_from(ids.to_vec())?;
1014                let items = fetch_items.clone();
1015                fetches.extend(self._fetch(ids, items, uid, None).await?);
1016            }
1017
1018            let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect();
1019
1020            Ok(items)
1021        } else {
1022            debug!("IMAP SORT extension not supported, using fallback");
1023
1024            let ids = self._search(search_criteria, uid).await?;
1025            let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
1026            let ids_chunks_len = ids_chunks.len();
1027
1028            sort_criteria
1029                .clone()
1030                .into_iter()
1031                .filter_map(|criterion| match criterion.key {
1032                    SortKey::Arrival => Some(MessageDataItemName::InternalDate),
1033                    SortKey::Cc => Some(MessageDataItemName::Envelope),
1034                    SortKey::Date => Some(MessageDataItemName::Envelope),
1035                    SortKey::From => Some(MessageDataItemName::Envelope),
1036                    SortKey::Size => Some(MessageDataItemName::Rfc822Size),
1037                    SortKey::Subject => Some(MessageDataItemName::Envelope),
1038                    SortKey::To => Some(MessageDataItemName::Envelope),
1039                    SortKey::DisplayFrom => None,
1040                    SortKey::DisplayTo => None,
1041                })
1042                .for_each(|item| {
1043                    if !fetch_items.contains(&item) {
1044                        fetch_items.push(item)
1045                    }
1046                });
1047
1048            for (n, ids) in ids_chunks.enumerate() {
1049                debug!(?ids, "fetching search envelopes {}/{ids_chunks_len}", n + 1);
1050                let ids = SequenceSet::try_from(ids.to_vec())?;
1051                let items = fetch_items.clone();
1052                fetches.extend(self._fetch(ids, items.into(), uid, None).await?);
1053            }
1054
1055            let mut fetches: Vec<_> = fetches.into_values().collect();
1056
1057            fetches.sort_by(|a, b| {
1058                for criterion in sort_criteria.clone().into_iter() {
1059                    let mut cmp = cmp_fetch_items(&criterion.key, a, b);
1060
1061                    if criterion.reverse {
1062                        cmp = cmp.reverse();
1063                    }
1064
1065                    if cmp.is_ne() {
1066                        return cmp;
1067                    }
1068                }
1069
1070                cmp_fetch_items(&SortKey::Date, a, b)
1071            });
1072
1073            Ok(fetches)
1074        }
1075    }
1076
1077    pub async fn sort_or_fallback(
1078        &mut self,
1079        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1080        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1081        fetch_items: MacroOrMessageDataItemNames<'_>,
1082    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1083        self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false)
1084            .await
1085    }
1086
1087    pub async fn uid_sort_or_fallback(
1088        &mut self,
1089        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1090        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1091        fetch_items: MacroOrMessageDataItemNames<'_>,
1092    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1093        self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true)
1094            .await
1095    }
1096
1097    pub async fn appenduid_or_fallback(
1098        &mut self,
1099        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError> + Clone,
1100        flags: impl IntoIterator<Item = Flag<'_>>,
1101        message: impl AsRef<[u8]>,
1102    ) -> Result<Option<NonZeroU32>, ClientError> {
1103        if self.state.ext_uidplus_supported() {
1104            Ok(self
1105                .appenduid(mailbox, flags, message)
1106                .await?
1107                .map(|(uid, _)| uid))
1108        } else {
1109            debug!("IMAP UIDPLUS extension not supported, using fallback");
1110
1111            // If the mailbox is currently selected, the normal new
1112            // message actions SHOULD occur.  Specifically, the server
1113            // SHOULD notify the client immediately via an untagged
1114            // EXISTS response.  If the server does not do so, the
1115            // client MAY issue a NOOP command (or failing that, a
1116            // CHECK command) after one or more APPEND commands.
1117            //
1118            // <https://datatracker.ietf.org/doc/html/rfc3501#section-6.3.11>
1119            self.select(mailbox.clone()).await?;
1120
1121            let seq = match self.append(mailbox, flags, message).await? {
1122                Some(seq) => seq,
1123                None => match self.post_append_noop().await? {
1124                    Some(seq) => seq,
1125                    None => self
1126                        .post_append_check()
1127                        .await?
1128                        .ok_or(ClientError::ResolveTask(TaskError::MissingData(
1129                            "APPENDUID: seq".into(),
1130                        )))?,
1131                },
1132            };
1133
1134            let uid = self
1135                .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap())))
1136                .await?
1137                .into_iter()
1138                .next();
1139
1140            Ok(uid)
1141        }
1142    }
1143
1144    async fn _move_or_fallback(
1145        &mut self,
1146        sequence_set: SequenceSet,
1147        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1148        uid: bool,
1149    ) -> Result<(), ClientError> {
1150        if self.state.ext_move_supported() {
1151            self._move(sequence_set, mailbox, uid).await
1152        } else {
1153            debug!("IMAP MOVE extension not supported, using fallback");
1154            self._copy(sequence_set.clone(), mailbox, uid).await?;
1155            self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid)
1156                .await?;
1157            self.expunge().await?;
1158            Ok(())
1159        }
1160    }
1161
1162    pub async fn move_or_fallback(
1163        &mut self,
1164        sequence_set: SequenceSet,
1165        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1166    ) -> Result<(), ClientError> {
1167        self._move_or_fallback(sequence_set, mailbox, false).await
1168    }
1169
1170    pub async fn uid_move_or_fallback(
1171        &mut self,
1172        sequence_set: SequenceSet,
1173        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1174    ) -> Result<(), ClientError> {
1175        self._move_or_fallback(sequence_set, mailbox, true).await
1176    }
1177
1178    pub fn enqueue_idle(&mut self) -> Tag<'static> {
1179        let tag = self.state.resolver.scheduler.tag_generator.generate();
1180
1181        self.state
1182            .resolver
1183            .scheduler
1184            .client_next
1185            .enqueue_command(Command {
1186                tag: tag.clone(),
1187                body: CommandBody::Idle,
1188            });
1189
1190        tag.into_static()
1191    }
1192
1193    #[tracing::instrument(name = "idle", skip_all)]
1194    pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1195        debug!("starting the main loop");
1196
1197        loop {
1198            let progress = self
1199                .stream
1200                .next(&mut self.state.resolver.scheduler.client_next);
1201            match timeout(self.state.idle_timeout, progress).await.ok() {
1202                None => {
1203                    debug!("timed out, sending done command…");
1204                    self.state.resolver.scheduler.client_next.set_idle_done();
1205                }
1206                Some(Err(err)) => {
1207                    break Err(err);
1208                }
1209                Some(Ok(Event::IdleCommandSent { .. })) => {
1210                    debug!("command sent");
1211                }
1212                Some(Ok(Event::IdleAccepted { .. })) => {
1213                    debug!("command accepted, entering idle mode");
1214                }
1215                Some(Ok(Event::IdleRejected { status, .. })) => {
1216                    debug!("command rejected, aborting: {status:?}");
1217                    break Ok(());
1218                }
1219                Some(Ok(Event::IdleDoneSent { .. })) => {
1220                    debug!("done command sent");
1221                }
1222                Some(Ok(Event::DataReceived { data })) => {
1223                    debug!("received data, sending done command…");
1224                    trace!("{data:#?}");
1225                    self.state.resolver.scheduler.client_next.set_idle_done();
1226                }
1227                Some(Ok(Event::StatusReceived {
1228                    status:
1229                        Status::Tagged(Tagged {
1230                            tag: ref got_tag, ..
1231                        }),
1232                })) if *got_tag == tag => {
1233                    debug!("received tagged response, exiting");
1234                    break Ok(());
1235                }
1236                Some(event) => {
1237                    debug!("received unknown event, ignoring: {event:?}");
1238                }
1239            }
1240        }
1241    }
1242
1243    #[tracing::instrument(name = "idle/done", skip_all)]
1244    pub async fn idle_done(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1245        self.state.resolver.scheduler.client_next.set_idle_done();
1246
1247        loop {
1248            let progress = self
1249                .stream
1250                .next(&mut self.state.resolver.scheduler.client_next)
1251                .await?;
1252
1253            match progress {
1254                Event::IdleDoneSent { .. } => {
1255                    debug!("done command sent");
1256                }
1257                Event::StatusReceived {
1258                    status:
1259                        Status::Tagged(Tagged {
1260                            tag: ref got_tag, ..
1261                        }),
1262                } if *got_tag == tag => {
1263                    debug!("received tagged response, exiting");
1264                    break Ok(());
1265                }
1266                event => {
1267                    debug!("received unknown event, ignoring: {event:?}");
1268                }
1269            }
1270        }
1271    }
1272}
1273
1274pub(crate) fn cmp_fetch_items(
1275    criterion: &SortKey,
1276    a: &Vec1<MessageDataItem>,
1277    b: &Vec1<MessageDataItem>,
1278) -> Ordering {
1279    use MessageDataItem::*;
1280
1281    match &criterion {
1282        SortKey::Arrival => {
1283            let a = a.as_ref().iter().find_map(|a| {
1284                if let InternalDate(dt) = a {
1285                    Some(dt.as_ref())
1286                } else {
1287                    None
1288                }
1289            });
1290
1291            let b = b.as_ref().iter().find_map(|b| {
1292                if let InternalDate(dt) = b {
1293                    Some(dt.as_ref())
1294                } else {
1295                    None
1296                }
1297            });
1298
1299            a.cmp(&b)
1300        }
1301        SortKey::Date => {
1302            let a = a.as_ref().iter().find_map(|a| {
1303                if let Envelope(envelope) = a {
1304                    envelope.date.0.as_ref().map(AsRef::as_ref)
1305                } else {
1306                    None
1307                }
1308            });
1309
1310            let b = b.as_ref().iter().find_map(|b| {
1311                if let Envelope(envelope) = b {
1312                    envelope.date.0.as_ref().map(AsRef::as_ref)
1313                } else {
1314                    None
1315                }
1316            });
1317
1318            a.cmp(&b)
1319        }
1320        SortKey::Size => {
1321            let a = a.as_ref().iter().find_map(|a| {
1322                if let Rfc822Size(size) = a {
1323                    Some(size)
1324                } else {
1325                    None
1326                }
1327            });
1328
1329            let b = b.as_ref().iter().find_map(|b| {
1330                if let Rfc822Size(size) = b {
1331                    Some(size)
1332                } else {
1333                    None
1334                }
1335            });
1336
1337            a.cmp(&b)
1338        }
1339        SortKey::Subject => {
1340            let a = a.as_ref().iter().find_map(|a| {
1341                if let Envelope(envelope) = a {
1342                    envelope.subject.0.as_ref().map(AsRef::as_ref)
1343                } else {
1344                    None
1345                }
1346            });
1347
1348            let b = b.as_ref().iter().find_map(|b| {
1349                if let Envelope(envelope) = b {
1350                    envelope.subject.0.as_ref().map(AsRef::as_ref)
1351                } else {
1352                    None
1353                }
1354            });
1355
1356            a.cmp(&b)
1357        }
1358        // FIXME: Address missing Ord derive in imap-types
1359        SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => {
1360            Ordering::Equal
1361        }
1362    }
1363}
1364
1365pub(crate) fn to_static_literal(
1366    message: impl AsRef<[u8]>,
1367    ext_binary_supported: bool,
1368) -> Result<LiteralOrLiteral8<'static>, ValidationError> {
1369    let message = if ext_binary_supported {
1370        LiteralOrLiteral8::Literal8(Literal8 {
1371            data: message.as_ref().into(),
1372            mode: LiteralMode::Sync,
1373        })
1374    } else {
1375        debug!("IMAP BINARY extension not supported, using fallback");
1376        Literal::validate(message.as_ref())?;
1377        LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref()))
1378    };
1379
1380    Ok(message.into_static())
1381}