Skip to main content

imap_client/client/
tokio.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    io,
5    num::NonZeroU32,
6    path::PathBuf,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11
12use imap_next::{
13    client::{Error as NextError, Event, Options as ClientOptions},
14    imap_types::{
15        command::{Command, CommandBody},
16        core::{AString, IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1},
17        error::ValidationError,
18        extensions::{
19            binary::{Literal8, LiteralOrLiteral8},
20            enable::CapabilityEnable,
21            sort::{SortCriterion, SortKey},
22            thread::{Thread, ThreadingAlgorithm},
23        },
24        fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
25        flag::{Flag, FlagNameAttribute, StoreType},
26        mailbox::{ListMailbox, Mailbox},
27        response::{Code, Status, Tagged},
28        search::SearchKey,
29        secret::Secret,
30        sequence::SequenceSet,
31        IntoStatic,
32    },
33};
34use rip_starttls::imap::tokio::RipStarttls;
35use rustls_platform_verifier::ConfigVerifierExt;
36use thiserror::Error;
37use tokio::{
38    fs,
39    io::{AsyncRead, AsyncWrite, ReadBuf},
40    net::TcpStream,
41    time::timeout,
42};
43use tokio_rustls::{
44    rustls::{
45        pki_types::{pem::PemObject, CertificateDer, ServerName},
46        ClientConfig,
47    },
48    TlsConnector,
49};
50use tracing::{debug, trace};
51
52use crate::{
53    client::verifier::FingerprintVerifier,
54    stream::{self, Stream},
55    tasks::{
56        tasks::{
57            append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask},
58            appenduid::AppendUidTask,
59            authenticate::AuthenticateTask,
60            capability::CapabilityTask,
61            check::CheckTask,
62            copy::CopyTask,
63            create::CreateTask,
64            delete::DeleteTask,
65            enable::EnableTask,
66            expunge::ExpungeTask,
67            fetch::{FetchFirstTask, FetchTask},
68            id::IdTask,
69            list::ListTask,
70            login::LoginTask,
71            noop::NoOpTask,
72            r#move::MoveTask,
73            search::SearchTask,
74            select::{SelectDataUnvalidated, SelectTask},
75            sort::SortTask,
76            store::StoreTask,
77            thread::ThreadTask,
78            TaskError,
79        },
80        SchedulerError, SchedulerEvent, Task,
81    },
82};
83
// Upper bound on a sequence size, fixed to `u8::MAX`.
// NOTE(review): not referenced in the visible part of this file; presumably a
// chunking limit for sequence sets — confirm against its users.
static MAX_SEQUENCE_SIZE: u8 = u8::MAX; // 255
85
/// Errors returned by the tokio-based [`Client`].
#[derive(Debug, Error)]
pub enum ClientError {
    /// A TLS upgrade was requested on a client whose stream is not
    /// a plain TCP stream.
    #[error("cannot upgrade client to TLS: client is already in TLS state")]
    ClientAlreadyTlsError,
    /// The plaintext STARTTLS exchange failed.
    ///
    /// Because of `#[from]`, any [`io::Error`] propagated with `?`
    /// converts into this variant, not only STARTTLS failures.
    #[error("cannot do STARTTLS prefix")]
    DoStarttlsPrefixError(#[from] io::Error),
    /// The underlying stream or scheduler failed while resolving a
    /// task.
    #[error("stream error")]
    Stream(#[from] stream::Error<SchedulerError>),
    /// A user-supplied value could not be converted into its IMAP
    /// type.
    #[error("validation error")]
    Validation(#[from] ValidationError),
    /// The TCP connection could not be established.
    #[error("cannot connect to TCP stream")]
    ConnectToTcpStreamError(#[source] io::Error),
    /// The TLS connection could not be established on top of the
    /// TCP stream.
    #[error("cannot connect to TLS stream")]
    ConnectToTlsStreamError(#[source] io::Error),
    /// The TLS client configuration could not be built.
    #[error("cannot create TLS client config")]
    TlsConfig(#[source] tokio_rustls::rustls::Error),

    /// The stream failed while waiting for the server greeting.
    #[error("cannot receive greeting from server")]
    ReceiveGreeting(#[source] stream::Error<SchedulerError>),
    /// A task was processed by the server but resolved to an error.
    #[error("cannot resolve IMAP task")]
    ResolveTask(#[from] TaskError),
    /// The PEM file at the given path yielded no certificate.
    #[error("{0} should contain at least one valid certificate")]
    EmptyCert(PathBuf),
    /// The first PEM item of the file at the given path could not
    /// be parsed as a certificate.
    #[error("{1} does not contain a valid certificate")]
    InvalidCert(tokio_rustls::rustls::pki_types::pem::Error, PathBuf),
}
112
/// Transport used by the client: either a raw TCP stream or the same
/// TCP stream wrapped into a rustls TLS session.
pub enum MaybeTlsStream {
    /// Unencrypted TCP stream.
    Plain(TcpStream),
    /// TLS client stream running over a TCP stream.
    Rustls(tokio_rustls::client::TlsStream<TcpStream>),
}
117
118impl AsyncRead for MaybeTlsStream {
119    fn poll_read(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122        buf: &mut ReadBuf<'_>,
123    ) -> Poll<io::Result<()>> {
124        match self.get_mut() {
125            Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
126            Self::Rustls(s) => Pin::new(s).poll_read(cx, buf),
127        }
128    }
129}
130
131impl AsyncWrite for MaybeTlsStream {
132    fn poll_write(
133        self: Pin<&mut Self>,
134        cx: &mut Context<'_>,
135        buf: &[u8],
136    ) -> Poll<io::Result<usize>> {
137        match self.get_mut() {
138            Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
139            Self::Rustls(s) => Pin::new(s).poll_write(cx, buf),
140        }
141    }
142
143    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
144        match self.get_mut() {
145            Self::Plain(s) => Pin::new(s).poll_flush(cx),
146            Self::Rustls(s) => Pin::new(s).poll_flush(cx),
147        }
148    }
149
150    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
151        match self.get_mut() {
152            Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
153            Self::Rustls(s) => Pin::new(s).poll_shutdown(cx),
154        }
155    }
156}
157
/// Tokio-based IMAP client: a protocol state paired with the stream
/// it talks over.
pub struct Client {
    /// Host given at construction; reused as the TLS server name when
    /// the connection is upgraded.
    host: String,
    /// Protocol state (task resolver and last known server
    /// capabilities).
    pub state: super::Client,
    /// Stream over which tasks are resolved, plain or TLS.
    pub stream: Stream<MaybeTlsStream>,
}
163
/// Client constructors.
///
/// This section defines 2 public constructors for [`Client`]:
/// `insecure` and `rustls`.
168impl Client {
169    /// Creates an insecure client, using TCP.
170    ///
171    /// This constructor creates a client based on an raw
172    /// [`TcpStream`], receives greeting then saves server
173    /// capabilities.
174    pub async fn insecure(host: impl ToString, port: u16) -> Result<Self, ClientError> {
175        let mut client = Self::tcp(host, port, false).await?;
176
177        if !client.receive_greeting().await? {
178            client.refresh_capabilities().await?;
179        }
180
181        Ok(client)
182    }
183
184    /// Creates a secure client, using SSL/TLS or STARTTLS.
185    ///
186    /// This constructor creates an client based on a secure
187    /// [`TcpStream`] wrapped into a [`TlsStream`], receives greeting
188    /// then saves server capabilities.
189    pub async fn rustls(
190        host: impl ToString,
191        port: u16,
192        starttls: bool,
193        cert: Option<PathBuf>,
194    ) -> Result<Self, ClientError> {
195        let tcp = Self::tcp(host, port, starttls).await?;
196        Self::upgrade_rustls(tcp, starttls, cert).await
197    }
198
199    /// Creates an insecure client based on a raw [`TcpStream`].
200    ///
201    /// This function is internally used by public constructors
202    /// `insecure`, `tls` and `starttls`.
203    async fn tcp(
204        host: impl ToString,
205        port: u16,
206        discard_greeting: bool,
207    ) -> Result<Self, ClientError> {
208        let host = host.to_string();
209
210        let tcp_stream = TcpStream::connect((host.as_str(), port))
211            .await
212            .map_err(ClientError::ConnectToTcpStreamError)?;
213
214        let stream = Stream::new(MaybeTlsStream::Plain(tcp_stream));
215
216        let mut opts = ClientOptions::default();
217        opts.crlf_relaxed = true;
218        opts.discard_greeting = discard_greeting;
219
220        let state = super::Client::new(opts);
221
222        Ok(Self {
223            host,
224            stream,
225            state,
226        })
227    }
228
229    /// Turns an insecure client into a secure one.
230    ///
231    /// The flow changes depending on the `starttls` parameter:
232    ///
233    /// If `true`: receives greeting, sends STARTTLS command, upgrades
234    /// to TLS then force-refreshes server capabilities.
235    ///
236    /// If `false`: upgrades straight to TLS, receives greeting then
237    /// refreshes server capabilities if needed.
238    async fn upgrade_rustls(
239        mut self,
240        starttls: bool,
241        cert: Option<PathBuf>,
242    ) -> Result<Self, ClientError> {
243        let MaybeTlsStream::Plain(mut tcp_stream) = self.stream.into_inner() else {
244            return Err(ClientError::ClientAlreadyTlsError);
245        };
246
247        if starttls {
248            tcp_stream = RipStarttls::default()
249                .do_starttls_prefix(tcp_stream)
250                .await
251                .map_err(ClientError::DoStarttlsPrefixError)?;
252        }
253
254        let mut config = if let Some(pem_path) = cert {
255            let pem = fs::read(&pem_path).await?;
256
257            let Some(cert) = CertificateDer::pem_slice_iter(&pem).next() else {
258                return Err(ClientError::EmptyCert(pem_path));
259            };
260
261            let cert = match cert {
262                Ok(cert) => cert,
263                Err(err) => return Err(ClientError::InvalidCert(err, pem_path)),
264            };
265
266            let verifier = FingerprintVerifier::new(&cert);
267
268            ClientConfig::builder()
269                .dangerous()
270                .with_custom_certificate_verifier(Arc::new(verifier))
271                .with_no_client_auth()
272        } else {
273            ClientConfig::with_platform_verifier().map_err(ClientError::TlsConfig)?
274        };
275
276        // See <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>
277        config.alpn_protocols = vec![b"imap".to_vec()];
278
279        let connector = TlsConnector::from(Arc::new(config));
280        let dnsname = ServerName::try_from(self.host.clone()).unwrap();
281
282        let tls_stream = connector
283            .connect(dnsname, tcp_stream)
284            .await
285            .map_err(ClientError::ConnectToTlsStreamError)?;
286
287        self.stream = Stream::new(MaybeTlsStream::Rustls(tls_stream));
288
289        if starttls || !self.receive_greeting().await? {
290            self.refresh_capabilities().await?;
291        }
292
293        Ok(self)
294    }
295
296    /// Receives server greeting.
297    ///
298    /// Returns `true` if server capabilities were found in the
299    /// greeting, otherwise `false`. This boolean is internally used
300    /// to determine if server capabilities need to be explicitly
301    /// requested or not.
302    async fn receive_greeting(&mut self) -> Result<bool, ClientError> {
303        let evt = self
304            .stream
305            .next(&mut self.state.resolver)
306            .await
307            .map_err(ClientError::ReceiveGreeting)?;
308
309        if let SchedulerEvent::GreetingReceived(greeting) = evt {
310            if let Some(Code::Capability(capabilities)) = greeting.code {
311                self.state.capabilities = capabilities;
312                return Ok(true);
313            }
314        }
315
316        Ok(false)
317    }
318}
319
320/// Client low-level API.
321///
322/// This section defines the low-level API of the client, by exposing
323/// convenient wrappers around [`Task`]s. They do not contain any
324/// logic.
325impl Client {
326    /// Resolves the given [`Task`].
327    pub async fn resolve<T: Task>(&mut self, task: T) -> Result<T::Output, ClientError> {
328        Ok(self.stream.next(self.state.resolver.resolve(task)).await?)
329    }
330
331    /// Enables the given capabilities.
332    pub async fn enable(
333        &mut self,
334        capabilities: impl IntoIterator<Item = CapabilityEnable<'_>>,
335    ) -> Result<Option<Vec<CapabilityEnable<'_>>>, ClientError> {
336        if !self.state.ext_enable_supported() {
337            debug!("IMAP ENABLE extension not supported, skipping");
338            return Ok(None);
339        }
340
341        let capabilities: Vec<_> = capabilities
342            .into_iter()
343            .map(IntoStatic::into_static)
344            .collect();
345
346        if capabilities.is_empty() {
347            return Ok(None);
348        }
349
350        let capabilities = Vec1::try_from(capabilities).unwrap();
351
352        Ok(self.resolve(EnableTask::new(capabilities)).await??)
353    }
354
355    /// Creates a new mailbox.
356    pub async fn create(
357        &mut self,
358        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
359    ) -> Result<(), ClientError> {
360        let mbox = mailbox.try_into()?.into_static();
361        Ok(self.resolve(CreateTask::new(mbox)).await??)
362    }
363
364    /// Lists mailboxes.
365    pub async fn list(
366        &mut self,
367        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
368        mailbox_wildcard: impl TryInto<ListMailbox<'_>, Error = ValidationError>,
369    ) -> Result<
370        Vec<(
371            Mailbox<'static>,
372            Option<QuotedChar>,
373            Vec<FlagNameAttribute<'static>>,
374        )>,
375        ClientError,
376    > {
377        let mbox = mailbox.try_into()?.into_static();
378        let mbox_wcard = mailbox_wildcard.try_into()?.into_static();
379        Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??)
380    }
381
382    /// Selects the given mailbox.
383    pub async fn select(
384        &mut self,
385        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
386    ) -> Result<SelectDataUnvalidated, ClientError> {
387        let mbox = mailbox.try_into()?.into_static();
388        Ok(self.resolve(SelectTask::new(mbox)).await??)
389    }
390
391    /// Selects the given mailbox in read-only mode.
392    pub async fn examine(
393        &mut self,
394        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
395    ) -> Result<SelectDataUnvalidated, ClientError> {
396        let mbox = mailbox.try_into()?.into_static();
397        Ok(self.resolve(SelectTask::read_only(mbox)).await??)
398    }
399
400    /// Expunges the selected mailbox.
401    ///
402    /// A mailbox needs to be selected before, otherwise this function
403    /// will fail.
404    pub async fn expunge(&mut self) -> Result<Vec<NonZeroU32>, ClientError> {
405        Ok(self.resolve(ExpungeTask::new()).await??)
406    }
407
408    /// Deletes the given mailbox.
409    pub async fn delete(
410        &mut self,
411        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
412    ) -> Result<(), ClientError> {
413        let mbox = mailbox.try_into()?.into_static();
414        Ok(self.resolve(DeleteTask::new(mbox)).await??)
415    }
416
417    /// Searches messages matching the given criteria.
418    async fn _search(
419        &mut self,
420        criteria: impl IntoIterator<Item = SearchKey<'_>>,
421        uid: bool,
422    ) -> Result<Vec<NonZeroU32>, ClientError> {
423        let criteria: Vec<_> = criteria.into_iter().map(IntoStatic::into_static).collect();
424
425        let criteria = if criteria.is_empty() {
426            Vec1::from(SearchKey::All)
427        } else {
428            Vec1::try_from(criteria).unwrap()
429        };
430
431        Ok(self
432            .resolve(SearchTask::new(criteria).with_uid(uid))
433            .await??)
434    }
435
436    /// Searches messages matching the given criteria.
437    ///
438    /// This function returns sequence numbers, if you need UID see
439    /// [`Client::uid_search`].
440    pub async fn search(
441        &mut self,
442        criteria: impl IntoIterator<Item = SearchKey<'_>>,
443    ) -> Result<Vec<NonZeroU32>, ClientError> {
444        self._search(criteria, false).await
445    }
446
447    /// Searches messages matching the given criteria.
448    ///
449    /// This function returns UIDs, if you need sequence numbers see
450    /// [`Client::search`].
451    pub async fn uid_search(
452        &mut self,
453        criteria: impl IntoIterator<Item = SearchKey<'_>>,
454    ) -> Result<Vec<NonZeroU32>, ClientError> {
455        self._search(criteria, true).await
456    }
457
458    /// Searches messages matching the given search criteria, sorted
459    /// by the given sort criteria.
460    async fn _sort(
461        &mut self,
462        sort_criteria: impl IntoIterator<Item = SortCriterion>,
463        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
464        uid: bool,
465    ) -> Result<Vec<NonZeroU32>, ClientError> {
466        let sort: Vec<_> = sort_criteria.into_iter().collect();
467        let sort = if sort.is_empty() {
468            Vec1::from(SortCriterion {
469                reverse: true,
470                key: SortKey::Date,
471            })
472        } else {
473            Vec1::try_from(sort).unwrap()
474        };
475
476        let search: Vec<_> = search_criteria
477            .into_iter()
478            .map(IntoStatic::into_static)
479            .collect();
480        let search = if search.is_empty() {
481            Vec1::from(SearchKey::All)
482        } else {
483            Vec1::try_from(search).unwrap()
484        };
485
486        Ok(self
487            .resolve(SortTask::new(sort, search).with_uid(uid))
488            .await??)
489    }
490
491    /// Searches messages matching the given search criteria, sorted
492    /// by the given sort criteria.
493    ///
494    /// This function returns sequence numbers, if you need UID see
495    /// [`Client::uid_sort`].
496    pub async fn sort(
497        &mut self,
498        sort_criteria: impl IntoIterator<Item = SortCriterion>,
499        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
500    ) -> Result<Vec<NonZeroU32>, ClientError> {
501        self._sort(sort_criteria, search_criteria, false).await
502    }
503
504    /// Searches messages matching the given search criteria, sorted
505    /// by the given sort criteria.
506    ///
507    /// This function returns UIDs, if you need sequence numbers see
508    /// [`Client::sort`].
509    pub async fn uid_sort(
510        &mut self,
511        sort_criteria: impl IntoIterator<Item = SortCriterion>,
512        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
513    ) -> Result<Vec<NonZeroU32>, ClientError> {
514        self._sort(sort_criteria, search_criteria, true).await
515    }
516
517    async fn _thread(
518        &mut self,
519        algorithm: ThreadingAlgorithm<'_>,
520        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
521        uid: bool,
522    ) -> Result<Vec<Thread>, ClientError> {
523        let alg = algorithm.into_static();
524
525        let search: Vec<_> = search_criteria
526            .into_iter()
527            .map(IntoStatic::into_static)
528            .collect();
529        let search = if search.is_empty() {
530            Vec1::from(SearchKey::All)
531        } else {
532            Vec1::try_from(search).unwrap()
533        };
534
535        Ok(self
536            .resolve(ThreadTask::new(alg, search).with_uid(uid))
537            .await??)
538    }
539
540    pub async fn thread(
541        &mut self,
542        algorithm: ThreadingAlgorithm<'_>,
543        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
544    ) -> Result<Vec<Thread>, ClientError> {
545        self._thread(algorithm, search_criteria, false).await
546    }
547
548    pub async fn uid_thread(
549        &mut self,
550        algorithm: ThreadingAlgorithm<'_>,
551        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
552    ) -> Result<Vec<Thread>, ClientError> {
553        self._thread(algorithm, search_criteria, true).await
554    }
555
556    async fn _store(
557        &mut self,
558        sequence_set: SequenceSet,
559        kind: StoreType,
560        flags: impl IntoIterator<Item = Flag<'_>>,
561        uid: bool,
562    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
563        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
564
565        Ok(self
566            .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid))
567            .await??)
568    }
569
570    pub async fn store(
571        &mut self,
572        sequence_set: SequenceSet,
573        kind: StoreType,
574        flags: impl IntoIterator<Item = Flag<'_>>,
575    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
576        self._store(sequence_set, kind, flags, false).await
577    }
578
579    pub async fn uid_store(
580        &mut self,
581        sequence_set: SequenceSet,
582        kind: StoreType,
583        flags: impl IntoIterator<Item = Flag<'_>>,
584    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
585        self._store(sequence_set, kind, flags, true).await
586    }
587
588    async fn _silent_store(
589        &mut self,
590        sequence_set: SequenceSet,
591        kind: StoreType,
592        flags: impl IntoIterator<Item = Flag<'_>>,
593        uid: bool,
594    ) -> Result<(), ClientError> {
595        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
596
597        let task = StoreTask::new(sequence_set, kind, flags)
598            .with_uid(uid)
599            .silent();
600
601        Ok(self.resolve(task).await??)
602    }
603
604    pub async fn silent_store(
605        &mut self,
606        sequence_set: SequenceSet,
607        kind: StoreType,
608        flags: impl IntoIterator<Item = Flag<'_>>,
609    ) -> Result<(), ClientError> {
610        self._silent_store(sequence_set, kind, flags, false).await
611    }
612
613    pub async fn uid_silent_store(
614        &mut self,
615        sequence_set: SequenceSet,
616        kind: StoreType,
617        flags: impl IntoIterator<Item = Flag<'_>>,
618    ) -> Result<(), ClientError> {
619        self._silent_store(sequence_set, kind, flags, true).await
620    }
621
622    pub async fn post_append_noop(&mut self) -> Result<Option<u32>, ClientError> {
623        Ok(self.resolve(PostAppendNoOpTask::new()).await??)
624    }
625
626    pub async fn post_append_check(&mut self) -> Result<Option<u32>, ClientError> {
627        Ok(self.resolve(PostAppendCheckTask::new()).await??)
628    }
629
630    async fn _fetch_first(
631        &mut self,
632        id: NonZeroU32,
633        items: MacroOrMessageDataItemNames<'_>,
634        uid: bool,
635    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
636        let items = items.into_static();
637
638        Ok(self
639            .resolve(FetchFirstTask::new(id, items).with_uid(uid))
640            .await??)
641    }
642
643    pub async fn fetch_first(
644        &mut self,
645        id: NonZeroU32,
646        items: MacroOrMessageDataItemNames<'_>,
647    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
648        self._fetch_first(id, items, false).await
649    }
650
651    pub async fn uid_fetch_first(
652        &mut self,
653        id: NonZeroU32,
654        items: MacroOrMessageDataItemNames<'_>,
655    ) -> Result<Vec1<MessageDataItem<'static>>, ClientError> {
656        self._fetch_first(id, items, true).await
657    }
658
659    async fn _copy(
660        &mut self,
661        sequence_set: SequenceSet,
662        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
663        uid: bool,
664    ) -> Result<(), ClientError> {
665        let mbox = mailbox.try_into()?.into_static();
666
667        Ok(self
668            .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid))
669            .await??)
670    }
671
672    pub async fn copy(
673        &mut self,
674        sequence_set: SequenceSet,
675        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
676    ) -> Result<(), ClientError> {
677        self._copy(sequence_set, mailbox, false).await
678    }
679
680    pub async fn uid_copy(
681        &mut self,
682        sequence_set: SequenceSet,
683        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
684    ) -> Result<(), ClientError> {
685        self._copy(sequence_set, mailbox, true).await
686    }
687
688    async fn _move(
689        &mut self,
690        sequence_set: SequenceSet,
691        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
692        uid: bool,
693    ) -> Result<(), ClientError> {
694        let mbox = mailbox.try_into()?.into_static();
695
696        Ok(self
697            .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid))
698            .await??)
699    }
700
701    pub async fn r#move(
702        &mut self,
703        sequence_set: SequenceSet,
704        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
705    ) -> Result<(), ClientError> {
706        self._move(sequence_set, mailbox, false).await
707    }
708
709    pub async fn uid_move(
710        &mut self,
711        sequence_set: SequenceSet,
712        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
713    ) -> Result<(), ClientError> {
714        self._move(sequence_set, mailbox, true).await
715    }
716
717    /// Executes the `CHECK` command.
718    pub async fn check(&mut self) -> Result<(), ClientError> {
719        Ok(self.resolve(CheckTask::new()).await??)
720    }
721
722    /// Executes the `NOOP` command.
723    pub async fn noop(&mut self) -> Result<(), ClientError> {
724        Ok(self.resolve(NoOpTask::new()).await??)
725    }
726}
727
728/// Client medium-level API.
729///
730/// This section defines the medium-level API of the client (based on
731/// the low-level one), by exposing helpers that update client state
732/// and use a small amount of logic (mostly conditional code depending
733/// on available server capabilities).
734impl Client {
735    /// Fetches server capabilities, then saves them.
736    pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
737        self.state.capabilities = self.resolve(CapabilityTask::new()).await??;
738
739        Ok(())
740    }
741
742    /// Identifies the user using the given username and password.
743    pub async fn login(
744        &mut self,
745        username: impl TryInto<AString<'_>, Error = ValidationError>,
746        password: impl TryInto<AString<'_>, Error = ValidationError>,
747    ) -> Result<(), ClientError> {
748        let username = username.try_into()?.into_static();
749        let password = password.try_into()?.into_static();
750        let login = self.resolve(LoginTask::new(username, Secret::new(password)));
751
752        match login.await?? {
753            Some(capabilities) => {
754                self.state.capabilities = capabilities;
755            }
756            None => {
757                self.refresh_capabilities().await?;
758            }
759        };
760
761        Ok(())
762    }
763
764    /// Authenticates the user using the given [`AuthenticateTask`].
765    ///
766    /// This function also refreshes capabilities (either from the
767    /// task output or from explicit request).
768    async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> {
769        match self.resolve(task).await?? {
770            Some(capabilities) => {
771                self.state.capabilities = capabilities;
772            }
773            None => {
774                self.refresh_capabilities().await?;
775            }
776        };
777
778        Ok(())
779    }
780
781    /// Authenticates the user using the `PLAIN` mechanism.
782    pub async fn authenticate_plain(
783        &mut self,
784        login: impl AsRef<str>,
785        password: impl AsRef<str>,
786    ) -> Result<(), ClientError> {
787        self.authenticate(AuthenticateTask::plain(
788            login.as_ref(),
789            password.as_ref(),
790            self.state.ext_sasl_ir_supported(),
791        ))
792        .await
793    }
794
795    /// Authenticates the user using the `XOAUTH2` mechanism.
796    pub async fn authenticate_xoauth2(
797        &mut self,
798        login: impl AsRef<str>,
799        token: impl AsRef<str>,
800    ) -> Result<(), ClientError> {
801        self.authenticate(AuthenticateTask::xoauth2(
802            login.as_ref(),
803            token.as_ref(),
804            self.state.ext_sasl_ir_supported(),
805        ))
806        .await
807    }
808
809    /// Authenticates the user using the `OAUTHBEARER` mechanism.
810    pub async fn authenticate_oauthbearer(
811        &mut self,
812        user: impl AsRef<str>,
813        host: impl AsRef<str>,
814        port: u16,
815        token: impl AsRef<str>,
816    ) -> Result<(), ClientError> {
817        self.authenticate(AuthenticateTask::oauthbearer(
818            user.as_ref(),
819            host.as_ref(),
820            port,
821            token.as_ref(),
822            self.state.ext_sasl_ir_supported(),
823        ))
824        .await
825    }
826
827    /// Exchanges client/server ids.
828    ///
829    /// If the server does not support the `ID` extension, this
830    /// function has no effect.
831    pub async fn id(
832        &mut self,
833        params: Option<Vec<(IString<'static>, NString<'static>)>>,
834    ) -> Result<Option<Vec<(IString<'static>, NString<'static>)>>, ClientError> {
835        Ok(if self.state.ext_id_supported() {
836            self.resolve(IdTask::new(params)).await??
837        } else {
838            debug!("IMAP ID extension not supported, skipping");
839            None
840        })
841    }
842
843    pub async fn append(
844        &mut self,
845        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
846        flags: impl IntoIterator<Item = Flag<'_>>,
847        message: impl AsRef<[u8]>,
848    ) -> Result<Option<u32>, ClientError> {
849        let mbox = mailbox.try_into()?.into_static();
850
851        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
852
853        let msg = to_static_literal(message, self.state.ext_binary_supported())?;
854
855        Ok(self
856            .resolve(AppendTask::new(mbox, msg).with_flags(flags))
857            .await??)
858    }
859
860    pub async fn appenduid(
861        &mut self,
862        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
863        flags: impl IntoIterator<Item = Flag<'_>>,
864        message: impl AsRef<[u8]>,
865    ) -> Result<Option<(NonZeroU32, NonZeroU32)>, ClientError> {
866        let mbox = mailbox.try_into()?.into_static();
867
868        let flags: Vec<_> = flags.into_iter().map(IntoStatic::into_static).collect();
869
870        let msg = to_static_literal(message, self.state.ext_binary_supported())?;
871
872        Ok(self
873            .resolve(AppendUidTask::new(mbox, msg).with_flags(flags))
874            .await??)
875    }
876}
877
878/// Client high-level API.
879///
880/// This section defines the high-level API of the client (based on
881/// the low and medium ones), by exposing opinionated helpers. They
882/// contain more logic, and make use of fallbacks depending on
883/// available server capabilities.
884impl Client {
885    async fn _fetch(
886        &mut self,
887        sequence_set: SequenceSet,
888        items: MacroOrMessageDataItemNames<'_>,
889        uid: bool,
890    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
891        let mut items = match items {
892            MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
893            MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(),
894        };
895
896        if uid {
897            items.push(MessageDataItemName::Uid);
898        }
899
900        let seq_map = self
901            .resolve(FetchTask::new(sequence_set, items.into()).with_uid(uid))
902            .await??;
903
904        if uid {
905            let mut uid_map = HashMap::new();
906
907            for (seq, items) in seq_map {
908                let uid = items.as_ref().iter().find_map(|item| {
909                    if let MessageDataItem::Uid(uid) = item {
910                        Some(*uid)
911                    } else {
912                        None
913                    }
914                });
915
916                match uid {
917                    Some(uid) => {
918                        uid_map.insert(uid, items);
919                    }
920                    None => {
921                        debug!(?seq, "cannot get message uid, skipping it");
922                    }
923                }
924            }
925
926            Ok(uid_map)
927        } else {
928            Ok(seq_map)
929        }
930    }
931
932    pub async fn fetch(
933        &mut self,
934        sequence_set: SequenceSet,
935        items: MacroOrMessageDataItemNames<'_>,
936    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
937        self._fetch(sequence_set, items, false).await
938    }
939
940    pub async fn uid_fetch(
941        &mut self,
942        sequence_set: SequenceSet,
943        items: MacroOrMessageDataItemNames<'_>,
944    ) -> Result<HashMap<NonZeroU32, Vec1<MessageDataItem<'static>>>, ClientError> {
945        self._fetch(sequence_set, items, true).await
946    }
947
948    async fn _sort_or_fallback(
949        &mut self,
950        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
951        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
952        fetch_items: MacroOrMessageDataItemNames<'_>,
953        uid: bool,
954    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
955        let mut fetch_items = match fetch_items {
956            MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(),
957            MacroOrMessageDataItemNames::MessageDataItemNames(items) => items,
958        };
959
960        if uid && !fetch_items.contains(&MessageDataItemName::Uid) {
961            fetch_items.push(MessageDataItemName::Uid);
962        }
963
964        let mut fetches = HashMap::new();
965
966        if self.state.ext_sort_supported() {
967            let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items);
968            let ids = self._sort(sort_criteria, search_criteria, uid).await?;
969            let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
970            let ids_chunks_len = ids_chunks.len();
971
972            for (n, ids) in ids_chunks.enumerate() {
973                debug!(?ids, "fetching sort envelopes {}/{ids_chunks_len}", n + 1);
974                let ids = SequenceSet::try_from(ids.to_vec())?;
975                let items = fetch_items.clone();
976                fetches.extend(self._fetch(ids, items, uid).await?);
977            }
978
979            let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect();
980
981            Ok(items)
982        } else {
983            debug!("IMAP SORT extension not supported, using fallback");
984
985            let ids = self._search(search_criteria, uid).await?;
986            let ids_chunks = ids.chunks(MAX_SEQUENCE_SIZE as usize);
987            let ids_chunks_len = ids_chunks.len();
988
989            sort_criteria
990                .clone()
991                .into_iter()
992                .filter_map(|criterion| match criterion.key {
993                    SortKey::Arrival => Some(MessageDataItemName::InternalDate),
994                    SortKey::Cc => Some(MessageDataItemName::Envelope),
995                    SortKey::Date => Some(MessageDataItemName::Envelope),
996                    SortKey::From => Some(MessageDataItemName::Envelope),
997                    SortKey::Size => Some(MessageDataItemName::Rfc822Size),
998                    SortKey::Subject => Some(MessageDataItemName::Envelope),
999                    SortKey::To => Some(MessageDataItemName::Envelope),
1000                    SortKey::DisplayFrom => None,
1001                    SortKey::DisplayTo => None,
1002                })
1003                .for_each(|item| {
1004                    if !fetch_items.contains(&item) {
1005                        fetch_items.push(item)
1006                    }
1007                });
1008
1009            for (n, ids) in ids_chunks.enumerate() {
1010                debug!(?ids, "fetching search envelopes {}/{ids_chunks_len}", n + 1);
1011                let ids = SequenceSet::try_from(ids.to_vec())?;
1012                let items = fetch_items.clone();
1013                fetches.extend(self._fetch(ids, items.into(), uid).await?);
1014            }
1015
1016            let mut fetches: Vec<_> = fetches.into_values().collect();
1017
1018            fetches.sort_by(|a, b| {
1019                for criterion in sort_criteria.clone().into_iter() {
1020                    let mut cmp = cmp_fetch_items(&criterion.key, a, b);
1021
1022                    if criterion.reverse {
1023                        cmp = cmp.reverse();
1024                    }
1025
1026                    if cmp.is_ne() {
1027                        return cmp;
1028                    }
1029                }
1030
1031                cmp_fetch_items(&SortKey::Date, a, b)
1032            });
1033
1034            Ok(fetches)
1035        }
1036    }
1037
1038    pub async fn sort_or_fallback(
1039        &mut self,
1040        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1041        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1042        fetch_items: MacroOrMessageDataItemNames<'_>,
1043    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1044        self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false)
1045            .await
1046    }
1047
1048    pub async fn uid_sort_or_fallback(
1049        &mut self,
1050        sort_criteria: impl IntoIterator<Item = SortCriterion> + Clone,
1051        search_criteria: impl IntoIterator<Item = SearchKey<'_>>,
1052        fetch_items: MacroOrMessageDataItemNames<'_>,
1053    ) -> Result<Vec<Vec1<MessageDataItem<'static>>>, ClientError> {
1054        self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true)
1055            .await
1056    }
1057
1058    pub async fn appenduid_or_fallback(
1059        &mut self,
1060        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError> + Clone,
1061        flags: impl IntoIterator<Item = Flag<'_>>,
1062        message: impl AsRef<[u8]>,
1063    ) -> Result<Option<NonZeroU32>, ClientError> {
1064        if self.state.ext_uidplus_supported() {
1065            Ok(self
1066                .appenduid(mailbox, flags, message)
1067                .await?
1068                .map(|(uid, _)| uid))
1069        } else {
1070            debug!("IMAP UIDPLUS extension not supported, using fallback");
1071
1072            // If the mailbox is currently selected, the normal new
1073            // message actions SHOULD occur.  Specifically, the server
1074            // SHOULD notify the client immediately via an untagged
1075            // EXISTS response.  If the server does not do so, the
1076            // client MAY issue a NOOP command (or failing that, a
1077            // CHECK command) after one or more APPEND commands.
1078            //
1079            // <https://datatracker.ietf.org/doc/html/rfc3501#section-6.3.11>
1080            self.select(mailbox.clone()).await?;
1081
1082            let seq = match self.append(mailbox, flags, message).await? {
1083                Some(seq) => seq,
1084                None => match self.post_append_noop().await? {
1085                    Some(seq) => seq,
1086                    None => self
1087                        .post_append_check()
1088                        .await?
1089                        .ok_or(ClientError::ResolveTask(TaskError::MissingData(
1090                            "APPENDUID: seq".into(),
1091                        )))?,
1092                },
1093            };
1094
1095            let uid = self
1096                .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap())))
1097                .await?
1098                .into_iter()
1099                .next();
1100
1101            Ok(uid)
1102        }
1103    }
1104
1105    async fn _move_or_fallback(
1106        &mut self,
1107        sequence_set: SequenceSet,
1108        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1109        uid: bool,
1110    ) -> Result<(), ClientError> {
1111        if self.state.ext_move_supported() {
1112            self._move(sequence_set, mailbox, uid).await
1113        } else {
1114            debug!("IMAP MOVE extension not supported, using fallback");
1115            self._copy(sequence_set.clone(), mailbox, uid).await?;
1116            self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid)
1117                .await?;
1118            self.expunge().await?;
1119            Ok(())
1120        }
1121    }
1122
1123    pub async fn move_or_fallback(
1124        &mut self,
1125        sequence_set: SequenceSet,
1126        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1127    ) -> Result<(), ClientError> {
1128        self._move_or_fallback(sequence_set, mailbox, false).await
1129    }
1130
1131    pub async fn uid_move_or_fallback(
1132        &mut self,
1133        sequence_set: SequenceSet,
1134        mailbox: impl TryInto<Mailbox<'_>, Error = ValidationError>,
1135    ) -> Result<(), ClientError> {
1136        self._move_or_fallback(sequence_set, mailbox, true).await
1137    }
1138
1139    pub fn enqueue_idle(&mut self) -> Tag<'static> {
1140        let tag = self.state.resolver.scheduler.tag_generator.generate();
1141
1142        self.state
1143            .resolver
1144            .scheduler
1145            .client_next
1146            .enqueue_command(Command {
1147                tag: tag.clone(),
1148                body: CommandBody::Idle,
1149            });
1150
1151        tag.into_static()
1152    }
1153
1154    #[tracing::instrument(name = "idle", skip_all)]
1155    pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1156        debug!("starting the main loop");
1157
1158        loop {
1159            let progress = self
1160                .stream
1161                .next(&mut self.state.resolver.scheduler.client_next);
1162            match timeout(self.state.idle_timeout, progress).await.ok() {
1163                None => {
1164                    debug!("timed out, sending done command…");
1165                    self.state.resolver.scheduler.client_next.set_idle_done();
1166                }
1167                Some(Err(err)) => {
1168                    break Err(err);
1169                }
1170                Some(Ok(Event::IdleCommandSent { .. })) => {
1171                    debug!("command sent");
1172                }
1173                Some(Ok(Event::IdleAccepted { .. })) => {
1174                    debug!("command accepted, entering idle mode");
1175                }
1176                Some(Ok(Event::IdleRejected { status, .. })) => {
1177                    debug!("command rejected, aborting: {status:?}");
1178                    break Ok(());
1179                }
1180                Some(Ok(Event::IdleDoneSent { .. })) => {
1181                    debug!("done command sent");
1182                }
1183                Some(Ok(Event::DataReceived { data })) => {
1184                    debug!("received data, sending done command…");
1185                    trace!("{data:#?}");
1186                    self.state.resolver.scheduler.client_next.set_idle_done();
1187                }
1188                Some(Ok(Event::StatusReceived {
1189                    status:
1190                        Status::Tagged(Tagged {
1191                            tag: ref got_tag, ..
1192                        }),
1193                })) if *got_tag == tag => {
1194                    debug!("received tagged response, exiting");
1195                    break Ok(());
1196                }
1197                Some(event) => {
1198                    debug!("received unknown event, ignoring: {event:?}");
1199                }
1200            }
1201        }
1202    }
1203
1204    #[tracing::instrument(name = "idle/done", skip_all)]
1205    pub async fn idle_done(&mut self, tag: Tag<'static>) -> Result<(), stream::Error<NextError>> {
1206        self.state.resolver.scheduler.client_next.set_idle_done();
1207
1208        loop {
1209            let progress = self
1210                .stream
1211                .next(&mut self.state.resolver.scheduler.client_next)
1212                .await?;
1213
1214            match progress {
1215                Event::IdleDoneSent { .. } => {
1216                    debug!("done command sent");
1217                }
1218                Event::StatusReceived {
1219                    status:
1220                        Status::Tagged(Tagged {
1221                            tag: ref got_tag, ..
1222                        }),
1223                } if *got_tag == tag => {
1224                    debug!("received tagged response, exiting");
1225                    break Ok(());
1226                }
1227                event => {
1228                    debug!("received unknown event, ignoring: {event:?}");
1229                }
1230            }
1231        }
1232    }
1233}
1234
1235pub(crate) fn cmp_fetch_items(
1236    criterion: &SortKey,
1237    a: &Vec1<MessageDataItem>,
1238    b: &Vec1<MessageDataItem>,
1239) -> Ordering {
1240    use MessageDataItem::*;
1241
1242    match &criterion {
1243        SortKey::Arrival => {
1244            let a = a.as_ref().iter().find_map(|a| {
1245                if let InternalDate(dt) = a {
1246                    Some(dt.as_ref())
1247                } else {
1248                    None
1249                }
1250            });
1251
1252            let b = b.as_ref().iter().find_map(|b| {
1253                if let InternalDate(dt) = b {
1254                    Some(dt.as_ref())
1255                } else {
1256                    None
1257                }
1258            });
1259
1260            a.cmp(&b)
1261        }
1262        SortKey::Date => {
1263            let a = a.as_ref().iter().find_map(|a| {
1264                if let Envelope(envelope) = a {
1265                    envelope.date.0.as_ref().map(AsRef::as_ref)
1266                } else {
1267                    None
1268                }
1269            });
1270
1271            let b = b.as_ref().iter().find_map(|b| {
1272                if let Envelope(envelope) = b {
1273                    envelope.date.0.as_ref().map(AsRef::as_ref)
1274                } else {
1275                    None
1276                }
1277            });
1278
1279            a.cmp(&b)
1280        }
1281        SortKey::Size => {
1282            let a = a.as_ref().iter().find_map(|a| {
1283                if let Rfc822Size(size) = a {
1284                    Some(size)
1285                } else {
1286                    None
1287                }
1288            });
1289
1290            let b = b.as_ref().iter().find_map(|b| {
1291                if let Rfc822Size(size) = b {
1292                    Some(size)
1293                } else {
1294                    None
1295                }
1296            });
1297
1298            a.cmp(&b)
1299        }
1300        SortKey::Subject => {
1301            let a = a.as_ref().iter().find_map(|a| {
1302                if let Envelope(envelope) = a {
1303                    envelope.subject.0.as_ref().map(AsRef::as_ref)
1304                } else {
1305                    None
1306                }
1307            });
1308
1309            let b = b.as_ref().iter().find_map(|b| {
1310                if let Envelope(envelope) = b {
1311                    envelope.subject.0.as_ref().map(AsRef::as_ref)
1312                } else {
1313                    None
1314                }
1315            });
1316
1317            a.cmp(&b)
1318        }
1319        // FIXME: Address missing Ord derive in imap-types
1320        SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => {
1321            Ordering::Equal
1322        }
1323    }
1324}
1325
1326pub(crate) fn to_static_literal(
1327    message: impl AsRef<[u8]>,
1328    ext_binary_supported: bool,
1329) -> Result<LiteralOrLiteral8<'static>, ValidationError> {
1330    let message = if ext_binary_supported {
1331        LiteralOrLiteral8::Literal8(Literal8 {
1332            data: message.as_ref().into(),
1333            mode: LiteralMode::Sync,
1334        })
1335    } else {
1336        debug!("IMAP BINARY extension not supported, using fallback");
1337        Literal::validate(message.as_ref())?;
1338        LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref()))
1339    };
1340
1341    Ok(message.into_static())
1342}